diff --git a/.gitignore b/.gitignore
index bd941f36..92c0250c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -84,6 +84,8 @@ target/
 profile_default/
 ipython_config.py
 
+
+data/
 # pyenv
 # For a library or package, you might want to ignore these files since the code is
 # intended to run in multiple environments; otherwise, check them in:
diff --git a/cbsurge/azure/blob_storage.py b/cbsurge/azure/blob_storage.py
index cd379548..4a67f992 100644
--- a/cbsurge/azure/blob_storage.py
+++ b/cbsurge/azure/blob_storage.py
@@ -1,9 +1,10 @@
+import asyncio
 import logging
 import os
 
 import aiofiles
 from azure.storage.blob.aio import ContainerClient
-from tqdm.asyncio import tqdm
+from tqdm.asyncio import tqdm_asyncio, tqdm
 
 from cbsurge.constants import AZURE_BLOB_CONTAINER_NAME
 
@@ -19,17 +20,19 @@ class AzureBlobStorageManager:
         await az.download_blob(blob_name, local_directory)
         await az.download_files(azure_directory, local_directory)
     """
-    def __aenter__(self):
-        return self
-
-    def __aexit__(self, exc_type, exc_val, exc_tb):
-        return self.close()
-
     def __init__(self, conn_str):
         logging.info("Initializing Azure Blob Storage Manager")
        self.conn_str = conn_str
         self.container_client = ContainerClient.from_connection_string(conn_str=conn_str, container_name=AZURE_BLOB_CONTAINER_NAME)
 
+    async def __aenter__(self):
+        self.container_client = ContainerClient.from_connection_string(conn_str=self.conn_str, container_name=AZURE_BLOB_CONTAINER_NAME)
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if self.container_client:
+            return await self.close()
+
     async def upload_blob(self, file_path=None, blob_name=None):
         """
         Upload a file to Azure Blob Storage.
@@ -47,7 +50,7 @@ async def upload_blob(self, file_path=None, blob_name=None):
         file_size = os.path.getsize(file_path)
 
         async def __progress__(current, total):
-            tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
+            tqdm_asyncio(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
 
         async with aiofiles.open(file_path, "rb") as data:
             await blob_client.upload_blob(data, overwrite=True, max_concurrency=4, blob_type="BlockBlob", length=file_size, progress_hook=__progress__)
@@ -69,20 +72,24 @@ async def upload_files(self, local_directory=None, azure_directory=None):
         for root, _, files in os.walk(local_directory):
             for file in files:
                 async def __progress__(current, total):
-                    tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
+                    tqdm_asyncio(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
 
                 file_path = os.path.join(root, file)
                 blob_name = f"{azure_directory}/{file}"
                 await self.upload_blob(file_path=file_path, blob_name=blob_name)
         return
 
+
+
     async def download_blob(self, blob_name=None, local_directory=None):
         """
         Download a blob from Azure Blob Storage.
+
         Args:
             blob_name: (str) The name of the blob to download.
             local_directory: (str) The local path to save the downloaded blob. If not provided, the blob will be saved in the current working directory.
 
-        Returns:
+        Returns:
+            str: The path to the downloaded file.
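+
+        Example (sketch only; assumes AZURE_STORAGE_CONNECTION_STRING is set and uses a hypothetical blob name)::
+
+            async with AzureBlobStorageManager(os.getenv("AZURE_STORAGE_CONNECTION_STRING")) as az:
+                local_path = await az.download_blob(blob_name="worldpop/2020/KEN/example.tif", local_directory="data")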
""" logging.info("Downloading blob: %s", blob_name) blob_client = self.container_client.get_blob_client(blob=blob_name) @@ -90,9 +97,17 @@ async def download_blob(self, blob_name=None, local_directory=None): if local_directory: os.makedirs(local_directory, exist_ok=True) file_name = f"{local_directory}/{file_name}" + + blob_properties = await blob_client.get_blob_properties() + total_size = blob_properties.size + async with aiofiles.open(file_name, "wb") as data: blob = await blob_client.download_blob() - await data.write(await blob.readall()) + progress = tqdm_asyncio(total=total_size, unit="B", unit_scale=True, desc=file_name) + async for chunk in blob.chunks(): + await data.write(chunk) + progress.update(len(chunk)) + progress.close() logging.info("Download completed for blob: %s", blob_name) return file_name @@ -106,10 +121,65 @@ async def download_files(self, azure_directory=None, local_directory=None): Returns: """ - async for blob in self.container_client.list_blobs(name_starts_with=azure_directory): - await self.download_blob(blob_name=blob.name, local_directory=local_directory) + blobs = await self.list_blobs(prefix=azure_directory) + for blob in blobs: + await self.download_blob(blob_name=blob, local_directory=local_directory) return + async def list_blobs(self, prefix=None): + """ + List blobs in the Azure Blob Storage container. + Args: + prefix: (str) The prefix to filter blobs by. + + Returns: + """ + return [blob.name async for blob in self.container_client.list_blobs(name_starts_with=prefix)] + + + async def copy_file(self, source_blob=None, destination_blob=None): + """ + Copy a file from one blob to another. + Args: + source_blob: (str) The name of the source blob to copy. + destination_blob: (str) The name of the destination blob to copy to. + + Returns: + + """ + logging.info("Copying blob: %s to %s", source_blob, destination_blob) + source_blob_client = self.container_client.get_blob_client(blob=source_blob) + destination_blob_client = self.container_client.get_blob_client(blob=destination_blob) + await destination_blob_client.start_copy_from_url(source_blob_client.url) + return destination_blob_client.url + + async def delete_blob(self, blob_name=None): + """ + Delete a blob from Azure Blob Storage. + Args: + blob_name: + + Returns: + + """ + logging.info("Deleting blob: %s", blob_name) + blob_client = self.container_client.get_blob_client(blob=blob_name) + await blob_client.delete_blob() + return blob_name + + async def rename_file(self, source_blob=None, destination_blob=None): + """ + Rename a blob file + Args: + source_blob: + destination_blob: + + Returns: + + """ + await self.copy_file(source_blob=source_blob, destination_blob=destination_blob) + await self.delete_blob(blob_name=source_blob) + return destination_blob async def close(self): """ @@ -118,4 +188,3 @@ async def close(self): """ logging.info("Closing Azure Blob Storage Manager") return await self.container_client.close() - diff --git a/cbsurge/azure/fileshare.py b/cbsurge/azure/fileshare.py index a70606c8..d193cb00 100644 --- a/cbsurge/azure/fileshare.py +++ b/cbsurge/azure/fileshare.py @@ -138,3 +138,19 @@ async def download_file(self, file_name, download_path): progress_bar.close() return file_name + async def copy_file(self, source_file, destination_file): + """ + Copy a file from one location to another in the Azure File Share. + Args: + source_file: The file to copy. + destination_file: The destination file. 
+
+        Returns:
+            str: The destination file path.
+        """
+        source_file_client = self.share_client.get_file_client(source_file)
+        destination_file_client = self.share_client.get_file_client(destination_file)
+
+        await destination_file_client.start_copy_from_url(source_file_client.url)
+        return destination_file
+
diff --git a/cbsurge/exposure/population/__init__.py b/cbsurge/exposure/population/__init__.py
index 7cda60e5..8698d594 100644
--- a/cbsurge/exposure/population/__init__.py
+++ b/cbsurge/exposure/population/__init__.py
@@ -2,7 +2,7 @@
 
 import click
 
-from cbsurge.exposure.population.worldpop import download_data
+from cbsurge.exposure.population.worldpop import population_sync, process_aggregates, download
 
 
 @click.group()
@@ -15,5 +15,24 @@ def population():
 @click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True)
 @click.option('--country', help='The ISO3 code of the country to process the data for')
 @click.option('--download-path', help='Download data locally', required=False)
-def run_download(force_reprocessing, country, download_path):
-    asyncio.run(download_data(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path))
\ No newline at end of file
+def sync(force_reprocessing, country, download_path):
+    asyncio.run(population_sync(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path))
+
+
+@population.command(name="download")
+@click.option('--country', help='The ISO3 code of the country to process the data for')
+@click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True)
+@click.option('--download-path', help='Download data locally', required=False)
+@click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly']))
+@click.option('--sex', help='The sex to process the data for', type=click.Choice(['male', 'female']))
+def run_download(country, force_reprocessing, download_path, age_group, sex):
+    # Named run_download so the command does not shadow the imported download() coroutine
+    asyncio.run(download(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path, age_group=age_group, sex=sex))
+
+@population.command()
+@click.option('--country', help='The ISO3 code of the country to process the data for')
+@click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly']))
+@click.option('--sex', help='The sex to process the data for', type=click.Choice(['male', 'female']))
+@click.option('--download-path', help='Download data locally', required=False)
+@click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True)
+def run_aggregate(country, age_group, sex, download_path, force_reprocessing):
+    asyncio.run(process_aggregates(country_code=country, age_group=age_group, sex=sex, download_path=download_path, force_reprocessing=force_reprocessing))
\ No newline at end of file
diff --git a/cbsurge/exposure/population/constants.py b/cbsurge/exposure/population/constants.py
index a86bb348..28e778a2 100644
--- a/cbsurge/exposure/population/constants.py
+++ b/cbsurge/exposure/population/constants.py
@@ -3,22 +3,27 @@
 AZ_ROOT_FILE_PATH = "worldpop"
 CONTAINER_NAME = "stacdata"
 WORLDPOP_AGE_MAPPING = {
-    "0-12": 0,
-    "1-4": 1,
-    "5-9": 5,
-    "10-14": 10,
-    "15-19": 15,
-    "20-24": 20,
-    "25-29": 25,
-    "30-34": 30,
-    "35-39": 35,
-    "40-44": 40,
-    "45-49": 45,
-    "50-54": 50,
-    "55-59": 55,
-    "60-64": 60,
-    "65-69": 65,
-    "70-74": 70,
-    "75-79": 75,
-    "80+": 80
+    "child": [0, 14],
+    "active": [15, 64],
+    "elderly": [65, 100],
 }
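+
+# Inclusive [min, max] age bounds; worldpop.py maps each WorldPop cohort start age
+# (0, 1, 5, 10, ...) into one of these coarse buckets.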
"male", + "F": "female", +} + +DATA_YEAR = 2020 + +AGESEX_STRUCTURE_COMBINATIONS = [ + {"sexes": ["male"], "age_group": None, "label": "male_total"}, + {"sexes": ["female"], "age_group": None, "label": "female_total"}, + {"sexes": ["male"], "age_group": "active", "label": "male_active"}, + {"sexes": ["female"], "age_group": "active", "label": "female_active"}, + {"sexes": ["male"], "age_group": "child", "label": "male_child"}, + {"sexes": ["female"], "age_group": "child", "label": "female_child"}, + {"sexes": ["male"], "age_group": "elderly", "label": "male_elderly"}, + {"sexes": ["female"], "age_group": "elderly", "label": "female_elderly"}, + {"sexes": ["male", "female"], "age_group": "elderly", "label": "elderly_total"}, + {"sexes": ["male", "female"], "age_group": "child", "label": "child_total"}, + {"sexes": ["male", "female"], "age_group": "active", "label": "active_total"}, + ] \ No newline at end of file diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index ff4a56f8..7bf01284 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -5,14 +5,20 @@ import tempfile from asyncio import subprocess from html.parser import HTMLParser +from concurrent.futures import ThreadPoolExecutor +from typing import List, Optional + import aiofiles import httpx - +import rasterio +from rasterio.windows import Window +import numpy as np from tqdm.asyncio import tqdm_asyncio from cbsurge.azure.blob_storage import AzureBlobStorageManager -from cbsurge.exposure.population.constants import AZ_ROOT_FILE_PATH +from cbsurge.exposure.population.constants import AZ_ROOT_FILE_PATH, WORLDPOP_AGE_MAPPING, DATA_YEAR, \ + AGESEX_STRUCTURE_COMBINATIONS, SEX_MAPPING logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logging.getLogger("azure").setLevel(logging.WARNING) @@ -64,8 +70,9 @@ def chunker_function(iterable, chunk_size=4): yield iterable[i:i + chunk_size] -async def get_available_data(country_code=None, year="2020"): +async def get_available_data(country_code=None, year=DATA_YEAR): """ + Return the Country Codes and IDs of the pages in worldpop where the Age-Sex structures are Args: country_code: The country code for which to fetch data year: (Not implemented) The year for which to fetch data @@ -186,8 +193,15 @@ async def process_single_file(file_url=None, storage_manager=None, country_code= try: async with httpx.AsyncClient() as client: + age = file_name.split("_")[2] + sex = SEX_MAPPING[file_name.split("_")[1].upper()] + + for age_group, age_range in WORLDPOP_AGE_MAPPING.items(): + if age_range[0] <= int(age) <= age_range[1]: + age = age_group + break cog_exists = await check_cog_exists(storage_manager=storage_manager, - blob_path=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{file_name}") + blob_path=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{sex}/{age}/{file_name}") if cog_exists and not force_reprocessing: logging.info("COG already exists in Azure, skipping upload: %s", file_name) return @@ -209,20 +223,21 @@ async def process_single_file(file_url=None, storage_manager=None, country_code= logging.info("Converting file to COG: %s", cog_path) await convert_to_cog(input_file=temp_file_path, output_file=cog_path) await validate_cog(file_path=cog_path) + if download_path: try: if not os.path.exists(download_path): logging.info("Download path does not exist. 
                         logging.info("Download path does not exist. Creating download path: %s", download_path)
                         os.makedirs(download_path, exist_ok=True)
                     logging.info("Copying COG file locally: %s", cog_path)
-                    os.makedirs(f"{download_path}/{year}/{country_code}", exist_ok=True)
-                    shutil.move(cog_path, f"{download_path}/{year}/{country_code}/{file_name}")
-                    logging.info("Successfully copied COG file: %s", f"{download_path}/{year}/{country_code}/{file_name}")
+                    os.makedirs(f"{download_path}/{year}/{country_code}/{sex}/{age}", exist_ok=True)
+                    shutil.move(cog_path, f"{download_path}/{year}/{country_code}/{sex}/{age}/{file_name}")
+                    logging.info("Successfully copied COG file: %s", f"{download_path}/{year}/{country_code}/{sex}/{age}/{file_name}")
                 except Exception as e:
                     raise Exception(f"Error copying COG file: {e}")
             else:
                 logging.info("Uploading COG file to Azure: %s", cog_path)
-                await storage_manager.upload_blob(file_path=cog_path, blob_name=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{file_name}")
+                await storage_manager.upload_blob(file_path=cog_path, blob_name=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{sex}/{age}/{file_name}")
 
             logging.info("Successfully processed file: %s", file_name)
@@ -246,8 +261,23 @@ async def get_links_from_table(data_id=None):
             return []
     return await extract_links_from_table(response.text)
 
-async def download_data(country_code=None, year="2020", force_reprocessing=False, download_path=None):
+async def download(country_code=None, year=DATA_YEAR, force_reprocessing=False, download_path=None, sex=None, age_group=None):
+    """
+    Download already-processed population data for a country.
+    Args:
+        country_code: The country code of the country intended to be downloaded
+        year: (Not implemented) The year for which to fetch data
+        force_reprocessing: Force re-download even if the data already exists locally
+        download_path: The local path to download the data to
+        sex: The sex to download data for (male or female)
+        age_group: The age group to download data for (child, active or elderly)
+
+    Returns:
+
+    """
+    # TODO: Should try to download data from azure, if the data is available
+    pass
+
+
+async def population_sync(country_code=None, year=DATA_YEAR, force_reprocessing=False, download_path=None):
     """
     Download all available data for a given country and year.
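+    Converts every WorldPop raster to a COG, organises the results under {sex}/{age_group}
+    prefixes, uploads them to Azure (or saves them under download_path), and finally
+    triggers process_aggregates() for each country.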
     Args:
@@ -263,13 +293,11 @@ async def download_data(country_code=None, year="2020", force_reprocessing=False
     available_data = await get_available_data(country_code=country_code, year=year)
     storage_manager = AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING"))
     for country_code, country_id in available_data.items():
-        # if country_code == "RUS":
-        #     continue
+        if country_code == "RUS":
+            continue
         logging.info("Processing country: %s", country_code)
         file_links = await get_links_from_table(data_id=country_id)
         for i, file_urls_chunk in enumerate(chunker_function(file_links, chunk_size=4)):
-            # if i > 0:
-            #     break
             logging.info("Processing chunk %d for country: %s", i + 1, country_code)
             # Create a fresh list of tasks for each file chunk
             tasks = [process_single_file(file_url=url,
@@ -284,10 +312,205 @@ async def download_data(country_code=None, year="2020", force_reprocessing=False
             for result in results:
                 if isinstance(result, Exception):
                     logging.error("Error processing file: %s", result)
-
+
+        logging.info("Data download complete for country: %s", country_code)
+        logging.info("Starting aggregate processing for country: %s", country_code)
+        await process_aggregates(country_code=country_code, download_path=download_path, force_reprocessing=force_reprocessing)
+        logging.info("Aggregate processing complete for country: %s", country_code)
     # Close the storage manager connection after all files have been processed
     logging.info("Closing storage manager after processing all files")
     await storage_manager.close()
 
+
+async def check_aggregates_exist(storage_manager=None, country_code=None):
+    """
+    Check if aggregate files exist for a given country code.
+    Args:
+        storage_manager: AzureBlobStorageManager instance
+        country_code: The country code to check for
+
+    Returns: True if all aggregate files exist, False otherwise
+
+    """
+    assert storage_manager is not None, "storage_manager is required"
+    assert country_code is not None, "country_code is required"
+    logging.info("Checking if aggregate files exist for country: %s", country_code)
+    for combo in AGESEX_STRUCTURE_COMBINATIONS:
+        blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{combo['label']}.tif"
+        cog_exists = await check_cog_exists(storage_manager=storage_manager, blob_path=blob_path)
+        if not cog_exists:
+            logging.info("Aggregate file does not exist: %s", blob_path)
+            return False
+    logging.info("All aggregate files exist for country: %s", country_code)
+    return True
+
+
+def create_sum(input_file_paths, output_file_path, block_size=(256, 256)):
+    """
+    Sum multiple raster files and save the result to an output file, processing in blocks.
+    If the data is not blocked, create blocks within the function.
+
+    Args:
+        input_file_paths (list of str): Paths to input raster files.
+        output_file_path (str): Path to save the summed raster file.
+        block_size (tuple): Tuple representing the block size (rows, cols). Default is (256, 256).
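+            Reading window-by-window keeps peak memory at roughly one block per input raster.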
+
+    Returns:
+        None
+    """
+    logging.info("Starting create_sum function")
+    logging.info("Input files: %s", input_file_paths)
+    logging.info("Output file: %s", output_file_path)
+    logging.info("Block size: %s", block_size)
+
+    # Open all input files
+    try:
+        datasets = [rasterio.open(file_path) for file_path in input_file_paths]
+    except Exception as e:
+        logging.error("Error opening input files: %s", e)
+        return
+
+    logging.info("Successfully opened input raster files")
+
+    # Use the first dataset as reference for metadata
+    ref_meta = datasets[0].meta.copy()
+    ref_meta.update(count=1, dtype="float32", nodata=0, compress="ZSTD")
+
+    rows, cols = datasets[0].shape
+
+    logging.info("Raster dimensions: %d rows x %d cols", rows, cols)
+
+    # Create the output file
+    try:
+        with rasterio.open(output_file_path, "w", **ref_meta) as dst:
+            logging.info("Output file created successfully")
+
+            # Process raster in blocks
+            for i in range(0, rows, block_size[0]):
+                for j in range(0, cols, block_size[1]):
+                    window = Window(j, i, min(block_size[1], cols - j), min(block_size[0], rows - i))
+                    logging.info("Processing block: row %d to %d, col %d to %d", i, i + block_size[0], j,
+                                 j + block_size[1])
+
+                    output_data = np.zeros((window.height, window.width), dtype=np.float32)
+
+                    for idx, src in enumerate(datasets):
+                        try:
+                            input_data = src.read(1, window=window)
+                            # Treat nodata pixels as zero so they do not contaminate the sum
+                            input_data = np.where(input_data == src.nodata, 0, input_data)
+                            output_data += input_data
+                            logging.debug("Added data from raster %d", idx + 1)
+                        except Exception as e:
+                            logging.error("Error reading block from raster %d: %s", idx + 1, e)
+
+                    dst.write(output_data, window=window, indexes=1)
+        logging.info("Finished processing all blocks")
+    except Exception as e:
+        logging.error("Error creating or writing to output file: %s", e)
+    finally:
+        # Close all input datasets
+        for src in datasets:
+            src.close()
+        logging.info("Closed all input raster files")
+
+    logging.info("create_sum function completed successfully")
+
+
+async def process_aggregates(country_code: str, sex: Optional[str] = None, age_group: Optional[str] = None, year=DATA_YEAR, download_path=None, force_reprocessing=False):
+    """
+    Process aggregate files for combinations of sex and age groups, or for the specific arguments passed.
+    Args:
+        country_code (Optional[str]): Country code to process. If not provided, every country returned by get_available_data() is processed.
+        sex (Optional[str]): Sex to process (male or female).
+        age_group (Optional[str]): Age group to process (child, active, elderly).
+        year (Optional[str]): (Not implemented) The year for which the data should be produced.
+        download_path (Optional[str]): Local path to download the COG files to. If provided, the files will not be uploaded to Azure.
+        force_reprocessing (Optional[bool]): Force reprocessing of the files even if they already exist in Azure.
+    """
+    async with AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")) as storage_manager:
+
+        if not country_code:
+            country_codes = list((await get_available_data()).keys())
+        else:
+            country_codes = [country_code]
+        for c_code in country_codes:
+            logging.info("Processing aggregate files for country: %s", c_code)
+
+            async def process_group(sexes: List[str] = None, age_grouping: Optional[str] = None, output_label: str = None):
+                """
+                Processes and sums files for specified sexes and age groups.
+                Args:
+                    sexes (List[str]): List of sexes (e.g., ['male'], ['female'], or ['male', 'female']).
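+                        Each sex maps to the blob prefix worldpop/<year>/<country>/<sex>/, mirroring
+                        the layout written by process_single_file.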
+                    age_grouping (Optional[str]): Age group to process.
+                    output_label (str): Label for the output file.
+                """
+                # Construct paths
+                paths = [f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{c_code}/{s}" for s in sexes]
+                if age_grouping:
+                    assert age_grouping in WORLDPOP_AGE_MAPPING, "Invalid age group provided"
+                    paths = [f"{path}/{age_grouping}/" for path in paths]
+
+                # Fetch blobs
+                blobs = []
+                for path in paths:
+                    blobs += await storage_manager.list_blobs(path)
+                if not blobs:
+                    logging.warning("No blobs found for paths: %s", paths)
+                    return
+
+                # Download blobs and sum them
+                with tempfile.TemporaryDirectory() as temp_dir:
+                    local_files = []
+
+                    # download blobs concurrently
+                    async def create_local_files(blob):
+                        local_file = os.path.join(temp_dir, os.path.basename(blob))
+                        await storage_manager.download_blob(blob, temp_dir)
+                        local_files.append(local_file)
+                    tasks = [create_local_files(blob) for blob in blobs]
+                    await asyncio.gather(*tasks)
+
+                    output_file = f"{temp_dir}/{output_label}.tif"
+                    with ThreadPoolExecutor() as executor:
+                        # .result() blocks until the sum is written and re-raises any worker error
+                        executor.submit(create_sum, input_file_paths=local_files, output_file_path=output_file, block_size=(1028, 1028)).result()
+
+                    # Upload the result
+                    blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{c_code}/aggregate/{c_code}_{output_label}.tif"
+                    if download_path:
+                        os.makedirs(f"{download_path}/{DATA_YEAR}/{c_code}/aggregate", exist_ok=True)
+                        shutil.copy2(output_file, f"{download_path}/{DATA_YEAR}/{c_code}/aggregate/{c_code}_{output_label}.tif")
+                    else:
+                        await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path)
+                        logging.info("Processed and uploaded: %s", blob_path)
+
+            if not force_reprocessing and await check_aggregates_exist(storage_manager=storage_manager, country_code=c_code):
+                logging.info("Aggregate files already exist for country: %s", c_code)
+                continue
+
+            # Dynamic argument-based processing
+            if sex and age_group:
+                label = f"{sex}_{age_group}"
+                logging.info("Processing for sex '%s' and age group '%s'", sex, age_group)
+                await process_group(sexes=[sex], age_grouping=age_group, output_label=label)
+            elif sex:
+                label = f"{sex}_total"
+                logging.info("Processing for sex '%s' (all age groups)", sex)
+                await process_group(sexes=[sex], output_label=label)
+            elif age_group:
+                label = f"{age_group}_total"
+                logging.info("Processing for age group '%s' (both sexes)", age_group)
+                await process_group(sexes=['male', 'female'], age_grouping=age_group, output_label=label)
+            else:
+                # Process predefined combinations
+                logging.info("Processing all predefined combinations...")
+                for combo in AGESEX_STRUCTURE_COMBINATIONS:
+                    logging.info("Processing %s...", combo["label"])
+                    await process_group(sexes=combo["sexes"], age_grouping=combo["age_group"], output_label=combo["label"])
+            logging.info("All processing complete for country: %s", c_code)
+
 
 if __name__ == "__main__":
-    asyncio.run(download_data(force_reprocessing=False))
\ No newline at end of file
+    pass
\ No newline at end of file