From b9709c592b8ced520f01573eb96d1aba8648fe40 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Fri, 13 Dec 2024 11:29:58 +0300 Subject: [PATCH 01/24] fix: cleanup and aggregation --- cbsurge/azure.py | 142 ----------------------- cbsurge/azure/blob_storage.py | 54 ++++++--- cbsurge/exposure/population/__init__.py | 12 +- cbsurge/exposure/population/constants.py | 21 +--- cbsurge/exposure/population/worldpop.py | 85 ++++++++++++-- cbsurge/fetch_data.py | 47 -------- 6 files changed, 131 insertions(+), 230 deletions(-) delete mode 100644 cbsurge/azure.py delete mode 100644 cbsurge/fetch_data.py diff --git a/cbsurge/azure.py b/cbsurge/azure.py deleted file mode 100644 index 621dc257..00000000 --- a/cbsurge/azure.py +++ /dev/null @@ -1,142 +0,0 @@ -import logging -import os - -import aiofiles -from azure.storage.blob.aio import BlobServiceClient -from azure.storage.fileshare.aio import ShareServiceClient -from rasterio.rio.blocks import blocks - -from tqdm.asyncio import tqdm - -CONTAINER_NAME = "stacdata" -FILE_SHARE_NAME = "cbrapida" - -class AzStorageManager: - - def __aenter__(self): - return self - - def __aexit__(self, exc_type, exc_val, exc_tb): - return self.close() - - def __init__(self, conn_str): - logging.info("Initializing Azure Storage Manager") - self.conn_str = conn_str - self.blob_service_client = BlobServiceClient.from_connection_string(conn_str) - self.share_service_client = ShareServiceClient.from_connection_string(conn_str) - - - async def upload_blob(self, file_path=None, blob_name=None): - """ - Upload a file to Azure Blob Storage. - Args: - file_path: (str) The path to the file to upload - blob_name: (str) The name of the blob to create - - Returns: - - """ - - container_client = self.blob_service_client.get_container_client(CONTAINER_NAME) - logging.info("Starting upload for blob: %s", blob_name) - blob_client = container_client.get_blob_client(blob_name) - - # Get the file size for progress tracking - file_size = os.path.getsize(file_path) - - async def __progress__(current, total): - tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current) - - async with aiofiles.open(file_path, "rb") as data: - await blob_client.upload_blob(data, overwrite=True, max_concurrency=4, blob_type="BlockBlob", length=file_size, progress_hook=__progress__) - - logging.info("Upload completed for blob: %s", blob_name) - return blob_client.url, await container_client.close() - - - - async def upload_files(self, local_directory=None, azure_directory=None): - """ - Upload multiple files to Azure Blob Storage. 
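Note: this deleted hook, like the rewritten one kept in blob_storage.py further down, constructs a brand-new tqdm bar on every callback and passes it the cumulative byte count, so the displayed progress never accumulates correctly. A minimal corrected sketch, assuming the azure-storage-blob aio clients invoke the hook with (current, total) exactly as the code here does; make_progress_hook is an illustrative helper, not part of this patch series:

from tqdm.asyncio import tqdm

def make_progress_hook(desc):
    # One bar per transfer, reused across callbacks.
    bar = tqdm(total=None, unit="B", unit_scale=True, desc=desc)
    state = {"last": 0}

    async def hook(current, total):
        # `current` is cumulative, so feed tqdm the delta since the last call.
        bar.total = total
        bar.update(current - state["last"])
        state["last"] = current

    return hook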
- Args: - local_directory: (str) The local directory containing the files to upload - azure_directory: (str) The directory in Azure Blob Storage to upload the files to - - Returns: - - """ - - - container_client = self.blob_service_client.get_container_client(CONTAINER_NAME) - for root, _, files in os.walk(local_directory): - for file in files: - async def __progress__(current, total): - tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current) - local_file_path = os.path.join(root, file) - blob_name = os.path.join(azure_directory, file) - logging.info(f"Uploading {local_file_path} as {blob_name}...") - async with aiofiles.open(local_file_path, "rb") as data: - await container_client.upload_blob(name=blob_name, data=data, overwrite=True, max_concurrency=4, progress_hook=__progress__) - logging.info("Upload completed for all files") - return await container_client.close() - - - - async def upload_to_fileshare(self, local_file=None, file_name=None): - """ - Upload a file to Azure File Share. - Args: - file_name: - - Returns: - - """ - - async def __progress__(current, total): - tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {file_name}").update(current) - share_client = self.share_service_client.get_share_client("cbrapida") - file_client = share_client.get_file_client(file_name) - logging.info("Starting upload for file: %s", file_name) - async with aiofiles.open(file_path, "rb") as data: - await file_client.upload_file(data, max_concurrency=4, length=os.path.getsize(file_path), progress_hook=__progress__) - logging.info("Upload completed for file: %s", file_name) - return file_client.url, await share_client.close() - - async def download_blob(self, blob_name=None, save_as=None): - """ - Download a blob from Azure Blob Storage. - Args: - save_as: (str) The path to save the downloaded blob - blob_name: (str) The name of the blob to download - - Returns: - - """ - container_client = self.blob_service_client.get_container_client(CONTAINER_NAME) - blob_client = container_client.get_blob_client(blob_name) - logging.info("Starting download for blob: %s", blob_name) - async with aiofiles.open(save_as, "wb") as data: - blob = await blob_client.download_blob() - await data.write(await blob.readall()) - logging.info("Download completed for blob: %s", blob_name) - - async def download_from_fileshare(self, file_name=None, save_as=None): - """ - Download a file from Azure File Share. 
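Note: the deleted upload_to_fileshare above also referenced an undefined file_path even though its signature took local_file, so it could never have run. A corrected sketch of what the removed method appears to have intended, recorded here only for reference (names as in the deleted code):

async def upload_to_fileshare(self, local_file=None, file_name=None):
    share_client = self.share_service_client.get_share_client(FILE_SHARE_NAME)
    file_client = share_client.get_file_client(file_name)
    async with aiofiles.open(local_file, "rb") as data:
        await file_client.upload_file(data, max_concurrency=4,
                                      length=os.path.getsize(local_file))
    return file_client.url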
- Args: - file_name: (str) The name of the file to download - save_as: (str) The path to save the downloaded file - """ - share_client = self.share_service_client.get_share_client(FILE_SHARE_NAME) - file_client = share_client.get_file_client(file_name) - logging.info("Starting download for file: %s", file_name) - async with aiofiles.open(save_as, "wb") as data: - file = await file_client.download_file() - await data.write(await file.readall()) - logging.info("Download completed for file: %s", file_name) - - - async def close(self): - logging.info("Closing Azure Storage Manager") - await self.blob_service_client.close() - await self.share_service_client.close() \ No newline at end of file diff --git a/cbsurge/azure/blob_storage.py b/cbsurge/azure/blob_storage.py index cd379548..58bf815c 100644 --- a/cbsurge/azure/blob_storage.py +++ b/cbsurge/azure/blob_storage.py @@ -1,11 +1,13 @@ +import asyncio import logging import os import aiofiles from azure.storage.blob.aio import ContainerClient -from tqdm.asyncio import tqdm +from tqdm.asyncio import tqdm_asyncio, tqdm from cbsurge.constants import AZURE_BLOB_CONTAINER_NAME +from cbsurge.exposure.population.constants import WORLDPOP_AGE_MAPPING class AzureBlobStorageManager: @@ -19,17 +21,19 @@ class AzureBlobStorageManager: await az.download_blob(blob_name, local_directory) await az.download_files(azure_directory, local_directory) """ - def __aenter__(self): - return self - - def __aexit__(self, exc_type, exc_val, exc_tb): - return self.close() - def __init__(self, conn_str): logging.info("Initializing Azure Blob Storage Manager") self.conn_str = conn_str self.container_client = ContainerClient.from_connection_string(conn_str=conn_str, container_name=AZURE_BLOB_CONTAINER_NAME) + async def __aenter__(self): + self.container_client = ContainerClient.from_connection_string(conn_str=self.conn_str, container_name=AZURE_BLOB_CONTAINER_NAME) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self.container_client: + return await self.close() + async def upload_blob(self, file_path=None, blob_name=None): """ Upload a file to Azure Blob Storage. @@ -47,7 +51,7 @@ async def upload_blob(self, file_path=None, blob_name=None): file_size = os.path.getsize(file_path) async def __progress__(current, total): - tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current) + tqdm_asyncio(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current) async with aiofiles.open(file_path, "rb") as data: await blob_client.upload_blob(data, overwrite=True, max_concurrency=4, blob_type="BlockBlob", length=file_size, progress_hook=__progress__) @@ -69,20 +73,24 @@ async def upload_files(self, local_directory=None, azure_directory=None): for root, _, files in os.walk(local_directory): for file in files: async def __progress__(current, total): - tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current) + tqdm_asyncio(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current) file_path = os.path.join(root, file) blob_name = f"{azure_directory}/{file}" await self.upload_blob(file_path=file_path, blob_name=blob_name) return + + async def download_blob(self, blob_name=None, local_directory=None): """ Download a blob from Azure Blob Storage. + Args: blob_name: (str) The name of the blob to download. local_directory: (str) The local path to save the downloaded blob. If not provided, the blob will be saved in the current working directory. 
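As refactored here, __init__ builds a ContainerClient and __aenter__ immediately builds a second one, so the first client is replaced without being closed whenever the manager is entered as a context manager. A hedged variant that defers creation to __aenter__ (illustrative only, not what this patch does):

from azure.storage.blob.aio import ContainerClient
from cbsurge.constants import AZURE_BLOB_CONTAINER_NAME

class AzureBlobStorageManager:
    def __init__(self, conn_str):
        self.conn_str = conn_str
        self.container_client = None  # created lazily in __aenter__

    async def __aenter__(self):
        self.container_client = ContainerClient.from_connection_string(
            conn_str=self.conn_str, container_name=AZURE_BLOB_CONTAINER_NAME)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.container_client:
            await self.container_client.close()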
- Returns: + Returns: + str: The path to the downloaded file. """ logging.info("Downloading blob: %s", blob_name) blob_client = self.container_client.get_blob_client(blob=blob_name) @@ -90,9 +98,17 @@ async def download_blob(self, blob_name=None, local_directory=None): if local_directory: os.makedirs(local_directory, exist_ok=True) file_name = f"{local_directory}/{file_name}" + + blob_properties = await blob_client.get_blob_properties() + total_size = blob_properties.size + async with aiofiles.open(file_name, "wb") as data: blob = await blob_client.download_blob() - await data.write(await blob.readall()) + progress = tqdm_asyncio(total=total_size, unit="B", unit_scale=True, desc=file_name) + async for chunk in blob.chunks(): + await data.write(chunk) + progress.update(len(chunk)) + progress.close() logging.info("Download completed for blob: %s", blob_name) return file_name @@ -106,10 +122,21 @@ async def download_files(self, azure_directory=None, local_directory=None): Returns: """ - async for blob in self.container_client.list_blobs(name_starts_with=azure_directory): - await self.download_blob(blob_name=blob.name, local_directory=local_directory) + blobs = await self.list_blobs(prefix=azure_directory) + for blob in blobs: + await self.download_blob(blob_name=blob, local_directory=local_directory) return + async def list_blobs(self, prefix=None): + """ + List blobs in the Azure Blob Storage container. + Args: + prefix: (str) The prefix to filter blobs by. + + Returns: + """ + return [blob.name async for blob in self.container_client.list_blobs(name_starts_with=prefix)] + async def close(self): """ @@ -118,4 +145,3 @@ async def close(self): """ logging.info("Closing Azure Blob Storage Manager") return await self.container_client.close() - diff --git a/cbsurge/exposure/population/__init__.py b/cbsurge/exposure/population/__init__.py index 7cda60e5..25743bf1 100644 --- a/cbsurge/exposure/population/__init__.py +++ b/cbsurge/exposure/population/__init__.py @@ -2,7 +2,7 @@ import click -from cbsurge.exposure.population.worldpop import download_data +from cbsurge.exposure.population.worldpop import download_data, process_aggregates @click.group() @@ -16,4 +16,12 @@ def population(): @click.option('--country', help='The ISO3 code of the country to process the data for') @click.option('--download-path', help='Download data locally', required=False) def run_download(force_reprocessing, country, download_path): - asyncio.run(download_data(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path)) \ No newline at end of file + asyncio.run(download_data(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path)) + + +@population.command() +@click.option('--country', help='The ISO3 code of the country to process the data for') +@click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly'])) +@click.option('--sex', help='Path to the downloaded data', type=click.Choice(['M', 'F'])) +def run_aggregate(country, age_group, sex): + asyncio.run(process_aggregates(country_code=country, age_group=age_group, sex=sex)) \ No newline at end of file diff --git a/cbsurge/exposure/population/constants.py b/cbsurge/exposure/population/constants.py index a86bb348..25b49d91 100644 --- a/cbsurge/exposure/population/constants.py +++ b/cbsurge/exposure/population/constants.py @@ -3,22 +3,7 @@ AZ_ROOT_FILE_PATH = "worldpop" CONTAINER_NAME = "stacdata" WORLDPOP_AGE_MAPPING = { - "0-12": 0, - "1-4": 1, - 
"5-9": 5, - "10-14": 10, - "15-19": 15, - "20-24": 20, - "25-29": 25, - "30-34": 30, - "35-39": 35, - "40-44": 40, - "45-49": 45, - "50-54": 50, - "55-59": 55, - "60-64": 60, - "65-69": 65, - "70-74": 70, - "75-79": 75, - "80+": 80 + "child": [0, 14], + "active": [15, 64], + "elderly": [65, 100], } diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index ff4a56f8..90c9b5cc 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -5,14 +5,16 @@ import tempfile from asyncio import subprocess from html.parser import HTMLParser +from concurrent.futures import ThreadPoolExecutor import aiofiles import httpx - +import rasterio +import numpy as np from tqdm.asyncio import tqdm_asyncio from cbsurge.azure.blob_storage import AzureBlobStorageManager -from cbsurge.exposure.population.constants import AZ_ROOT_FILE_PATH +from cbsurge.exposure.population.constants import AZ_ROOT_FILE_PATH, WORLDPOP_AGE_MAPPING logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logging.getLogger("azure").setLevel(logging.WARNING) @@ -186,8 +188,14 @@ async def process_single_file(file_url=None, storage_manager=None, country_code= try: async with httpx.AsyncClient() as client: + age = file_name.split("_")[2] + sex = file_name.split("_")[1] + for age_group, age_range in WORLDPOP_AGE_MAPPING.items(): + if age_range[0] <= int(age) <= age_range[1]: + age = age_group + break cog_exists = await check_cog_exists(storage_manager=storage_manager, - blob_path=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{file_name}") + blob_path=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{sex.upper()}/{age}/{file_name}") if cog_exists and not force_reprocessing: logging.info("COG already exists in Azure, skipping upload: %s", file_name) return @@ -209,20 +217,21 @@ async def process_single_file(file_url=None, storage_manager=None, country_code= logging.info("Converting file to COG: %s", cog_path) await convert_to_cog(input_file=temp_file_path, output_file=cog_path) await validate_cog(file_path=cog_path) + if download_path: try: if not os.path.exists(download_path): logging.info("Download path does not exist. 
Creating download path: %s", download_path) os.makedirs(download_path, exist_ok=True) logging.info("Copying COG file locally: %s", cog_path) - os.makedirs(f"{download_path}/{year}/{country_code}", exist_ok=True) - shutil.move(cog_path, f"{download_path}/{year}/{country_code}/{file_name}") - logging.info("Successfully copied COG file: %s", f"{download_path}/{year}/{country_code}/{file_name}") + os.makedirs(f"{download_path}/{year}/{country_code}/{sex.upper()}/{age}", exist_ok=True) + shutil.move(cog_path, f"{download_path}/{year}/{country_code}/{sex.upper()}/{age}/{file_name}") + logging.info("Successfully copied COG file: %s", f"{download_path}/{year}/{country_code}/{sex.upper()}/{age}/{file_name}") except Exception as e: raise Exception(f"Error copying COG file: {e}") else: logging.info("Uploading COG file to Azure: %s", cog_path) - await storage_manager.upload_blob(file_path=cog_path, blob_name=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{file_name}") + await storage_manager.upload_blob(file_path=cog_path, blob_name=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{sex.upper()}/{age}/{file_name}") logging.info("Successfully processed file: %s", file_name) @@ -289,5 +298,67 @@ async def download_data(country_code=None, year="2020", force_reprocessing=False logging.info("Closing storage manager after processing all files") await storage_manager.close() + + +def create_sum(input_file_paths, output_file_path): + """ + Sum multiple raster files and save the result to an output file, processing in blocks. + + Args: + input_file_paths (list of str): Paths to input raster files. + output_file_path (str): Path to save the summed raster file. + + Returns: + None + """ + datasets = [rasterio.open(file_path) for file_path in input_file_paths] + + # Use the first dataset as reference for metadata + ref_meta = datasets[0].meta.copy() + ref_meta.update(dtype="float32", count=1, nodata=0) + + # Create the output dataset + with rasterio.open(output_file_path, "w", **ref_meta) as dst: + for ji, window in dst.block_windows(1): + output_data = np.zeros((window.height, window.width), dtype=np.float32) + for src in datasets: + input_data = src.read(1, window=window) + input_data = np.where(input_data == src.nodata, 0, input_data) + output_data += input_data + + # Write the summed block to the output file + dst.write(output_data, window=window, indexes=1) + + for src in datasets: + src.close() + + +async def process_aggregates(country_code=None, age_group=None, sex=None): + """ + Process the aggregate files. 
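The sex/age path layout introduced above hangs on parsing the WorldPop file name. A small worked example of that classification, assuming the "{iso3}_{sex}_{age}_{year}_constrained.tif" naming seen elsewhere in this series (classify is an illustrative helper, not part of the patch):

WORLDPOP_AGE_MAPPING = {"child": [0, 14], "active": [15, 64], "elderly": [65, 100]}

def classify(file_name):
    parts = file_name.split("_")      # e.g. BDI_m_0_2020_constrained.tif
    sex, age = parts[1], int(parts[2])
    for group, (low, high) in WORLDPOP_AGE_MAPPING.items():
        if low <= age <= high:
            return sex.upper(), group
    return sex.upper(), str(age)      # no bucket matched; keep the raw age

# classify("BDI_m_0_2020_constrained.tif")  -> ("M", "child")
# classify("BDI_f_80_2020_constrained.tif") -> ("F", "elderly")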
+ Args: + country_code: The country code for which to process the data + age_group: The age group to process the data + sex: The sex to process the data for + """ + assert sex or age_group, "Either age or sex must be provided" + assert age_group in WORLDPOP_AGE_MAPPING, "Invalid age group provided" + async with AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")) as storage_manager: + logging.info("Processing aggregate files for country: %s", country_code) + if age_group: + # process the specified age group for both male and female + male_blobs = await storage_manager.list_blobs(f"{AZ_ROOT_FILE_PATH}/2020/{country_code}/M/{age_group}") + female_blobs = await storage_manager.list_blobs(f"{AZ_ROOT_FILE_PATH}/2020/{country_code}/F/{age_group}") + # print(blobs) + dataset_lists = [] + with tempfile.TemporaryDirectory(delete=False) as temp_dir: + for blob in male_blobs: + await storage_manager.download_blob(blob_name=blob, local_directory=temp_dir) + dataset_lists.append(f"{temp_dir}/{blob.split('/')[-1]}") + with ThreadPoolExecutor() as executor: + executor.submit(create_sum, dataset_lists, f"{temp_dir}/{country_code}_{age_group}_M.tif") + shutil.move(f"{temp_dir}/{country_code}_{age_group}_M.tif", f"data/{country_code}_{age_group}_M.tif") + pass + if __name__ == "__main__": asyncio.run(download_data(force_reprocessing=False)) \ No newline at end of file diff --git a/cbsurge/fetch_data.py b/cbsurge/fetch_data.py deleted file mode 100644 index 49bd66cf..00000000 --- a/cbsurge/fetch_data.py +++ /dev/null @@ -1,47 +0,0 @@ -# fetch data from azure. if data is not found, download it from the source - -import logging - -import httpx - -class WorldPopFetcher: - def __init__(self): - self.file_name = None - self.year = 2020 - self.country = None - self.azure_root_url = "https://undpgeohub.blob.core.windows.net/stacdata/worldpop" - self.worldpop_root_url = "https://hub.worldpop.org/geodata" - - def __construct_url__(self): - """ - Construct the URL to fetch the data from. - Returns: - """ - assert self.file_name or (self.year and self.country), "Either file_name or year and country must be provided" - - if not self.file_name: - return f"{self.worldpop_root_url}/{self.year}/{self.country}/" - return f"{self.azure_root_url}/{self.year}/{self.country}/{self.file_name}" - - - def fetch_data(self, file_name=None, url=None): - """ - Fetch data from Azure Blob Storage. If data is not found, download it from the source. - Returns: - """ - assert file_name or url, "Either file_name or url must be provided" - - if not file_name: - file_name = self.file_name - logging.info("Fetching data from Azure Blob Storage") - return file_name - - - -def main(): - worldpop_fetcher = WorldPopFetcher() - file = worldpop_fetcher.fetch_data(file_name="file_name") - print(file) - -if __name__ == "__main__": - main() \ No newline at end of file From 3d83c934e47e8415fb33811d161dedc78f8915ef Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Fri, 13 Dec 2024 21:18:11 +0300 Subject: [PATCH 02/24] WIP: process specific age and sex groups: --- cbsurge/exposure/population/worldpop.py | 56 ++++++++++++++++++------- 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 90c9b5cc..9552b392 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -337,28 +337,52 @@ async def process_aggregates(country_code=None, age_group=None, sex=None): """ Process the aggregate files. 
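One caveat on the implementation above: executor.submit() inside a with ThreadPoolExecutor() block leaves the event loop stuck in the pool's blocking shutdown until create_sum finishes, and tempfile.TemporaryDirectory(delete=False) only exists on Python 3.12+ (a later patch in this series drops it). A hedged, loop-friendly alternative for the summation step, assuming Python 3.9+ for asyncio.to_thread:

import asyncio

async def sum_in_thread(dataset_files, output_file):
    # create_sum is synchronous and CPU/IO bound; run it off the event loop
    # and await the result instead of joining a thread pool synchronously.
    await asyncio.to_thread(create_sum, dataset_files, output_file)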
Args: - country_code: The country code for which to process the data - age_group: The age group to process the data - sex: The sex to process the data for + country_code: The country code for which to process the data. + age_group: The age group to process the data. + sex: The sex to process the data for. """ + assert country_code, "Country code must be provided" assert sex or age_group, "Either age or sex must be provided" - assert age_group in WORLDPOP_AGE_MAPPING, "Invalid age group provided" + async with AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")) as storage_manager: logging.info("Processing aggregate files for country: %s", country_code) - if age_group: - # process the specified age group for both male and female - male_blobs = await storage_manager.list_blobs(f"{AZ_ROOT_FILE_PATH}/2020/{country_code}/M/{age_group}") - female_blobs = await storage_manager.list_blobs(f"{AZ_ROOT_FILE_PATH}/2020/{country_code}/F/{age_group}") - # print(blobs) - dataset_lists = [] + + async def process_group(sex: str, age_group: str = None): + """ + Process the files for a specific sex (M/F) and optionally an age group. + """ + path = f"{AZ_ROOT_FILE_PATH}/2020/{country_code}/{sex}/" + if age_group: + assert age_group in WORLDPOP_AGE_MAPPING, "Invalid age group provided" + path += f"{age_group}" + + blobs = await storage_manager.list_blobs(path) + if not blobs: + logging.warning("No blobs found for path: %s", path) + return + + dataset_files = [] with tempfile.TemporaryDirectory(delete=False) as temp_dir: - for blob in male_blobs: + for blob in blobs: await storage_manager.download_blob(blob_name=blob, local_directory=temp_dir) - dataset_lists.append(f"{temp_dir}/{blob.split('/')[-1]}") + dataset_files.append(f"{temp_dir}/{blob.split('/')[-1]}") + + output_file = f"{temp_dir}/{country_code}_{age_group or 'ALL'}_{sex}.tif" with ThreadPoolExecutor() as executor: - executor.submit(create_sum, dataset_lists, f"{temp_dir}/{country_code}_{age_group}_M.tif") - shutil.move(f"{temp_dir}/{country_code}_{age_group}_M.tif", f"data/{country_code}_{age_group}_M.tif") - pass + executor.submit(create_sum, dataset_files, output_file) + # TODO: Upload the output file to Azure Blob Storage `aggregate` folder + shutil.move(output_file, f"data/{country_code}_{age_group or 'ALL'}_{sex}.tif") + + if sex and age_group: + await process_group(sex, age_group) + elif age_group: + await process_group('M', age_group) + await process_group('F', age_group) + elif sex: + await process_group(sex) + if __name__ == "__main__": - asyncio.run(download_data(force_reprocessing=False)) \ No newline at end of file + # asyncio.run(download_data(force_reprocessing=False)) + + create_sum(["/media/thuha/Data/worldpop_data/m/0-12/BDI_m_0_2020_constrained.tif", "/media/thuha/Data/worldpop_data/f/0-12/BDI_f_0_2020_constrained.tif"], "data/BDI_0-12.tif") \ No newline at end of file From dfb2b79aa52a0ccc37ddbe804d8421f14e2d4615 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Tue, 17 Dec 2024 16:45:21 +0300 Subject: [PATCH 03/24] run aggregations --- cbsurge/azure/blob_storage.py | 16 +++ cbsurge/azure/fileshare.py | 16 +++ cbsurge/exposure/population/constants.py | 2 + cbsurge/exposure/population/worldpop.py | 155 +++++++++++++++++------ 4 files changed, 149 insertions(+), 40 deletions(-) diff --git a/cbsurge/azure/blob_storage.py b/cbsurge/azure/blob_storage.py index 58bf815c..df3ba98a 100644 --- a/cbsurge/azure/blob_storage.py +++ b/cbsurge/azure/blob_storage.py @@ -138,6 +138,22 @@ async def list_blobs(self, prefix=None): 
return [blob.name async for blob in self.container_client.list_blobs(name_starts_with=prefix)] + async def copy_file(self, source_blob=None, destination_blob=None): + """ + Copy a file from one blob to another. + Args: + source_blob: (str) The name of the source blob to copy. + destination_blob: (str) The name of the destination blob to copy to. + + Returns: + + """ + logging.info("Copying blob: %s to %s", source_blob, destination_blob) + source_blob_client = self.container_client.get_blob_client(blob=source_blob) + destination_blob_client = self.container_client.get_blob_client(blob=destination_blob) + await destination_blob_client.start_copy_from_url(source_blob_client.url) + return destination_blob_client.url + async def close(self): """ Close the Azure Blob Storage Manager. diff --git a/cbsurge/azure/fileshare.py b/cbsurge/azure/fileshare.py index a70606c8..d193cb00 100644 --- a/cbsurge/azure/fileshare.py +++ b/cbsurge/azure/fileshare.py @@ -138,3 +138,19 @@ async def download_file(self, file_name, download_path): progress_bar.close() return file_name + async def copy_file(self, source_file, destination_file): + """ + Copy a file from one location to another in the Azure File Share. + Args: + source_file: The file to copy. + destination_file: The destination file. + + Returns: + + """ + source_file_client = self.share_client.get_file_client(source_file) + destination_file_client = self.share_client.get_file_client(destination_file) + + await destination_file_client.start_copy_from_url(source_file_client.url) + return destination_file + diff --git a/cbsurge/exposure/population/constants.py b/cbsurge/exposure/population/constants.py index 25b49d91..189eb5da 100644 --- a/cbsurge/exposure/population/constants.py +++ b/cbsurge/exposure/population/constants.py @@ -7,3 +7,5 @@ "active": [15, 64], "elderly": [65, 100], } +WORLDPOP_SEXES = ["M", "F"] +DATA_YEAR = 2020 \ No newline at end of file diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 9552b392..1f05eaee 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -6,15 +6,17 @@ from asyncio import subprocess from html.parser import HTMLParser from concurrent.futures import ThreadPoolExecutor +from typing import List, Optional import aiofiles import httpx import rasterio +from rasterio.windows import Window import numpy as np from tqdm.asyncio import tqdm_asyncio from cbsurge.azure.blob_storage import AzureBlobStorageManager -from cbsurge.exposure.population.constants import AZ_ROOT_FILE_PATH, WORLDPOP_AGE_MAPPING +from cbsurge.exposure.population.constants import AZ_ROOT_FILE_PATH, WORLDPOP_AGE_MAPPING, DATA_YEAR logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logging.getLogger("azure").setLevel(logging.WARNING) @@ -66,7 +68,7 @@ def chunker_function(iterable, chunk_size=4): yield iterable[i:i + chunk_size] -async def get_available_data(country_code=None, year="2020"): +async def get_available_data(country_code=None, year=DATA_YEAR): """ Args: country_code: The country code for which to fetch data @@ -256,7 +258,7 @@ async def get_links_from_table(data_id=None): return await extract_links_from_table(response.text) -async def download_data(country_code=None, year="2020", force_reprocessing=False, download_path=None): +async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=False, download_path=None): """ Download all available data for a given country and year. 
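Worth noting for both copy_file helpers added in this patch: start_copy_from_url only initiates a server-side copy, which Azure may complete asynchronously for large objects. A hedged sketch of waiting for completion on the blob side (wait_for_copy is illustrative and not part of the patch; the copy.status values come from the SDK's blob properties):

import asyncio

async def wait_for_copy(blob_client, poll_seconds=1):
    props = await blob_client.get_blob_properties()
    while props.copy.status == "pending":
        await asyncio.sleep(poll_seconds)
        props = await blob_client.get_blob_properties()
    return props.copy.status  # "success", "aborted", or "failed"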
Args: @@ -299,47 +301,85 @@ async def download_data(country_code=None, year="2020", force_reprocessing=False await storage_manager.close() - -def create_sum(input_file_paths, output_file_path): +def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): """ Sum multiple raster files and save the result to an output file, processing in blocks. + If the data is not blocked, create blocks within the function. Args: input_file_paths (list of str): Paths to input raster files. output_file_path (str): Path to save the summed raster file. + block_size (tuple): Tuple representing the block size (rows, cols). Default is (256, 256). Returns: None """ - datasets = [rasterio.open(file_path) for file_path in input_file_paths] + logging.info("Starting create_sum function") + logging.info("Input files: %s", input_file_paths) + logging.info("Output file: %s", output_file_path) + logging.info("Block size: %s", block_size) + + # Open all input files + try: + datasets = [rasterio.open(file_path) for file_path in input_file_paths] + except Exception as e: + logging.error("Error opening input files: %s", e) + return + + logging.info("Successfully opened input raster files") # Use the first dataset as reference for metadata ref_meta = datasets[0].meta.copy() ref_meta.update(dtype="float32", count=1, nodata=0) - # Create the output dataset - with rasterio.open(output_file_path, "w", **ref_meta) as dst: - for ji, window in dst.block_windows(1): - output_data = np.zeros((window.height, window.width), dtype=np.float32) - for src in datasets: - input_data = src.read(1, window=window) - input_data = np.where(input_data == src.nodata, 0, input_data) - output_data += input_data + rows, cols = datasets[0].shape + logging.info("Raster dimensions: %d rows x %d cols", rows, cols) - # Write the summed block to the output file - dst.write(output_data, window=window, indexes=1) + # Create the output file + try: + with rasterio.open(output_file_path, "w", **ref_meta) as dst: + logging.info("Output file created successfully") + + # Process raster in blocks + for i in range(0, rows, block_size[0]): + for j in range(0, cols, block_size[1]): + window = Window(j, i, min(block_size[1], cols - j), min(block_size[0], rows - i)) + logging.info("Processing block: row %d to %d, col %d to %d", i, i + block_size[0], j, + j + block_size[1]) + + output_data = np.zeros((window.height, window.width), dtype=np.float32) + + for idx, src in enumerate(datasets): + try: + input_data = src.read(1, window=window) + input_data = np.where(input_data == src.nodata, 0, input_data) + output_data += input_data + logging.debug("Added data from raster %d", idx + 1) + except Exception as e: + logging.error("Error reading block from raster %d: %s", idx + 1, e) + + dst.write(output_data, window=window, indexes=1) + + logging.info("Finished processing all blocks") + except Exception as e: + logging.error("Error creating or writing to output file: %s", e) + finally: + # Close all input datasets + for src in datasets: + src.close() + logging.info("Closed all input raster files") - for src in datasets: - src.close() + logging.info("create_sum function completed successfully") -async def process_aggregates(country_code=None, age_group=None, sex=None): +async def process_aggregates(country_code: str, age_group: Optional[str] = None, sex: Optional[str] = None): """ - Process the aggregate files. + Process the aggregate files based on sex and age group. + Args: - country_code: The country code for which to process the data. 
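The manual tiling in the rewritten create_sum above clamps the last window in each dimension so blocks never read past the raster edge. A quick worked check with a hypothetical 600-row by 500-column raster and the default 256x256 blocks:

from rasterio.windows import Window

rows, cols, bs = 600, 500, (256, 256)
windows = [Window(j, i, min(bs[1], cols - j), min(bs[0], rows - i))
           for i in range(0, rows, bs[0])
           for j in range(0, cols, bs[1])]
# Six windows: interior blocks are 256x256, the right-edge column shrinks
# to 244 wide, and the bottom row shrinks to 88 tall.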
- age_group: The age group to process the data. - sex: The sex to process the data for. + country_code (str): The country code to process the data for. + age_group (Optional[str]): The age group to process (child, active, elderly). + sex (Optional[str]): The sex to process (M, F). """ assert country_code, "Country code must be provided" assert sex or age_group, "Either age or sex must be provided" @@ -347,42 +387,77 @@ async def process_aggregates(country_code=None, age_group=None, sex=None): async with AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")) as storage_manager: logging.info("Processing aggregate files for country: %s", country_code) - async def process_group(sex: str, age_group: str = None): + async def process_group(sexes: List[str], age_group: Optional[str] = None, output_blob_path: Optional[str] = None): """ - Process the files for a specific sex (M/F) and optionally an age group. + Processes a group of files for a specific sex and/or age group. + + Args: + sexes (List[str]): List of sexes to process (e.g., ['M', 'F']). + age_group (Optional[str]): The age group to process (child, active, elderly). + output_blob_path (Optional[str]): Path to store the final output file. """ - path = f"{AZ_ROOT_FILE_PATH}/2020/{country_code}/{sex}/" + # Construct paths for input blobs + paths = [f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/{sex_group}/" for sex_group in sexes] if age_group: assert age_group in WORLDPOP_AGE_MAPPING, "Invalid age group provided" - path += f"{age_group}" + paths = [f"{path}{age_group}/" for path in paths] + + # Fetch blobs for all paths + blobs = [] + for path in paths: + blobs += await storage_manager.list_blobs(path) - blobs = await storage_manager.list_blobs(path) if not blobs: - logging.warning("No blobs found for path: %s", path) + logging.warning("No blobs found for paths: %s", paths) return + # Download and process blobs dataset_files = [] with tempfile.TemporaryDirectory(delete=False) as temp_dir: for blob in blobs: + local_file = os.path.join(temp_dir, os.path.basename(blob)) await storage_manager.download_blob(blob_name=blob, local_directory=temp_dir) - dataset_files.append(f"{temp_dir}/{blob.split('/')[-1]}") + dataset_files.append(local_file) - output_file = f"{temp_dir}/{country_code}_{age_group or 'ALL'}_{sex}.tif" - with ThreadPoolExecutor() as executor: - executor.submit(create_sum, dataset_files, output_file) - # TODO: Upload the output file to Azure Blob Storage `aggregate` folder - shutil.move(output_file, f"data/{country_code}_{age_group or 'ALL'}_{sex}.tif") + # Prepare output directory + os.makedirs(f"data/{output_blob_path}", exist_ok=True) + output_file = f"{temp_dir}/{country_code}_{age_group or 'ALL'}_{'_'.join(sexes)}.tif" + # Perform summation using create_sum + with ThreadPoolExecutor() as executor: + executor.submit(create_sum, input_file_paths=dataset_files, output_file_path=output_file, block_size=(512, 512)) + + # Save final output + final_output_path = f"data/{output_blob_path}/{os.path.basename(output_file)}" + await storage_manager.upload_blob(file_path=output_file, blob_name=f"{output_blob_path}/{os.path.basename(output_file)}") + # shutil.copy2(output_file, final_output_path) + # logging.info("Output saved to: %s", final_output_path) + logging.info("Output saved to: %s", f"{output_blob_path}/{os.path.basename(output_file)}") + # Processing logic for combinations of sex and age group if sex and age_group: - await process_group(sex, age_group) + logging.info("Processing for sex '%s' and age 
group '%s'", sex, age_group) + output_blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{sex}/{age_group}" + await process_group([sex], age_group, output_blob_path) elif age_group: - await process_group('M', age_group) - await process_group('F', age_group) + logging.info("Processing for age group '%s' (both sexes)", age_group) + output_blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{age_group}" + await process_group(['M', 'F'], age_group, output_blob_path) elif sex: - await process_group(sex) + logging.info("Processing for sex '%s' (all age groups)", sex) + output_blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{sex}" + await process_group([sex], None, output_blob_path) + # else: + # # Process all + # logging.info("Processing all data") + # output_blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate" + # await process_group(['M', 'F'], None, output_blob_path) + # await process_group(['M', 'F'], 'child', output_blob_path) + # await process_group(['M', 'F'], 'active', output_blob_path) + # await process_group(['M', 'F'], 'elderly', output_blob_path) + logging.info("Processing complete") if __name__ == "__main__": # asyncio.run(download_data(force_reprocessing=False)) - create_sum(["/media/thuha/Data/worldpop_data/m/0-12/BDI_m_0_2020_constrained.tif", "/media/thuha/Data/worldpop_data/f/0-12/BDI_f_0_2020_constrained.tif"], "data/BDI_0-12.tif") \ No newline at end of file + create_sum(["/media/thuha/Data/worldpop_data/m/0-12/BDI_m_0_2020_constrained.tif", "/media/thuha/Data/worldpop_data/f/0-12/BDI_f_0_2020_constrained.tif"], "data/BDI_0-12.tif") From c059c6f5b5ee17c3f4a598395491e06877bb8e7f Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Tue, 17 Dec 2024 22:30:53 +0300 Subject: [PATCH 04/24] feat: process all combinations implementation --- cbsurge/exposure/population/__init__.py | 3 +- cbsurge/exposure/population/constants.py | 16 +++- cbsurge/exposure/population/worldpop.py | 108 +++++++++++------------ 3 files changed, 67 insertions(+), 60 deletions(-) diff --git a/cbsurge/exposure/population/__init__.py b/cbsurge/exposure/population/__init__.py index 25743bf1..3c4cd945 100644 --- a/cbsurge/exposure/population/__init__.py +++ b/cbsurge/exposure/population/__init__.py @@ -15,8 +15,9 @@ def population(): @click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True) @click.option('--country', help='The ISO3 code of the country to process the data for') @click.option('--download-path', help='Download data locally', required=False) -def run_download(force_reprocessing, country, download_path): +def sync(force_reprocessing, country, download_path): asyncio.run(download_data(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path)) + asyncio.run(process_aggregates(country_code=country)) @population.command() diff --git a/cbsurge/exposure/population/constants.py b/cbsurge/exposure/population/constants.py index 189eb5da..eb79493b 100644 --- a/cbsurge/exposure/population/constants.py +++ b/cbsurge/exposure/population/constants.py @@ -8,4 +8,18 @@ "elderly": [65, 100], } WORLDPOP_SEXES = ["M", "F"] -DATA_YEAR = 2020 \ No newline at end of file +DATA_YEAR = 2020 + +AGESEX_STRUCTURE_COMBINATIONS = [ + {"sexes": ["M"], "age_group": None, "label": "male_total"}, + {"sexes": ["F"], "age_group": None, "label": "female_total"}, + {"sexes": ["M"], "age_group": "active", "label": "male_active"}, + {"sexes": ["F"], "age_group": "active", "label": "female_active"}, + 
{"sexes": ["M"], "age_group": "child", "label": "male_child"}, + {"sexes": ["F"], "age_group": "child", "label": "female_child"}, + {"sexes": ["M"], "age_group": "elderly", "label": "male_elderly"}, + {"sexes": ["F"], "age_group": "elderly", "label": "female_elderly"}, + {"sexes": ["M", "F"], "age_group": "elderly", "label": "elderly_total"}, + {"sexes": ["M", "F"], "age_group": "child", "label": "child_total"}, + {"sexes": ["M", "F"], "age_group": "active", "label": "active_total"}, + ] \ No newline at end of file diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 1f05eaee..0089c9d0 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -16,7 +16,8 @@ from tqdm.asyncio import tqdm_asyncio from cbsurge.azure.blob_storage import AzureBlobStorageManager -from cbsurge.exposure.population.constants import AZ_ROOT_FILE_PATH, WORLDPOP_AGE_MAPPING, DATA_YEAR +from cbsurge.exposure.population.constants import AZ_ROOT_FILE_PATH, WORLDPOP_AGE_MAPPING, DATA_YEAR, \ + AGESEX_STRUCTURE_COMBINATIONS logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logging.getLogger("azure").setLevel(logging.WARNING) @@ -318,7 +319,6 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): logging.info("Input files: %s", input_file_paths) logging.info("Output file: %s", output_file_path) logging.info("Block size: %s", block_size) - # Open all input files try: datasets = [rasterio.open(file_path) for file_path in input_file_paths] @@ -359,7 +359,7 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): logging.error("Error reading block from raster %d: %s", idx + 1, e) dst.write(output_data, window=window, indexes=1) - + break logging.info("Finished processing all blocks") except Exception as e: logging.error("Error creating or writing to output file: %s", e) @@ -372,89 +372,81 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): logging.info("create_sum function completed successfully") -async def process_aggregates(country_code: str, age_group: Optional[str] = None, sex: Optional[str] = None): +async def process_aggregates(country_code: str, sex: Optional[str] = None, age_group: Optional[str] = None): """ - Process the aggregate files based on sex and age group. - + Process aggregate files for combinations of sex and age groups, or specific arguments passed. Args: - country_code (str): The country code to process the data for. - age_group (Optional[str]): The age group to process (child, active, elderly). - sex (Optional[str]): The sex to process (M, F). + country_code (str): Country code for processing. + sex (Optional[str]): Sex to process (M or F). + age_group (Optional[str]): Age group to process (child, active, elderly). """ assert country_code, "Country code must be provided" - assert sex or age_group, "Either age or sex must be provided" async with AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")) as storage_manager: logging.info("Processing aggregate files for country: %s", country_code) - async def process_group(sexes: List[str], age_group: Optional[str] = None, output_blob_path: Optional[str] = None): - """ - Processes a group of files for a specific sex and/or age group. + async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, output_label: str=None): + """ + Processes and sums files for specified sexes and age groups. 
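For reference, each entry in the AGESEX_STRUCTURE_COMBINATIONS table above becomes one aggregate raster per country. Assuming AZ_ROOT_FILE_PATH stays "worldpop" and DATA_YEAR stays 2020, and given the aggregate naming used by process_group below, a hypothetical run for BDI would produce:

from cbsurge.exposure.population.constants import AGESEX_STRUCTURE_COMBINATIONS

country_code = "BDI"  # example country, for illustration only
for combo in AGESEX_STRUCTURE_COMBINATIONS:
    print(f"worldpop/2020/{country_code}/aggregate/{country_code}_{combo['label']}.tif")
# worldpop/2020/BDI/aggregate/BDI_male_total.tif
# worldpop/2020/BDI/aggregate/BDI_female_total.tif
# ... eleven blobs in total, one per combination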
Args: - sexes (List[str]): List of sexes to process (e.g., ['M', 'F']). - age_group (Optional[str]): The age group to process (child, active, elderly). - output_blob_path (Optional[str]): Path to store the final output file. + sexes (List[str]): List of sexes (e.g., ['M'], ['F'], or ['M', 'F']). + age_grouping (Optional[str]): Age group to process. + output_label (str): Label for the output file. """ - # Construct paths for input blobs - paths = [f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/{sex_group}/" for sex_group in sexes] - if age_group: - assert age_group in WORLDPOP_AGE_MAPPING, "Invalid age group provided" - paths = [f"{path}{age_group}/" for path in paths] + # Construct paths + paths = [f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/{s}" for s in sexes] + if age_grouping: + assert age_grouping in WORLDPOP_AGE_MAPPING, "Invalid age group provided" + paths = [f"{path}/{age_grouping}/" for path in paths] - # Fetch blobs for all paths + # Fetch blobs blobs = [] for path in paths: blobs += await storage_manager.list_blobs(path) - if not blobs: logging.warning("No blobs found for paths: %s", paths) return - # Download and process blobs - dataset_files = [] + # Download blobs and sum them with tempfile.TemporaryDirectory(delete=False) as temp_dir: + local_files = [] for blob in blobs: local_file = os.path.join(temp_dir, os.path.basename(blob)) - await storage_manager.download_blob(blob_name=blob, local_directory=temp_dir) - dataset_files.append(local_file) + await storage_manager.download_blob(blob, temp_dir) + local_files.append(local_file) - # Prepare output directory - os.makedirs(f"data/{output_blob_path}", exist_ok=True) - output_file = f"{temp_dir}/{country_code}_{age_group or 'ALL'}_{'_'.join(sexes)}.tif" - - # Perform summation using create_sum + output_file = f"{temp_dir}/{output_label}.tif" with ThreadPoolExecutor() as executor: - executor.submit(create_sum, input_file_paths=dataset_files, output_file_path=output_file, block_size=(512, 512)) - - # Save final output - final_output_path = f"data/{output_blob_path}/{os.path.basename(output_file)}" - await storage_manager.upload_blob(file_path=output_file, blob_name=f"{output_blob_path}/{os.path.basename(output_file)}") - # shutil.copy2(output_file, final_output_path) - # logging.info("Output saved to: %s", final_output_path) - logging.info("Output saved to: %s", f"{output_blob_path}/{os.path.basename(output_file)}") - # Processing logic for combinations of sex and age group + executor.submit(create_sum, input_file_paths=local_files, output_file_path=output_file, block_size=(512, 512)) + + # Upload the result + blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{output_label}.tif" + await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path) + # shutil.copy2(output_file, f"data/{country_code}_{output_label}.tif") + logging.info("Processed and uploaded: %s", blob_path) + + # Dynamic argument-based processing if sex and age_group: + label = f"{sex}_{age_group}" logging.info("Processing for sex '%s' and age group '%s'", sex, age_group) - output_blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{sex}/{age_group}" - await process_group([sex], age_group, output_blob_path) - elif age_group: - logging.info("Processing for age group '%s' (both sexes)", age_group) - output_blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{age_group}" - await process_group(['M', 'F'], age_group, output_blob_path) + await process_group(sexes=[sex], 
age_grouping=age_group, output_label=label) elif sex: + label = f"{sex}_total" logging.info("Processing for sex '%s' (all age groups)", sex) - output_blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{sex}" - await process_group([sex], None, output_blob_path) - # else: - # # Process all - # logging.info("Processing all data") - # output_blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate" - # await process_group(['M', 'F'], None, output_blob_path) - # await process_group(['M', 'F'], 'child', output_blob_path) - # await process_group(['M', 'F'], 'active', output_blob_path) - # await process_group(['M', 'F'], 'elderly', output_blob_path) - logging.info("Processing complete") + await process_group(sexes=[sex], output_label=label) + elif age_group: + label = f"{age_group}_total" + logging.info("Processing for age group '%s' (both sexes)", age_group) + await process_group(sexes=['M', 'F'], age_grouping=age_group, output_label=label) + else: + # Process predefined combinations + logging.info("Processing all predefined combinations...") + for combo in AGESEX_STRUCTURE_COMBINATIONS: + logging.info("Processing %s...", combo["label"]) + await process_group(sexes=combo["sexes"], age_grouping=combo["age_group"], output_label=combo["label"]) + + logging.info("All processing complete for country: %s", country_code) if __name__ == "__main__": From caffa88ba4e8abe63da02eb925e547f5619ed140 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Tue, 17 Dec 2024 22:55:31 +0300 Subject: [PATCH 05/24] fix: run it for all countries with sync command --- cbsurge/exposure/population/__init__.py | 1 - cbsurge/exposure/population/worldpop.py | 13 +++++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/cbsurge/exposure/population/__init__.py b/cbsurge/exposure/population/__init__.py index 3c4cd945..d3198c1b 100644 --- a/cbsurge/exposure/population/__init__.py +++ b/cbsurge/exposure/population/__init__.py @@ -17,7 +17,6 @@ def population(): @click.option('--download-path', help='Download data locally', required=False) def sync(force_reprocessing, country, download_path): asyncio.run(download_data(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path)) - asyncio.run(process_aggregates(country_code=country)) @population.command() diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 0089c9d0..59440001 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -275,13 +275,9 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa available_data = await get_available_data(country_code=country_code, year=year) storage_manager = AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")) for country_code, country_id in available_data.items(): - # if country_code == "RUS": - # continue logging.info("Processing country: %s", country_code) file_links = await get_links_from_table(data_id=country_id) for i, file_urls_chunk in enumerate(chunker_function(file_links, chunk_size=4)): - # if i > 0: - # break logging.info("Processing chunk %d for country: %s", i + 1, country_code) # Create a fresh list of tasks for each file chunk tasks = [process_single_file(file_url=url, @@ -296,7 +292,10 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa for result in results: if isinstance(result, Exception): logging.error("Error processing file: %s", result) - + logging.info("Data download 
complete for country: %s", country_code) + logging.info("Starting aggregate processing for country: %s", country_code) + await process_aggregates(country_code=country_code) + logging.info("Aggregate processing complete for country: %s", country_code) # Close the storage manager connection after all files have been processed logging.info("Closing storage manager after processing all files") await storage_manager.close() @@ -450,6 +449,4 @@ async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, if __name__ == "__main__": - # asyncio.run(download_data(force_reprocessing=False)) - - create_sum(["/media/thuha/Data/worldpop_data/m/0-12/BDI_m_0_2020_constrained.tif", "/media/thuha/Data/worldpop_data/f/0-12/BDI_f_0_2020_constrained.tif"], "data/BDI_0-12.tif") + pass \ No newline at end of file From 081af9129fa66681a2ba67940195b42daaf96b5d Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Tue, 17 Dec 2024 23:15:43 +0300 Subject: [PATCH 06/24] remove persistent temp files --- cbsurge/exposure/population/worldpop.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 59440001..07358094 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -275,6 +275,8 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa available_data = await get_available_data(country_code=country_code, year=year) storage_manager = AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")) for country_code, country_id in available_data.items(): + if country_code == "RUS": + continue logging.info("Processing country: %s", country_code) file_links = await get_links_from_table(data_id=country_id) for i, file_urls_chunk in enumerate(chunker_function(file_links, chunk_size=4)): @@ -408,7 +410,7 @@ async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, return # Download blobs and sum them - with tempfile.TemporaryDirectory(delete=False) as temp_dir: + with tempfile.TemporaryDirectory() as temp_dir: local_files = [] for blob in blobs: local_file = os.path.join(temp_dir, os.path.basename(blob)) @@ -424,7 +426,6 @@ async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path) # shutil.copy2(output_file, f"data/{country_code}_{output_label}.tif") logging.info("Processed and uploaded: %s", blob_path) - # Dynamic argument-based processing if sex and age_group: label = f"{sex}_{age_group}" From 08ee9523457f03788498711ce1b42dba6d85c87d Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Wed, 18 Dec 2024 10:44:42 +0300 Subject: [PATCH 07/24] check for if aggregates exist --- cbsurge/exposure/population/worldpop.py | 27 ++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 07358094..971adf6b 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -296,13 +296,38 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa logging.error("Error processing file: %s", result) logging.info("Data download complete for country: %s", country_code) logging.info("Starting aggregate processing for country: %s", country_code) - await process_aggregates(country_code=country_code) + if await 
check_aggregates_exist(storage_manager=storage_manager, country_code=country_code): + logging.info("Aggregate files already exist for country: %s", country_code) + else: + await process_aggregates(country_code=country_code) logging.info("Aggregate processing complete for country: %s", country_code) # Close the storage manager connection after all files have been processed logging.info("Closing storage manager after processing all files") await storage_manager.close() +async def check_aggregates_exist(storage_manager=None, country_code=None): + """ + Check if aggregate files exist for a given country code. + Args: + storage_manager: AzStorageManager instance + country_code: The country code to check for + + Returns: True if all aggregate files exist, False otherwise + + """ + assert storage_manager is not None, "storage_manager is required" + assert country_code is not None, "country_code is required" + logging.info("Checking if aggregate files exist for country: %s", country_code) + for combo in AGESEX_STRUCTURE_COMBINATIONS: + blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{combo['label']}.tif" + cog_exists = await check_cog_exists(storage_manager=storage_manager, blob_path=blob_path) + if not cog_exists: + logging.info("Aggregate file does not exist: %s", blob_path) + return False + logging.info("All aggregate files exist for country: %s", country_code) + return True + def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): """ Sum multiple raster files and save the result to an output file, processing in blocks. From 535502f2288915c5919155ae76c12ca78c8ac912 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Wed, 18 Dec 2024 11:09:46 +0300 Subject: [PATCH 08/24] fix: update metadata for sizes --- cbsurge/exposure/population/worldpop.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 971adf6b..6e5ca373 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -296,11 +296,11 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa logging.error("Error processing file: %s", result) logging.info("Data download complete for country: %s", country_code) logging.info("Starting aggregate processing for country: %s", country_code) - if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code): - logging.info("Aggregate files already exist for country: %s", country_code) - else: - await process_aggregates(country_code=country_code) - logging.info("Aggregate processing complete for country: %s", country_code) + # if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code) and not force_reprocessing: + # logging.info("Aggregate files already exist for country: %s", country_code) + # else: + await process_aggregates(country_code=country_code) + # logging.info("Aggregate processing complete for country: %s", country_code) # Close the storage manager connection after all files have been processed logging.info("Closing storage manager after processing all files") await storage_manager.close() @@ -357,6 +357,8 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): # Use the first dataset as reference for metadata ref_meta = datasets[0].meta.copy() ref_meta.update(dtype="float32", count=1, nodata=0) + ref_meta.update(compress="ZSTD") + ref_meta.update(driver="COG") rows, cols = 
datasets[0].shape logging.info("Raster dimensions: %d rows x %d cols", rows, cols) From 603ae1b4529f6675b47efe4b4996886c6629dd63 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Thu, 19 Dec 2024 11:58:10 +0300 Subject: [PATCH 09/24] WIP: debugging --- cbsurge/exposure/population/worldpop.py | 30 +++++++++++++++---------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 6e5ca373..34e6e400 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -296,11 +296,11 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa logging.error("Error processing file: %s", result) logging.info("Data download complete for country: %s", country_code) logging.info("Starting aggregate processing for country: %s", country_code) - # if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code) and not force_reprocessing: - # logging.info("Aggregate files already exist for country: %s", country_code) - # else: - await process_aggregates(country_code=country_code) - # logging.info("Aggregate processing complete for country: %s", country_code) + if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code) and not force_reprocessing: + logging.info("Aggregate files already exist for country: %s", country_code) + else: + await process_aggregates(country_code=country_code) + logging.info("Aggregate processing complete for country: %s", country_code) # Close the storage manager connection after all files have been processed logging.info("Closing storage manager after processing all files") await storage_manager.close() @@ -356,9 +356,11 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): # Use the first dataset as reference for metadata ref_meta = datasets[0].meta.copy() - ref_meta.update(dtype="float32", count=1, nodata=0) - ref_meta.update(compress="ZSTD") - ref_meta.update(driver="COG") + + # Update metadata + ref_meta.update(count=1, nodata=0, dtype="float32") + + print(ref_meta) rows, cols = datasets[0].shape logging.info("Raster dimensions: %d rows x %d cols", rows, cols) @@ -380,12 +382,16 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): for idx, src in enumerate(datasets): try: input_data = src.read(1, window=window) + input_data = np.where(input_data == src.nodata, 0, input_data) + # print(input_data) + # continue output_data += input_data + logging.debug("Added data from raster %d", idx + 1) except Exception as e: logging.error("Error reading block from raster %d: %s", idx + 1, e) - + print(max(output_data.flatten())) dst.write(output_data, window=window, indexes=1) break logging.info("Finished processing all blocks") @@ -446,12 +452,12 @@ async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, output_file = f"{temp_dir}/{output_label}.tif" with ThreadPoolExecutor() as executor: - executor.submit(create_sum, input_file_paths=local_files, output_file_path=output_file, block_size=(512, 512)) + executor.submit(create_sum, input_file_paths=local_files, output_file_path=output_file, block_size=(256, 256)) # Upload the result blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{output_label}.tif" - await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path) - # shutil.copy2(output_file, f"data/{country_code}_{output_label}.tif") + # await 
storage_manager.upload_blob(file_path=output_file, blob_name=blob_path) + shutil.copy2(output_file, f"data/{country_code}_{output_label}.tif") logging.info("Processed and uploaded: %s", blob_path) # Dynamic argument-based processing if sex and age_group: From 2c2a4fb6f9c9ecdc431d2537fc42128c41929256 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Fri, 20 Dec 2024 15:05:41 +0300 Subject: [PATCH 10/24] fix: breaking before all blocks are processed --- cbsurge/exposure/population/worldpop.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 34e6e400..fa672c21 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -356,13 +356,10 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): # Use the first dataset as reference for metadata ref_meta = datasets[0].meta.copy() - - # Update metadata - ref_meta.update(count=1, nodata=0, dtype="float32") - - print(ref_meta) + ref_meta.update(count=1, dtype="float32", nodata=0) rows, cols = datasets[0].shape + logging.info("Raster dimensions: %d rows x %d cols", rows, cols) # Create the output file @@ -384,16 +381,14 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): input_data = src.read(1, window=window) input_data = np.where(input_data == src.nodata, 0, input_data) - # print(input_data) - # continue + output_data += input_data logging.debug("Added data from raster %d", idx + 1) except Exception as e: logging.error("Error reading block from raster %d: %s", idx + 1, e) - print(max(output_data.flatten())) + dst.write(output_data, window=window, indexes=1) - break logging.info("Finished processing all blocks") except Exception as e: logging.error("Error creating or writing to output file: %s", e) From f8c00483c1e5b6159961820400001da43ac4dd26 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Fri, 20 Dec 2024 15:09:06 +0300 Subject: [PATCH 11/24] upload data --- cbsurge/exposure/population/worldpop.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index fa672c21..ec915359 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -451,8 +451,8 @@ async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, # Upload the result blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{output_label}.tif" - # await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path) - shutil.copy2(output_file, f"data/{country_code}_{output_label}.tif") + await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path) + # shutil.copy2(output_file, f"data/{country_code}_{output_label}.tif") logging.info("Processed and uploaded: %s", blob_path) # Dynamic argument-based processing if sex and age_group: From 0356e773626fce1d9b507d08a1f58c2a32ee2805 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Fri, 27 Dec 2024 11:58:25 +0300 Subject: [PATCH 12/24] refactor: to upload for all --- cbsurge/exposure/population/__init__.py | 4 ++-- cbsurge/exposure/population/worldpop.py | 21 ++++++++++----------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/cbsurge/exposure/population/__init__.py b/cbsurge/exposure/population/__init__.py index d3198c1b..31c814f5 100644 --- a/cbsurge/exposure/population/__init__.py +++ b/cbsurge/exposure/population/__init__.py @@ 
-2,7 +2,7 @@ import click -from cbsurge.exposure.population.worldpop import download_data, process_aggregates +from cbsurge.exposure.population.worldpop import download_data, process_aggregates_for_country @click.group() @@ -24,4 +24,4 @@ def sync(force_reprocessing, country, download_path): @click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly'])) @click.option('--sex', help='Path to the downloaded data', type=click.Choice(['M', 'F'])) def run_aggregate(country, age_group, sex): - asyncio.run(process_aggregates(country_code=country, age_group=age_group, sex=sex)) \ No newline at end of file + asyncio.run(process_aggregates_for_country(country_code=country, age_group=age_group, sex=sex)) \ No newline at end of file diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index ec915359..e70eefa0 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -296,11 +296,11 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa logging.error("Error processing file: %s", result) logging.info("Data download complete for country: %s", country_code) logging.info("Starting aggregate processing for country: %s", country_code) - if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code) and not force_reprocessing: - logging.info("Aggregate files already exist for country: %s", country_code) - else: - await process_aggregates(country_code=country_code) - logging.info("Aggregate processing complete for country: %s", country_code) + # if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code) and not force_reprocessing: + # logging.info("Aggregate files already exist for country: %s", country_code) + # else: + await process_aggregates_for_country(country_code=country_code) + # logging.info("Aggregate processing complete for country: %s", country_code) # Close the storage manager connection after all files have been processed logging.info("Closing storage manager after processing all files") await storage_manager.close() @@ -356,7 +356,7 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): # Use the first dataset as reference for metadata ref_meta = datasets[0].meta.copy() - ref_meta.update(count=1, dtype="float32", nodata=0) + ref_meta.update(count=1, dtype="float32", nodata=0, compress="ZSTD") rows, cols = datasets[0].shape @@ -401,20 +401,19 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): logging.info("create_sum function completed successfully") -async def process_aggregates(country_code: str, sex: Optional[str] = None, age_group: Optional[str] = None): +async def process_aggregates_for_country(country_code: str, sex: Optional[str] = None, age_group: Optional[str] = None, year="2020"): """ Process aggregate files for combinations of sex and age groups, or specific arguments passed. Args: country_code (str): Country code for processing. sex (Optional[str]): Sex to process (M or F). age_group (Optional[str]): Age group to process (child, active, elderly). + year (Optional[str]): (Unimplemented). 
The year for which the data should be produced for """ - assert country_code, "Country code must be provided" + assert country_code, "Country code must be provided" async with AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")) as storage_manager: logging.info("Processing aggregate files for country: %s", country_code) - - async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, output_label: str=None): """ Processes and sums files for specified sexes and age groups. @@ -447,7 +446,7 @@ async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, output_file = f"{temp_dir}/{output_label}.tif" with ThreadPoolExecutor() as executor: - executor.submit(create_sum, input_file_paths=local_files, output_file_path=output_file, block_size=(256, 256)) + executor.submit(create_sum, input_file_paths=local_files, output_file_path=output_file, block_size=(1028, 1028)) # Upload the result blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{output_label}.tif" From ebb92734e3a73ce5ca9d5b497bf9546da42f2894 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Fri, 27 Dec 2024 11:59:30 +0300 Subject: [PATCH 13/24] refactor: process remaining --- cbsurge/exposure/population/worldpop.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index e70eefa0..91b06a85 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -296,11 +296,11 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa logging.error("Error processing file: %s", result) logging.info("Data download complete for country: %s", country_code) logging.info("Starting aggregate processing for country: %s", country_code) - # if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code) and not force_reprocessing: - # logging.info("Aggregate files already exist for country: %s", country_code) - # else: - await process_aggregates_for_country(country_code=country_code) - # logging.info("Aggregate processing complete for country: %s", country_code) + if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code) and not force_reprocessing: + logging.info("Aggregate files already exist for country: %s", country_code) + else: + await process_aggregates_for_country(country_code=country_code) + logging.info("Aggregate processing complete for country: %s", country_code) # Close the storage manager connection after all files have been processed logging.info("Closing storage manager after processing all files") await storage_manager.close() From 69f3dbdcbe468778908651158f83348f5ac9eed9 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Fri, 27 Dec 2024 11:59:37 +0300 Subject: [PATCH 14/24] refactor: process remaining --- cbsurge/exposure/population/worldpop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 91b06a85..5ee9ac83 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -296,7 +296,7 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa logging.error("Error processing file: %s", result) logging.info("Data download complete for country: %s", country_code) logging.info("Starting aggregate processing for country: %s", country_code) - if await 
check_aggregates_exist(storage_manager=storage_manager, country_code=country_code) and not force_reprocessing:
+    if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code):
         logging.info("Aggregate files already exist for country: %s", country_code)
     else:
         await process_aggregates_for_country(country_code=country_code)
     logging.info("Aggregate processing complete for country: %s", country_code)

From fee131550eed64feb66730eac9b496886ba585cc Mon Sep 17 00:00:00 2001
From: Thuhaa
Date: Tue, 7 Jan 2025 11:08:29 +0300
Subject: [PATCH 15/24] WIP

---
 .gitignore                              |  2 ++
 cbsurge/azure/blob_storage.py           | 28 +++++++++++++++++++++++++
 cbsurge/exposure/population/__init__.py |  2 +-
 cbsurge/exposure/population/worldpop.py | 15 ++++++++++---
 4 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/.gitignore b/.gitignore
index aa74adb1..504a0a16 100644
--- a/.gitignore
+++ b/.gitignore
@@ -82,6 +82,8 @@ target/
 profile_default/
 ipython_config.py
 
+
+data/
 # pyenv
 # For a library or package, you might want to ignore these files since the code is
 # intended to run in multiple environments; otherwise, check them in:
diff --git a/cbsurge/azure/blob_storage.py b/cbsurge/azure/blob_storage.py
index df3ba98a..46dd77c0 100644
--- a/cbsurge/azure/blob_storage.py
+++ b/cbsurge/azure/blob_storage.py
@@ -154,6 +154,34 @@ async def copy_file(self, source_blob=None, destination_blob=None):
         await destination_blob_client.start_copy_from_url(source_blob_client.url)
         return destination_blob_client.url
 
+    async def delete_blob(self, blob_name=None):
+        """
+        Delete a blob from Azure Blob Storage.
+        Args:
+            blob_name: (str) The name of the blob to delete
+
+        Returns: The name of the deleted blob
+
+        """
+        logging.info("Deleting blob: %s", blob_name)
+        blob_client = self.container_client.get_blob_client(blob=blob_name)
+        await blob_client.delete_blob()
+        return blob_name
+
+    async def rename_file(self, source_blob=None, destination_blob=None):
+        """
+        Rename a blob by copying it to the destination name and deleting the source.
+        Args:
+            source_blob: (str) The name of the blob to rename
+            destination_blob: (str) The new name for the blob
+
+        Returns: The name of the destination blob
+
+        """
+        await self.copy_file(source_blob=source_blob, destination_blob=destination_blob)
+        await self.delete_blob(blob_name=source_blob)
+        return destination_blob
+
     async def close(self):
         """
         Close the Azure Blob Storage Manager.
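
A note on the rename_file helper added above: Azure Blob Storage has no server-side rename, so copy-then-delete is the right shape, but start_copy_from_url only schedules the copy, and deleting the source immediately can race an in-flight copy. Below is a minimal standalone sketch of the same pattern with the copy polled to completion before the delete; the container and blob names in the usage comment are illustrative, not the project's.

    import asyncio
    import os

    from azure.storage.blob.aio import BlobServiceClient


    async def rename_blob(conn_str, container, source, destination):
        """Copy `source` to `destination`, wait for the copy to settle, then delete `source`."""
        async with BlobServiceClient.from_connection_string(conn_str) as service:
            container_client = service.get_container_client(container)
            src = container_client.get_blob_client(source)
            dst = container_client.get_blob_client(destination)
            await dst.start_copy_from_url(src.url)
            # Poll the destination until the service reports the copy has finished.
            while (props := await dst.get_blob_properties()).copy.status == "pending":
                await asyncio.sleep(1)
            if props.copy.status != "success":
                raise RuntimeError(f"Copy of {source} failed: {props.copy.status}")
            await src.delete_blob()
            return destination


    # asyncio.run(rename_blob(os.getenv("AZURE_STORAGE_CONNECTION_STRING"),
    #                         "my-container", "old/name.tif", "new/name.tif"))
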
diff --git a/cbsurge/exposure/population/__init__.py b/cbsurge/exposure/population/__init__.py
index 31c814f5..ae7c4c11 100644
--- a/cbsurge/exposure/population/__init__.py
+++ b/cbsurge/exposure/population/__init__.py
@@ -22,6 +22,6 @@ def sync(force_reprocessing, country, download_path):
 @population.command()
 @click.option('--country', help='The ISO3 code of the country to process the data for')
 @click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly']))
-@click.option('--sex', help='Path to the downloaded data', type=click.Choice(['M', 'F']))
+@click.option('--sex', help='Path to the downloaded data', type=click.Choice(['male', 'female']))
 def run_aggregate(country, age_group, sex):
     asyncio.run(process_aggregates_for_country(country_code=country, age_group=age_group, sex=sex))
\ No newline at end of file
diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py
index 5ee9ac83..1f8dedbe 100644
--- a/cbsurge/exposure/population/worldpop.py
+++ b/cbsurge/exposure/population/worldpop.py
@@ -401,6 +401,7 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)):
     logging.info("create_sum function completed successfully")
 
 
+
 async def process_aggregates_for_country(country_code: str, sex: Optional[str] = None, age_group: Optional[str] = None, year="2020"):
     """
     Process aggregate files for combinations of sex and age groups, or specific arguments passed.
@@ -439,10 +440,18 @@ async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None,
             # Download blobs and sum them
             with tempfile.TemporaryDirectory() as temp_dir:
                 local_files = []
-                for blob in blobs:
+                # download blobs concurrently
+                async def create_local_files(blob):
                     local_file = os.path.join(temp_dir, os.path.basename(blob))
                     await storage_manager.download_blob(blob, temp_dir)
                     local_files.append(local_file)
+                tasks = [create_local_files(blob) for blob in blobs]
+                await asyncio.gather(*tasks)
+
+                # for blob in blobs:
+                #     local_file = os.path.join(temp_dir, os.path.basename(blob))
+                #     await storage_manager.download_blob(blob, temp_dir)
+                #     local_files.append(local_file)
 
                 output_file = f"{temp_dir}/{output_label}.tif"
                 with ThreadPoolExecutor() as executor:
@@ -450,8 +459,8 @@ async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None,
                 # Upload the result
                 blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{output_label}.tif"
-                await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path)
-                # shutil.copy2(output_file, f"data/{country_code}_{output_label}.tif")
+                # await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path)
+                shutil.copy2(output_file, f"data/{country_code}_{output_label}.tif")
                 logging.info("Processed and uploaded: %s", blob_path)
 
         # Dynamic argument-based processing
         if sex and age_group:

From 00fb41d7157ce7a009396d391dce869cb8c92298 Mon Sep 17 00:00:00 2001
From: Thuhaa
Date: Fri, 10 Jan 2025 16:29:27 +0300
Subject: [PATCH 16/24] fix: update to use male/female instead of M/F

---
 cbsurge/azure/blob_storage.py            |  1 -
 cbsurge/exposure/population/constants.py |  6 +++++-
 cbsurge/exposure/population/worldpop.py  | 21 +++++++++++----------
 3 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/cbsurge/azure/blob_storage.py b/cbsurge/azure/blob_storage.py
index 46dd77c0..4a67f992 100644
--- a/cbsurge/azure/blob_storage.py
+++ b/cbsurge/azure/blob_storage.py
@@ -7,7 +7,6 @@
 from tqdm.asyncio import tqdm_asyncio, tqdm
from cbsurge.constants import AZURE_BLOB_CONTAINER_NAME -from cbsurge.exposure.population.constants import WORLDPOP_AGE_MAPPING class AzureBlobStorageManager: diff --git a/cbsurge/exposure/population/constants.py b/cbsurge/exposure/population/constants.py index eb79493b..19cbf483 100644 --- a/cbsurge/exposure/population/constants.py +++ b/cbsurge/exposure/population/constants.py @@ -7,7 +7,11 @@ "active": [15, 64], "elderly": [65, 100], } -WORLDPOP_SEXES = ["M", "F"] +SEX_MAPPING = { + "M": "male", + "F": "female", +} + DATA_YEAR = 2020 AGESEX_STRUCTURE_COMBINATIONS = [ diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 1f8dedbe..f57cf851 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -17,7 +17,7 @@ from cbsurge.azure.blob_storage import AzureBlobStorageManager from cbsurge.exposure.population.constants import AZ_ROOT_FILE_PATH, WORLDPOP_AGE_MAPPING, DATA_YEAR, \ - AGESEX_STRUCTURE_COMBINATIONS + AGESEX_STRUCTURE_COMBINATIONS, SEX_MAPPING logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logging.getLogger("azure").setLevel(logging.WARNING) @@ -192,13 +192,14 @@ async def process_single_file(file_url=None, storage_manager=None, country_code= try: async with httpx.AsyncClient() as client: age = file_name.split("_")[2] - sex = file_name.split("_")[1] + sex = SEX_MAPPING[file_name.split("_")[1].upper()] + for age_group, age_range in WORLDPOP_AGE_MAPPING.items(): if age_range[0] <= int(age) <= age_range[1]: age = age_group break cog_exists = await check_cog_exists(storage_manager=storage_manager, - blob_path=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{sex.upper()}/{age}/{file_name}") + blob_path=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{sex}/{age}/{file_name}") if cog_exists and not force_reprocessing: logging.info("COG already exists in Azure, skipping upload: %s", file_name) return @@ -227,14 +228,14 @@ async def process_single_file(file_url=None, storage_manager=None, country_code= logging.info("Download path does not exist. Creating download path: %s", download_path) os.makedirs(download_path, exist_ok=True) logging.info("Copying COG file locally: %s", cog_path) - os.makedirs(f"{download_path}/{year}/{country_code}/{sex.upper()}/{age}", exist_ok=True) - shutil.move(cog_path, f"{download_path}/{year}/{country_code}/{sex.upper()}/{age}/{file_name}") - logging.info("Successfully copied COG file: %s", f"{download_path}/{year}/{country_code}/{sex.upper()}/{age}/{file_name}") + os.makedirs(f"{download_path}/{year}/{country_code}/{sex}/{age}", exist_ok=True) + shutil.move(cog_path, f"{download_path}/{year}/{country_code}/{sex}/{age}/{file_name}") + logging.info("Successfully copied COG file: %s", f"{download_path}/{year}/{country_code}/{sex}/{age}/{file_name}") except Exception as e: raise Exception(f"Error copying COG file: {e}") else: logging.info("Uploading COG file to Azure: %s", cog_path) - await storage_manager.upload_blob(file_path=cog_path, blob_name=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{sex.upper()}/{age}/{file_name}") + await storage_manager.upload_blob(file_path=cog_path, blob_name=f"{AZ_ROOT_FILE_PATH}/{year}/{country_code}/{sex}/{age}/{file_name}") logging.info("Successfully processed file: %s", file_name) @@ -407,7 +408,7 @@ async def process_aggregates_for_country(country_code: str, sex: Optional[str] = Process aggregate files for combinations of sex and age groups, or specific arguments passed. 
Args: country_code (str): Country code for processing. - sex (Optional[str]): Sex to process (M or F). + sex (Optional[str]): Sex to process (male or female). age_group (Optional[str]): Age group to process (child, active, elderly). year (Optional[str]): (Unimplemented). The year for which the data should be produced for """ @@ -419,7 +420,7 @@ async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, """ Processes and sums files for specified sexes and age groups. Args: - sexes (List[str]): List of sexes (e.g., ['M'], ['F'], or ['M', 'F']). + sexes (List[str]): List of sexes (e.g., ['male'], ['female'], or ['male', 'female']). age_grouping (Optional[str]): Age group to process. output_label (str): Label for the output file. """ @@ -474,7 +475,7 @@ async def create_local_files(blob): elif age_group: label = f"{age_group}_total" logging.info("Processing for age group '%s' (both sexes)", age_group) - await process_group(sexes=['M', 'F'], age_grouping=age_group, output_label=label) + await process_group(sexes=['male', 'female'], age_grouping=age_group, output_label=label) else: # Process predefined combinations logging.info("Processing all predefined combinations...") From 4283d8aa05f17fd5392eed5dc5edfa4b15ee3df7 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Mon, 13 Jan 2025 13:21:29 +0300 Subject: [PATCH 17/24] add download-path argument --- cbsurge/exposure/population/__init__.py | 5 +++-- cbsurge/exposure/population/constants.py | 22 +++++++++++----------- cbsurge/exposure/population/worldpop.py | 20 ++++++++++---------- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/cbsurge/exposure/population/__init__.py b/cbsurge/exposure/population/__init__.py index ae7c4c11..ed7e70d3 100644 --- a/cbsurge/exposure/population/__init__.py +++ b/cbsurge/exposure/population/__init__.py @@ -23,5 +23,6 @@ def sync(force_reprocessing, country, download_path): @click.option('--country', help='The ISO3 code of the country to process the data for') @click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly'])) @click.option('--sex', help='Path to the downloaded data', type=click.Choice(['male', 'female'])) -def run_aggregate(country, age_group, sex): - asyncio.run(process_aggregates_for_country(country_code=country, age_group=age_group, sex=sex)) \ No newline at end of file +@click.option('--download-path', help='Download data locally', required=False) +def run_aggregate(country, age_group, sex, download_path): + asyncio.run(process_aggregates_for_country(country_code=country, age_group=age_group, sex=sex, download_path=download_path)) \ No newline at end of file diff --git a/cbsurge/exposure/population/constants.py b/cbsurge/exposure/population/constants.py index 19cbf483..28e778a2 100644 --- a/cbsurge/exposure/population/constants.py +++ b/cbsurge/exposure/population/constants.py @@ -15,15 +15,15 @@ DATA_YEAR = 2020 AGESEX_STRUCTURE_COMBINATIONS = [ - {"sexes": ["M"], "age_group": None, "label": "male_total"}, - {"sexes": ["F"], "age_group": None, "label": "female_total"}, - {"sexes": ["M"], "age_group": "active", "label": "male_active"}, - {"sexes": ["F"], "age_group": "active", "label": "female_active"}, - {"sexes": ["M"], "age_group": "child", "label": "male_child"}, - {"sexes": ["F"], "age_group": "child", "label": "female_child"}, - {"sexes": ["M"], "age_group": "elderly", "label": "male_elderly"}, - {"sexes": ["F"], "age_group": "elderly", "label": "female_elderly"}, - {"sexes": ["M", "F"], "age_group": 
"elderly", "label": "elderly_total"}, - {"sexes": ["M", "F"], "age_group": "child", "label": "child_total"}, - {"sexes": ["M", "F"], "age_group": "active", "label": "active_total"}, + {"sexes": ["male"], "age_group": None, "label": "male_total"}, + {"sexes": ["female"], "age_group": None, "label": "female_total"}, + {"sexes": ["male"], "age_group": "active", "label": "male_active"}, + {"sexes": ["female"], "age_group": "active", "label": "female_active"}, + {"sexes": ["male"], "age_group": "child", "label": "male_child"}, + {"sexes": ["female"], "age_group": "child", "label": "female_child"}, + {"sexes": ["male"], "age_group": "elderly", "label": "male_elderly"}, + {"sexes": ["female"], "age_group": "elderly", "label": "female_elderly"}, + {"sexes": ["male", "female"], "age_group": "elderly", "label": "elderly_total"}, + {"sexes": ["male", "female"], "age_group": "child", "label": "child_total"}, + {"sexes": ["male", "female"], "age_group": "active", "label": "active_total"}, ] \ No newline at end of file diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index f57cf851..e1a34493 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -300,14 +300,14 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code): logging.info("Aggregate files already exist for country: %s", country_code) else: - await process_aggregates_for_country(country_code=country_code) + await process_aggregates_for_country(country_code=country_code, download_path=download_path) logging.info("Aggregate processing complete for country: %s", country_code) # Close the storage manager connection after all files have been processed logging.info("Closing storage manager after processing all files") await storage_manager.close() -async def check_aggregates_exist(storage_manager=None, country_code=None): +async def check_aggregates_exist(storage_manager=AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")), country_code=None): """ Check if aggregate files exist for a given country code. Args: @@ -403,7 +403,7 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): -async def process_aggregates_for_country(country_code: str, sex: Optional[str] = None, age_group: Optional[str] = None, year="2020"): +async def process_aggregates_for_country(country_code: str, sex: Optional[str] = None, age_group: Optional[str] = None, year="2020", download_path=None): """ Process aggregate files for combinations of sex and age groups, or specific arguments passed. Args: @@ -411,6 +411,7 @@ async def process_aggregates_for_country(country_code: str, sex: Optional[str] = sex (Optional[str]): Sex to process (male or female). age_group (Optional[str]): Age group to process (child, active, elderly). year (Optional[str]): (Unimplemented). The year for which the data should be produced for + download_path (Optional[str]): Local path to download the COG files to. If provided, the files will not be uploaded to Azure. 
""" assert country_code, "Country code must be provided" @@ -449,20 +450,19 @@ async def create_local_files(blob): tasks = [create_local_files(blob) for blob in blobs] await asyncio.gather(*tasks) - # for blob in blobs: - # local_file = os.path.join(temp_dir, os.path.basename(blob)) - # await storage_manager.download_blob(blob, temp_dir) - # local_files.append(local_file) - output_file = f"{temp_dir}/{output_label}.tif" with ThreadPoolExecutor() as executor: executor.submit(create_sum, input_file_paths=local_files, output_file_path=output_file, block_size=(1028, 1028)) # Upload the result blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{output_label}.tif" - # await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path) - shutil.copy2(output_file, f"data/{country_code}_{output_label}.tif") + if download_path: + os.makedirs(f"{download_path}/{DATA_YEAR}/{country_code}/aggregate", exist_ok=True) + shutil.copy2(output_file, f"{download_path}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{output_label}.tif") + else: + await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path) logging.info("Processed and uploaded: %s", blob_path) + # Dynamic argument-based processing if sex and age_group: label = f"{sex}_{age_group}" From 5cc809c8b08b44a47542f598c5a462d9d9823ecb Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Thu, 16 Jan 2025 16:20:54 +0300 Subject: [PATCH 18/24] refactor: improve population workflow --- cbsurge/exposure/population/__init__.py | 18 ++- cbsurge/exposure/population/worldpop.py | 174 ++++++++++++++---------- 2 files changed, 113 insertions(+), 79 deletions(-) diff --git a/cbsurge/exposure/population/__init__.py b/cbsurge/exposure/population/__init__.py index ed7e70d3..8698d594 100644 --- a/cbsurge/exposure/population/__init__.py +++ b/cbsurge/exposure/population/__init__.py @@ -2,7 +2,7 @@ import click -from cbsurge.exposure.population.worldpop import download_data, process_aggregates_for_country +from cbsurge.exposure.population.worldpop import population_sync, process_aggregates, download @click.group() @@ -16,13 +16,23 @@ def population(): @click.option('--country', help='The ISO3 code of the country to process the data for') @click.option('--download-path', help='Download data locally', required=False) def sync(force_reprocessing, country, download_path): - asyncio.run(download_data(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path)) + asyncio.run(population_sync(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path)) +@population.command() +@click.option('--country', help='The ISO3 code of the country to process the data for') +@click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True) +@click.option('--download-path', help='Download data locally', required=False) +@click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly'])) +@click.option('--sex', help='Path to the downloaded data', type=click.Choice(['male', 'female'])) +def download(country, force_reprocessing, download_path, age_group, sex): + asyncio.run(download(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path, age_group=age_group, sex=sex)) + @population.command() @click.option('--country', help='The ISO3 code of the country to process the data for') @click.option('--age-group', help='The age group to process the data for', 
type=click.Choice(['child', 'active', 'elderly'])) @click.option('--sex', help='Path to the downloaded data', type=click.Choice(['male', 'female'])) @click.option('--download-path', help='Download data locally', required=False) -def run_aggregate(country, age_group, sex, download_path): - asyncio.run(process_aggregates_for_country(country_code=country, age_group=age_group, sex=sex, download_path=download_path)) \ No newline at end of file +@click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True) +def run_aggregate(country, age_group, sex, download_path, force_reprocessing): + asyncio.run(process_aggregates(country_code=country, age_group=age_group, sex=sex, download_path=download_path, force_reprocessing=force_reprocessing)) \ No newline at end of file diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index e1a34493..2ab1ec09 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -71,6 +71,7 @@ def chunker_function(iterable, chunk_size=4): async def get_available_data(country_code=None, year=DATA_YEAR): """ + Return the Country Codes and IDs of the pages in worldpop where the Age-Sex structures are Args: country_code: The country code for which to fetch data year: (Not implemented) The year for which to fetch data @@ -259,8 +260,23 @@ async def get_links_from_table(data_id=None): return [] return await extract_links_from_table(response.text) +async def download(country_code=None, year=DATA_YEAR, force_reprocessing=False, download_path=None, sex=None, age_group=None): + """ + Args: + country_code: The country code of the country intended to be downloaded + year: + force_reprocessing: + download_path: + sex: + age_group: + + Returns: + + """ + # TODO: Should try to download data from azure, if the data is available + pass -async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=False, download_path=None): +async def population_sync(country_code=None, year=DATA_YEAR, force_reprocessing=False, download_path=None): """ Download all available data for a given country and year. 
Args: @@ -297,11 +313,12 @@ async def download_data(country_code=None, year=DATA_YEAR, force_reprocessing=Fa logging.error("Error processing file: %s", result) logging.info("Data download complete for country: %s", country_code) logging.info("Starting aggregate processing for country: %s", country_code) - if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code): - logging.info("Aggregate files already exist for country: %s", country_code) - else: - await process_aggregates_for_country(country_code=country_code, download_path=download_path) - logging.info("Aggregate processing complete for country: %s", country_code) + await process_aggregates(country_code=country_code, download_path=download_path, force_reprocessing=force_reprocessing, ) + # if await check_aggregates_exist(storage_manager=storage_manager, country_code=country_code): + # logging.info("Aggregate files already exist for country: %s", country_code) + # else: + + # logging.info("Aggregate processing complete for country: %s", country_code) # Close the storage manager connection after all files have been processed logging.info("Closing storage manager after processing all files") await storage_manager.close() @@ -403,87 +420,94 @@ def create_sum(input_file_paths, output_file_path, block_size=(256, 256)): -async def process_aggregates_for_country(country_code: str, sex: Optional[str] = None, age_group: Optional[str] = None, year="2020", download_path=None): +async def process_aggregates(country_code: str, sex: Optional[str] = None, age_group: Optional[str] = None, year="2020", download_path=None, force_reprocessing=False): """ Process aggregate files for combinations of sex and age groups, or specific arguments passed. Args: - country_code (str): Country code for processing. + country_code (Optional[str]): Country code for processing. If not provided, it will process all the countries available from the get_available_data() function. sex (Optional[str]): Sex to process (male or female). age_group (Optional[str]): Age group to process (child, active, elderly). year (Optional[str]): (Unimplemented). The year for which the data should be produced for download_path (Optional[str]): Local path to download the COG files to. If provided, the files will not be uploaded to Azure. + force_reprocessing (Optional[bool]): Force reprocessing of the files even if they already exist in Azure. """ - - assert country_code, "Country code must be provided" async with AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")) as storage_manager: - logging.info("Processing aggregate files for country: %s", country_code) - async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, output_label: str=None): - """ - Processes and sums files for specified sexes and age groups. - Args: - sexes (List[str]): List of sexes (e.g., ['male'], ['female'], or ['male', 'female']). - age_grouping (Optional[str]): Age group to process. - output_label (str): Label for the output file. 
- """ - # Construct paths - paths = [f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/{s}" for s in sexes] - if age_grouping: - assert age_grouping in WORLDPOP_AGE_MAPPING, "Invalid age group provided" - paths = [f"{path}/{age_grouping}/" for path in paths] - - # Fetch blobs - blobs = [] - for path in paths: - blobs += await storage_manager.list_blobs(path) - if not blobs: - logging.warning("No blobs found for paths: %s", paths) - return - # Download blobs and sum them - with tempfile.TemporaryDirectory() as temp_dir: - local_files = [] - # download blobs concurrently - async def create_local_files(blob): - local_file = os.path.join(temp_dir, os.path.basename(blob)) - await storage_manager.download_blob(blob, temp_dir) - local_files.append(local_file) - tasks = [create_local_files(blob) for blob in blobs] - await asyncio.gather(*tasks) - - output_file = f"{temp_dir}/{output_label}.tif" - with ThreadPoolExecutor() as executor: - executor.submit(create_sum, input_file_paths=local_files, output_file_path=output_file, block_size=(1028, 1028)) - - # Upload the result - blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{output_label}.tif" - if download_path: - os.makedirs(f"{download_path}/{DATA_YEAR}/{country_code}/aggregate", exist_ok=True) - shutil.copy2(output_file, f"{download_path}/{DATA_YEAR}/{country_code}/aggregate/{country_code}_{output_label}.tif") - else: - await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path) - logging.info("Processed and uploaded: %s", blob_path) - - # Dynamic argument-based processing - if sex and age_group: - label = f"{sex}_{age_group}" - logging.info("Processing for sex '%s' and age group '%s'", sex, age_group) - await process_group(sexes=[sex], age_grouping=age_group, output_label=label) - elif sex: - label = f"{sex}_total" - logging.info("Processing for sex '%s' (all age groups)", sex) - await process_group(sexes=[sex], output_label=label) - elif age_group: - label = f"{age_group}_total" - logging.info("Processing for age group '%s' (both sexes)", age_group) - await process_group(sexes=['male', 'female'], age_grouping=age_group, output_label=label) + if not country_code: + country_codes = list((await get_available_data()).keys()) else: - # Process predefined combinations - logging.info("Processing all predefined combinations...") - for combo in AGESEX_STRUCTURE_COMBINATIONS: - logging.info("Processing %s...", combo["label"]) - await process_group(sexes=combo["sexes"], age_grouping=combo["age_group"], output_label=combo["label"]) + country_codes = [country_code] + for c_code in country_codes: + logging.info("Processing aggregate files for country: %s", c_code) + async def process_group(sexes: List[str]=None, age_grouping: Optional[str]=None, output_label: str=None): + """ + Processes and sums files for specified sexes and age groups. + Args: + sexes (List[str]): List of sexes (e.g., ['male'], ['female'], or ['male', 'female']). + age_grouping (Optional[str]): Age group to process. + output_label (str): Label for the output file. 
+ """ + # Construct paths + paths = [f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{c_code}/{s}" for s in sexes] + if age_grouping: + assert age_grouping in WORLDPOP_AGE_MAPPING, "Invalid age group provided" + paths = [f"{path}/{age_grouping}/" for path in paths] + + # Fetch blobs + blobs = [] + for path in paths: + blobs += await storage_manager.list_blobs(path) + if not blobs: + logging.warning("No blobs found for paths: %s", paths) + return - logging.info("All processing complete for country: %s", country_code) + # Download blobs and sum them + with tempfile.TemporaryDirectory() as temp_dir: + local_files = [] + # download blobs concurrently + async def create_local_files(blob): + local_file = os.path.join(temp_dir, os.path.basename(blob)) + await storage_manager.download_blob(blob, temp_dir) + local_files.append(local_file) + tasks = [create_local_files(blob) for blob in blobs] + await asyncio.gather(*tasks) + + output_file = f"{temp_dir}/{output_label}.tif" + with ThreadPoolExecutor() as executor: + executor.submit(create_sum, input_file_paths=local_files, output_file_path=output_file, block_size=(1028, 1028)) + + # Upload the result + blob_path = f"{AZ_ROOT_FILE_PATH}/{DATA_YEAR}/{c_code}/aggregate/{c_code}_{output_label}.tif" + if download_path: + os.makedirs(f"{download_path}/{DATA_YEAR}/{c_code}/aggregate", exist_ok=True) + shutil.copy2(output_file, f"{download_path}/{DATA_YEAR}/{c_code}/aggregate/{c_code}_{output_label}.tif") + else: + await storage_manager.upload_blob(file_path=output_file, blob_name=blob_path) + logging.info("Processed and uploaded: %s", blob_path) + + if not force_reprocessing and await check_aggregates_exist(storage_manager=storage_manager, country_code=c_code): + logging.info("Aggregate files already exist for country: %s", c_code) + continue + # Dynamic argument-based processing + if sex and age_group: + label = f"{sex}_{age_group}" + logging.info("Processing for sex '%s' and age group '%s'", sex, age_group) + await process_group(sexes=[sex], age_grouping=age_group, output_label=label) + elif sex: + label = f"{sex}_total" + logging.info("Processing for sex '%s' (all age groups)", sex) + await process_group(sexes=[sex], output_label=label) + elif age_group: + label = f"{age_group}_total" + logging.info("Processing for age group '%s' (both sexes)", age_group) + await process_group(sexes=['male', 'female'], age_grouping=age_group, output_label=label) + else: + # Process predefined combinations + logging.info("Processing all predefined combinations...") + for combo in AGESEX_STRUCTURE_COMBINATIONS: + logging.info("Processing %s...", combo["label"]) + await process_group(sexes=combo["sexes"], age_grouping=combo["age_group"], output_label=combo["label"]) + logging.info("All processing complete for country: %s", c_code) if __name__ == "__main__": From 85791f48be5643beb41ec36f674e60af355ac2fc Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Thu, 16 Jan 2025 17:53:37 +0300 Subject: [PATCH 19/24] shuffle list --- cbsurge/exposure/population/worldpop.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 2ab1ec09..8c46ee1f 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -7,6 +7,8 @@ from html.parser import HTMLParser from concurrent.futures import ThreadPoolExecutor from typing import List, Optional +import random + import aiofiles import httpx @@ -435,6 +437,7 @@ async def process_aggregates(country_code: str, sex: Optional[str] = None, age_g 
if not country_code: country_codes = list((await get_available_data()).keys()) + random.shuffle(country_codes) else: country_codes = [country_code] for c_code in country_codes: From c6610753dc119103bc021f7df7af5716cacf7f2b Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Thu, 16 Jan 2025 19:59:18 +0300 Subject: [PATCH 20/24] shuffle age_sex constant --- cbsurge/exposure/population/worldpop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 8c46ee1f..ef2ce48d 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -507,6 +507,7 @@ async def create_local_files(blob): else: # Process predefined combinations logging.info("Processing all predefined combinations...") + random.shuffle(AGESEX_STRUCTURE_COMBINATIONS) for combo in AGESEX_STRUCTURE_COMBINATIONS: logging.info("Processing %s...", combo["label"]) await process_group(sexes=combo["sexes"], age_grouping=combo["age_group"], output_label=combo["label"]) From 49144b0e396c1192887d6ec9b8e6a215802f7fb9 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Thu, 16 Jan 2025 20:32:28 +0300 Subject: [PATCH 21/24] chunk1 --- cbsurge/exposure/population/worldpop.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index ef2ce48d..75544864 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -507,8 +507,8 @@ async def create_local_files(blob): else: # Process predefined combinations logging.info("Processing all predefined combinations...") - random.shuffle(AGESEX_STRUCTURE_COMBINATIONS) - for combo in AGESEX_STRUCTURE_COMBINATIONS: + # random.shuffle(AGESEX_STRUCTURE_COMBINATIONS) + for combo in AGESEX_STRUCTURE_COMBINATIONS[0:2]: logging.info("Processing %s...", combo["label"]) await process_group(sexes=combo["sexes"], age_grouping=combo["age_group"], output_label=combo["label"]) logging.info("All processing complete for country: %s", c_code) From 301c2e51daf05d9fe0a593f1db53448589c0a3f2 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Thu, 16 Jan 2025 20:33:32 +0300 Subject: [PATCH 22/24] chunk2 --- cbsurge/exposure/population/worldpop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 75544864..39b03083 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -508,7 +508,7 @@ async def create_local_files(blob): # Process predefined combinations logging.info("Processing all predefined combinations...") # random.shuffle(AGESEX_STRUCTURE_COMBINATIONS) - for combo in AGESEX_STRUCTURE_COMBINATIONS[0:2]: + for combo in AGESEX_STRUCTURE_COMBINATIONS[2:4]: logging.info("Processing %s...", combo["label"]) await process_group(sexes=combo["sexes"], age_grouping=combo["age_group"], output_label=combo["label"]) logging.info("All processing complete for country: %s", c_code) From 7ba166448e57cec6dd44f36b7c4b7a712114eb89 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Thu, 16 Jan 2025 20:34:10 +0300 Subject: [PATCH 23/24] chunk3 --- cbsurge/exposure/population/worldpop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 39b03083..91101c40 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -508,7 +508,7 @@ 
async def create_local_files(blob): # Process predefined combinations logging.info("Processing all predefined combinations...") # random.shuffle(AGESEX_STRUCTURE_COMBINATIONS) - for combo in AGESEX_STRUCTURE_COMBINATIONS[2:4]: + for combo in AGESEX_STRUCTURE_COMBINATIONS[4:6]: logging.info("Processing %s...", combo["label"]) await process_group(sexes=combo["sexes"], age_grouping=combo["age_group"], output_label=combo["label"]) logging.info("All processing complete for country: %s", c_code) From 34e9156eef7e9611273c5650f37d0ae4804fb4e2 Mon Sep 17 00:00:00 2001 From: Thuhaa Date: Mon, 20 Jan 2025 11:36:17 +0300 Subject: [PATCH 24/24] feat: cleanup --- cbsurge/exposure/population/worldpop.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cbsurge/exposure/population/worldpop.py b/cbsurge/exposure/population/worldpop.py index 91101c40..7bf01284 100644 --- a/cbsurge/exposure/population/worldpop.py +++ b/cbsurge/exposure/population/worldpop.py @@ -7,7 +7,6 @@ from html.parser import HTMLParser from concurrent.futures import ThreadPoolExecutor from typing import List, Optional -import random import aiofiles @@ -437,7 +436,6 @@ async def process_aggregates(country_code: str, sex: Optional[str] = None, age_g if not country_code: country_codes = list((await get_available_data()).keys()) - random.shuffle(country_codes) else: country_codes = [country_code] for c_code in country_codes: @@ -508,7 +506,7 @@ async def create_local_files(blob): # Process predefined combinations logging.info("Processing all predefined combinations...") # random.shuffle(AGESEX_STRUCTURE_COMBINATIONS) - for combo in AGESEX_STRUCTURE_COMBINATIONS[4:6]: + for combo in AGESEX_STRUCTURE_COMBINATIONS: logging.info("Processing %s...", combo["label"]) await process_group(sexes=combo["sexes"], age_grouping=combo["age_group"], output_label=combo["label"]) logging.info("All processing complete for country: %s", c_code)
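
A few closing notes on the series follow. First, the block-windowed summation that create_sum converges on across patches 08 through 16 (window-sized reads, nodata zeroed out, float32 accumulation, ZSTD-compressed output) can be summarized in a standalone form. The sketch below assumes rasterio and numpy, inputs that share one grid and define a nodata value, as the per-country WorldPop age/sex rasters do; the function and variable names are illustrative, not the project's.

    import numpy as np
    import rasterio
    from rasterio.windows import Window


    def sum_rasters(input_paths, output_path, block_size=(256, 256)):
        """Sum aligned single-band rasters window by window to keep memory flat."""
        datasets = [rasterio.open(p) for p in input_paths]
        try:
            meta = datasets[0].meta.copy()
            meta.update(count=1, dtype="float32", nodata=0, compress="ZSTD")
            rows, cols = datasets[0].shape
            with rasterio.open(output_path, "w", **meta) as dst:
                for row_off in range(0, rows, block_size[0]):
                    for col_off in range(0, cols, block_size[1]):
                        window = Window(col_off, row_off,
                                        min(block_size[1], cols - col_off),
                                        min(block_size[0], rows - row_off))
                        total = np.zeros((window.height, window.width), dtype="float32")
                        for src in datasets:
                            block = src.read(1, window=window)
                            # Nodata pixels count as zero population in the sum.
                            total += np.where(block == src.nodata, 0, block).astype("float32")
                        dst.write(total, window=window, indexes=1)
        finally:
            for src in datasets:
                src.close()
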
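check_cog_exists is called throughout the series but its body is not shown in these patches. With the async azure-storage-blob SDK, the usual probe is BlobClient.exists(), which avoids listing or downloading anything; a sketch under that assumption, with the container name and blob path in the usage comment as placeholders:

    import asyncio
    import os

    from azure.storage.blob.aio import BlobServiceClient


    async def blob_exists(conn_str, container_name, blob_path):
        """Return True if the blob exists, using a lightweight HEAD-style probe."""
        async with BlobServiceClient.from_connection_string(conn_str) as service:
            blob_client = service.get_container_client(container_name).get_blob_client(blob_path)
            return await blob_client.exists()


    # asyncio.run(blob_exists(os.getenv("AZURE_STORAGE_CONNECTION_STRING"),
    #                         "my-container", "2020/KEN/aggregate/KEN_male_total.tif"))
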
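PATCH 15 replaces the sequential per-blob download loop with asyncio.gather, which starts every download at once; nothing bounds the concurrency. If a group ever spans many blobs, a semaphore keeps the number of in-flight downloads fixed. A sketch of that pattern, with download_one standing in for the storage manager's download_blob:

    import asyncio


    async def download_all(blobs, download_one, max_concurrency=8):
        """Run `download_one(blob)` for every blob, at most `max_concurrency` at a time."""
        semaphore = asyncio.Semaphore(max_concurrency)

        async def bounded(blob):
            async with semaphore:
                return await download_one(blob)

        # gather preserves input order; return_exceptions=True would collect
        # per-blob failures instead of raising on the first one.
        return await asyncio.gather(*(bounded(blob) for blob in blobs))
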
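PATCH 17 gives check_aggregates_exist a default of storage_manager=AzureBlobStorageManager(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING")). Python evaluates default arguments once, at import time, so the client is constructed (and the environment variable read) whenever the module loads, and that single instance is silently shared by every call that omits the argument. The conventional shape is a None default resolved inside the body; a sketch with the existence checks elided and only the lifetime handling shown:

    import os

    from cbsurge.azure.blob_storage import AzureBlobStorageManager


    async def check_aggregates_exist(storage_manager=None, country_code=None):
        assert country_code is not None, "country_code is required"
        owns_manager = storage_manager is None
        if owns_manager:
            # Constructed lazily, per call, instead of once at import time.
            storage_manager = AzureBlobStorageManager(
                conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING"))
        try:
            ...  # the per-combination existence checks, unchanged from the patch
        finally:
            if owns_manager:
                await storage_manager.close()
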
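The download command added in PATCH 18 shadows the download coroutine it imports from worldpop: inside the function body, download refers to the Click command itself, so asyncio.run(download(country_code=..., ...)) re-invokes the command with keyword arguments it does not accept instead of calling the worldpop stub. Aliasing the import avoids the collision; a sketch of that fix, where the alias name is an assumption, not something in the series:

    import asyncio

    import click

    from cbsurge.exposure.population.worldpop import download as download_worldpop


    @click.group()
    def population():
        """Population exposure data commands."""


    @population.command()
    @click.option('--country', help='The ISO3 code of the country to process the data for')
    @click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True)
    @click.option('--download-path', help='Download data locally', required=False)
    @click.option('--age-group', type=click.Choice(['child', 'active', 'elderly']))
    @click.option('--sex', type=click.Choice(['male', 'female']))
    def download(country, force_reprocessing, download_path, age_group, sex):
        # `download` here is the Click command; the coroutine is reachable via the alias.
        asyncio.run(download_worldpop(country_code=country, force_reprocessing=force_reprocessing,
                                      download_path=download_path, age_group=age_group, sex=sex))
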
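Finally, patches 21 through 23 split one backfill across runs by hand-editing slices of AGESEX_STRUCTURE_COMBINATIONS ([0:2], then [2:4], then [4:6]), and PATCH 24 deletes the slice again. If the work needs splitting again, a worker-index parameter avoids touching code between runs; a sketch of the slicing helper, with the CLI wiring left out:

    def chunk(items, n_chunks, index):
        """Return the `index`-th of `n_chunks` near-equal contiguous slices of `items`."""
        if not 0 <= index < n_chunks:
            raise ValueError("index must satisfy 0 <= index < n_chunks")
        size, extra = divmod(len(items), n_chunks)
        start = index * size + min(index, extra)
        end = start + size + (1 if index < extra else 0)
        return items[start:end]


    # Three workers over the eleven combinations:
    #   chunk(AGESEX_STRUCTURE_COMBINATIONS, 3, 0)  -> combinations 0-3
    #   chunk(AGESEX_STRUCTURE_COMBINATIONS, 3, 1)  -> combinations 4-7
    #   chunk(AGESEX_STRUCTURE_COMBINATIONS, 3, 2)  -> combinations 8-10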