Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ target/
profile_default/
ipython_config.py


data/
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
Expand Down
97 changes: 83 additions & 14 deletions cbsurge/azure/blob_storage.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
import logging
import os

import aiofiles
from azure.storage.blob.aio import ContainerClient
from tqdm.asyncio import tqdm
from tqdm.asyncio import tqdm_asyncio, tqdm

from cbsurge.constants import AZURE_BLOB_CONTAINER_NAME

Expand All @@ -19,17 +20,19 @@ class AzureBlobStorageManager:
await az.download_blob(blob_name, local_directory)
await az.download_files(azure_directory, local_directory)
"""
def __init__(self, conn_str):
    """
    Create the manager and the container client it operates on.

    Args:
        conn_str: (str) The Azure Storage connection string used to build the container client.
    """
    logging.info("Initializing Azure Blob Storage Manager")
    self.conn_str = conn_str
    # Built eagerly so the manager is usable with or without ``async with``.
    self.container_client = ContainerClient.from_connection_string(conn_str=conn_str, container_name=AZURE_BLOB_CONTAINER_NAME)

async def __aenter__(self):
    """
    Enter the async context.

    Reuses the client created in ``__init__`` instead of replacing it (which
    would leave the first client unclosed); a new client is only built when
    the previous one was released by ``__aexit__``.

    Returns:
        AzureBlobStorageManager: This manager.
    """
    if self.container_client is None:
        self.container_client = ContainerClient.from_connection_string(conn_str=self.conn_str, container_name=AZURE_BLOB_CONTAINER_NAME)
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    """
    Leave the async context and close the container client.

    Returns:
        None, so an exception raised inside the ``async with`` body propagates.
    """
    if self.container_client is None:
        return
    try:
        await self.close()
    finally:
        # Drop the closed client so re-entering the context builds a fresh one.
        self.container_client = None

async def upload_blob(self, file_path=None, blob_name=None):
"""
Upload a file to Azure Blob Storage.
Expand All @@ -47,7 +50,7 @@ async def upload_blob(self, file_path=None, blob_name=None):
file_size = os.path.getsize(file_path)

async def __progress__(current, total):
tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
tqdm_asyncio(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)

async with aiofiles.open(file_path, "rb") as data:
await blob_client.upload_blob(data, overwrite=True, max_concurrency=4, blob_type="BlockBlob", length=file_size, progress_hook=__progress__)
Expand All @@ -69,30 +72,42 @@ async def upload_files(self, local_directory=None, azure_directory=None):
for root, _, files in os.walk(local_directory):
for file in files:
async def __progress__(current, total):
tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
tqdm_asyncio(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
file_path = os.path.join(root, file)
blob_name = f"{azure_directory}/{file}"
await self.upload_blob(file_path=file_path, blob_name=blob_name)
return



async def download_blob(self, blob_name=None, local_directory=None):
    """
    Download a blob from Azure Blob Storage, streaming it to disk chunk by chunk.

    Args:
        blob_name: (str) The name of the blob to download.
        local_directory: (str) The local path to save the downloaded blob. If not provided, the blob will be saved in the current working directory.

    Returns:
        str: The path to the downloaded file.
    """
    logging.info("Downloading blob: %s", blob_name)
    blob_client = self.container_client.get_blob_client(blob=blob_name)
    # Only the last path segment of the blob name is used as the local file name.
    file_name = blob_name.split("/")[-1]
    if local_directory:
        os.makedirs(local_directory, exist_ok=True)
        file_name = f"{local_directory}/{file_name}"

    # The blob size is fetched up front so the progress bar has a total.
    blob_properties = await blob_client.get_blob_properties()
    total_size = blob_properties.size

    async with aiofiles.open(file_name, "wb") as data:
        blob = await blob_client.download_blob()
        progress = tqdm_asyncio(total=total_size, unit="B", unit_scale=True, desc=file_name)
        try:
            # Write each chunk exactly once; reading the whole blob with
            # readall() first would duplicate the payload in the file.
            async for chunk in blob.chunks():
                await data.write(chunk)
                progress.update(len(chunk))
        finally:
            # Close the bar even when the download fails part-way.
            progress.close()
    logging.info("Download completed for blob: %s", blob_name)
    return file_name

Expand All @@ -106,10 +121,65 @@ async def download_files(self, azure_directory=None, local_directory=None):
Returns:

"""
async for blob in self.container_client.list_blobs(name_starts_with=azure_directory):
await self.download_blob(blob_name=blob.name, local_directory=local_directory)
blobs = await self.list_blobs(prefix=azure_directory)
for blob in blobs:
await self.download_blob(blob_name=blob, local_directory=local_directory)
return

async def list_blobs(self, prefix=None):
    """
    Collect the names of the blobs in the Azure Blob Storage container.

    Args:
        prefix: (str) The prefix to filter blobs by.

    Returns:
        list: The blob names, in the order the container client yields them.
    """
    blob_names = []
    listing = self.container_client.list_blobs(name_starts_with=prefix)
    async for entry in listing:
        blob_names.append(entry.name)
    return blob_names


async def copy_file(self, source_blob=None, destination_blob=None):
    """
    Start a copy of one blob onto another blob in the same container.

    Args:
        source_blob: (str) The name of the source blob to copy.
        destination_blob: (str) The name of the destination blob to copy to.

    Returns:
        str: The URL of the destination blob.
    """
    logging.info("Copying blob: %s to %s", source_blob, destination_blob)
    client_for = self.container_client.get_blob_client
    origin = client_for(blob=source_blob)
    target = client_for(blob=destination_blob)
    # The copy is requested by URL on the destination blob's client.
    await target.start_copy_from_url(origin.url)
    return target.url

async def delete_blob(self, blob_name=None):
    """
    Remove a blob from the Azure Blob Storage container.

    Args:
        blob_name: (str) The name of the blob to delete.

    Returns:
        str: The name of the blob that was deleted.
    """
    logging.info("Deleting blob: %s", blob_name)
    target = self.container_client.get_blob_client(blob=blob_name)
    await target.delete_blob()
    return blob_name

async def rename_file(self, source_blob=None, destination_blob=None, poll_interval=1.0):
    """
    Rename a blob by copying it to a new name and deleting the original.

    The copy is only *started* by ``copy_file``; the source is therefore
    deleted only after the destination blob reports the copy as successful,
    so a pending or failed copy never loses the original data.

    Args:
        source_blob: (str) The name of the blob to rename.
        destination_blob: (str) The new name of the blob.
        poll_interval: (float) Seconds to wait between copy-status checks.

    Returns:
        str: The name of the destination blob.

    Raises:
        RuntimeError: If the copy ends in any status other than "success";
            the source blob is left untouched in that case.
    """
    await self.copy_file(source_blob=source_blob, destination_blob=destination_blob)

    destination_client = self.container_client.get_blob_client(blob=destination_blob)
    while True:
        properties = await destination_client.get_blob_properties()
        copy_status = properties.copy.status
        if copy_status != "pending":
            break
        await asyncio.sleep(poll_interval)

    if copy_status != "success":
        raise RuntimeError(
            f"Copy of {source_blob} to {destination_blob} ended with status {copy_status!r}; "
            "source blob was not deleted"
        )

    await self.delete_blob(blob_name=source_blob)
    return destination_blob

async def close(self):
"""
Expand All @@ -118,4 +188,3 @@ async def close(self):
"""
logging.info("Closing Azure Blob Storage Manager")
return await self.container_client.close()

16 changes: 16 additions & 0 deletions cbsurge/azure/fileshare.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,19 @@ async def download_file(self, file_name, download_path):
progress_bar.close()
return file_name

async def copy_file(self, source_file, destination_file):
    """
    Start a copy of one file onto another file within the Azure File Share.

    Args:
        source_file: The file to copy.
        destination_file: The destination file.

    Returns:
        The destination file, exactly as it was passed in.
    """
    share = self.share_client
    origin = share.get_file_client(source_file)
    target = share.get_file_client(destination_file)

    # The copy is requested by URL on the destination file's client.
    await target.start_copy_from_url(origin.url)
    return destination_file

25 changes: 22 additions & 3 deletions cbsurge/exposure/population/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import click

from cbsurge.exposure.population.worldpop import download_data
from cbsurge.exposure.population.worldpop import population_sync, process_aggregates, download


@click.group()
Expand All @@ -15,5 +15,24 @@ def population():
@click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True)
@click.option('--country', help='The ISO3 code of the country to process the data for')
@click.option('--download-path', help='Download data locally', required=False)
def run_download(force_reprocessing, country, download_path):
asyncio.run(download_data(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path))
def sync(force_reprocessing, country, download_path):
asyncio.run(population_sync(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path))


@population.command()
@click.option('--country', help='The ISO3 code of the country to process the data for')
@click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True)
@click.option('--download-path', help='Download data locally', required=False)
@click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly']))
@click.option('--sex', help='The sex to process the data for', type=click.Choice(['male', 'female']))
def download(country, force_reprocessing, download_path, age_group, sex):
    """Download population data for a country, optionally limited to one age group and sex."""
    # This command is itself named ``download`` and therefore shadows the
    # coroutine imported at module level; calling ``download(...)`` here would
    # re-invoke the click command instead of the worldpop coroutine. Import
    # the coroutine under a distinct local name to reach the real one.
    from cbsurge.exposure.population.worldpop import download as download_population

    asyncio.run(download_population(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path, age_group=age_group, sex=sex))

@population.command()
@click.option('--country', help='The ISO3 code of the country to process the data for')
@click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly']))
@click.option('--sex', help='The sex to process the data for', type=click.Choice(['male', 'female']))
@click.option('--download-path', help='Download data locally', required=False)
@click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True)
def run_aggregate(country, age_group, sex, download_path, force_reprocessing):
    """Run the population aggregate processing for a country, optionally limited to one age group and sex."""
    # Thin CLI wrapper: every option is forwarded unchanged to the coroutine.
    asyncio.run(process_aggregates(country_code=country, age_group=age_group, sex=sex, download_path=download_path, force_reprocessing=force_reprocessing))
41 changes: 23 additions & 18 deletions cbsurge/exposure/population/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,27 @@
# Root path for WorldPop files — presumably a prefix inside Azure storage; confirm against callers.
AZ_ROOT_FILE_PATH = "worldpop"
# Name of the storage container — presumably the Azure container holding the data; confirm against callers.
CONTAINER_NAME = "stacdata"
# Age lookup keyed by an age label.
# NOTE(review): the values come in two shapes — a single integer for the
# "N-M" / "80+" band keys and a two-element [min, max] list for the
# "child" / "active" / "elderly" keys. Confirm that consumers handle both
# shapes, or whether the per-band entries are leftovers that should go.
WORLDPOP_AGE_MAPPING = {
"0-12": 0,
"1-4": 1,
"5-9": 5,
"10-14": 10,
"15-19": 15,
"20-24": 20,
"25-29": 25,
"30-34": 30,
"35-39": 35,
"40-44": 40,
"45-49": 45,
"50-54": 50,
"55-59": 55,
"60-64": 60,
"65-69": 65,
"70-74": 70,
"75-79": 75,
"80+": 80
"child": [0, 14],
"active": [15, 64],
"elderly": [65, 100],
}
# Maps one-letter sex codes to the sex names used in the "sexes" lists below.
SEX_MAPPING = {
"M": "male",
"F": "female",
}

# Year of the data — presumably the WorldPop dataset year; verify against usage.
DATA_YEAR = 2020

# Each entry describes one aggregate: the sexes it covers, the age group it is
# restricted to (None on the per-sex "*_total" entries), and the label it is
# published under. The age-group names match the "child" / "active" /
# "elderly" keys of WORLDPOP_AGE_MAPPING.
AGESEX_STRUCTURE_COMBINATIONS = [
{"sexes": ["male"], "age_group": None, "label": "male_total"},
{"sexes": ["female"], "age_group": None, "label": "female_total"},
{"sexes": ["male"], "age_group": "active", "label": "male_active"},
{"sexes": ["female"], "age_group": "active", "label": "female_active"},
{"sexes": ["male"], "age_group": "child", "label": "male_child"},
{"sexes": ["female"], "age_group": "child", "label": "female_child"},
{"sexes": ["male"], "age_group": "elderly", "label": "male_elderly"},
{"sexes": ["female"], "age_group": "elderly", "label": "female_elderly"},
{"sexes": ["male", "female"], "age_group": "elderly", "label": "elderly_total"},
{"sexes": ["male", "female"], "age_group": "child", "label": "child_total"},
{"sexes": ["male", "female"], "age_group": "active", "label": "active_total"},
]
Loading
Loading