Skip to content

Commit b45bc4a

Browse files
authored
feat: sync and aggregation of worldpop data (#89)
* fix: cleanup and aggregation * WIP: process specific age and sex groups * run aggregations * feat: process all combinations implementation * fix: run it for all countries with sync command * remove persistent temp files * check if aggregates exist * fix: update metadata for sizes * WIP: debugging * fix: breaking before all blocks are processed * upload data * refactor: to upload for all * refactor: process remaining * refactor: process remaining * WIP * fix: update to use and instead of and * add download-path argument * refactor: improve population workflow * shuffle list * shuffle age_sex constant * chunk1 * chunk2 * chunk3 * feat: cleanup
1 parent b32077c commit b45bc4a

File tree

6 files changed

+384
-50
lines changed

6 files changed

+384
-50
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ target/
8484
profile_default/
8585
ipython_config.py
8686

87+
88+
data/
8789
# pyenv
8890
# For a library or package, you might want to ignore these files since the code is
8991
# intended to run in multiple environments; otherwise, check them in:

cbsurge/azure/blob_storage.py

Lines changed: 83 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
import asyncio
12
import logging
23
import os
34

45
import aiofiles
56
from azure.storage.blob.aio import ContainerClient
6-
from tqdm.asyncio import tqdm
7+
from tqdm.asyncio import tqdm_asyncio, tqdm
78

89
from cbsurge.constants import AZURE_BLOB_CONTAINER_NAME
910

@@ -19,17 +20,19 @@ class AzureBlobStorageManager:
1920
await az.download_blob(blob_name, local_directory)
2021
await az.download_files(azure_directory, local_directory)
2122
"""
22-
def __aenter__(self):
23-
return self
24-
25-
def __aexit__(self, exc_type, exc_val, exc_tb):
26-
return self.close()
27-
2823
def __init__(self, conn_str):
    """
    Set up the manager for the project's blob container.

    Args:
        conn_str: (str) The Azure Storage connection string used to build the container client.
    """
    logging.info("Initializing Azure Blob Storage Manager")
    self.conn_str = conn_str
    # The container name comes from the project constant, not from the caller.
    client = ContainerClient.from_connection_string(
        conn_str=conn_str,
        container_name=AZURE_BLOB_CONTAINER_NAME,
    )
    self.container_client = client
3227

28+
async def __aenter__(self):
    """
    Enter the async context.

    A container client is built from the stored connection string and
    assigned to ``self.container_client`` before the manager is handed back.

    Returns:
        The manager itself.
    """
    client = ContainerClient.from_connection_string(
        conn_str=self.conn_str,
        container_name=AZURE_BLOB_CONTAINER_NAME,
    )
    self.container_client = client
    return self
31+
32+
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """
    Leave the async context, closing the manager when a container client is set.

    Args:
        exc_type: The exception type raised inside the context, if any.
        exc_val: The exception instance raised inside the context, if any.
        exc_tb: The traceback of that exception, if any.

    Returns:
        Whatever ``close()`` returns, or None when there is no container client.
    """
    if not self.container_client:
        return None
    return await self.close()
35+
3336
async def upload_blob(self, file_path=None, blob_name=None):
3437
"""
3538
Upload a file to Azure Blob Storage.
@@ -47,7 +50,7 @@ async def upload_blob(self, file_path=None, blob_name=None):
4750
file_size = os.path.getsize(file_path)
4851

4952
async def __progress__(current, total):
50-
tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
53+
tqdm_asyncio(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
5154

5255
async with aiofiles.open(file_path, "rb") as data:
5356
await blob_client.upload_blob(data, overwrite=True, max_concurrency=4, blob_type="BlockBlob", length=file_size, progress_hook=__progress__)
@@ -69,30 +72,42 @@ async def upload_files(self, local_directory=None, azure_directory=None):
6972
for root, _, files in os.walk(local_directory):
7073
for file in files:
7174
async def __progress__(current, total):
72-
tqdm(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
75+
tqdm_asyncio(total=total, unit="B", unit_scale=True, desc=f"Uploading {blob_name}").update(current)
7376
file_path = os.path.join(root, file)
7477
blob_name = f"{azure_directory}/{file}"
7578
await self.upload_blob(file_path=file_path, blob_name=blob_name)
7679
return
7780

81+
82+
7883
async def download_blob(self, blob_name=None, local_directory=None):
    """
    Download a blob from Azure Blob Storage, writing it to disk chunk by chunk.

    Args:
        blob_name: (str) The name of the blob to download.
        local_directory: (str) The local path to save the downloaded blob. If not provided, the blob will be saved in the current working directory.

    Returns:
        str: The path to the downloaded file.
    """
    logging.info("Downloading blob: %s", blob_name)
    blob_client = self.container_client.get_blob_client(blob=blob_name)
    # Only the last path segment of the blob name is used for the local file.
    file_name = blob_name.split("/")[-1]
    if local_directory:
        os.makedirs(local_directory, exist_ok=True)
        file_name = f"{local_directory}/{file_name}"

    # The blob size is fetched up front so the progress bar has a total.
    blob_properties = await blob_client.get_blob_properties()
    total_size = blob_properties.size

    async with aiofiles.open(file_name, "wb") as data:
        blob = await blob_client.download_blob()
        # The `with` block closes the progress bar even if a chunk read or a
        # write fails part-way through the download.
        with tqdm_asyncio(total=total_size, unit="B", unit_scale=True, desc=file_name) as progress:
            async for chunk in blob.chunks():
                await data.write(chunk)
                progress.update(len(chunk))
    logging.info("Download completed for blob: %s", blob_name)
    return file_name
98113

@@ -106,10 +121,65 @@ async def download_files(self, azure_directory=None, local_directory=None):
106121
Returns:
107122
108123
"""
109-
async for blob in self.container_client.list_blobs(name_starts_with=azure_directory):
110-
await self.download_blob(blob_name=blob.name, local_directory=local_directory)
124+
blobs = await self.list_blobs(prefix=azure_directory)
125+
for blob in blobs:
126+
await self.download_blob(blob_name=blob, local_directory=local_directory)
111127
return
112128

129+
async def list_blobs(self, prefix=None):
    """
    List blobs in the Azure Blob Storage container.

    Args:
        prefix: (str) The prefix to filter blobs by.

    Returns:
        list: The names of the blobs yielded by the container listing.
    """
    names = []
    async for item in self.container_client.list_blobs(name_starts_with=prefix):
        names.append(item.name)
    return names
138+
139+
140+
async def copy_file(self, source_blob=None, destination_blob=None):
    """
    Copy a file from one blob to another within this container.

    Args:
        source_blob: (str) The name of the source blob to copy.
        destination_blob: (str) The name of the destination blob to copy to.

    Returns:
        str: The URL of the destination blob.
    """
    logging.info("Copying blob: %s to %s", source_blob, destination_blob)
    get_client = self.container_client.get_blob_client
    source_url = get_client(blob=source_blob).url
    target_client = get_client(blob=destination_blob)
    # NOTE(review): only the *start* of the copy is awaited here; presumably
    # the copy may still be pending when this returns - confirm with the SDK docs.
    await target_client.start_copy_from_url(source_url)
    return target_client.url
155+
156+
async def delete_blob(self, blob_name=None):
    """
    Delete a blob from Azure Blob Storage.

    Args:
        blob_name: (str) The name of the blob to delete.

    Returns:
        str: The name of the blob that was deleted.
    """
    logging.info("Deleting blob: %s", blob_name)
    target_client = self.container_client.get_blob_client(blob=blob_name)
    await target_client.delete_blob()
    return blob_name
169+
170+
async def rename_file(self, source_blob=None, destination_blob=None):
    """
    Rename a blob by copying it to a new name and then deleting the original.

    The source blob is only deleted once the destination reports that the
    copy finished successfully, so a pending or failed copy never removes
    the original data.

    Args:
        source_blob: (str) The current name of the blob.
        destination_blob: (str) The new name for the blob.

    Returns:
        str: The name of the destination blob.

    Raises:
        RuntimeError: If the copy ends in any state other than "success".
    """
    await self.copy_file(source_blob=source_blob, destination_blob=destination_blob)

    # copy_file only *starts* the copy; poll the destination until the
    # service no longer reports it as pending.
    destination_client = self.container_client.get_blob_client(blob=destination_blob)
    properties = await destination_client.get_blob_properties()
    while properties.copy.status == "pending":
        await asyncio.sleep(1)
        properties = await destination_client.get_blob_properties()

    if properties.copy.status != "success":
        raise RuntimeError(
            f"Copy of {source_blob} to {destination_blob} ended with status "
            f"{properties.copy.status!r}; source blob was not deleted"
        )

    await self.delete_blob(blob_name=source_blob)
    return destination_blob
113183

114184
async def close(self):
115185
"""
@@ -118,4 +188,3 @@ async def close(self):
118188
"""
119189
logging.info("Closing Azure Blob Storage Manager")
120190
return await self.container_client.close()
121-

cbsurge/azure/fileshare.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,19 @@ async def download_file(self, file_name, download_path):
138138
progress_bar.close()
139139
return file_name
140140

141+
async def copy_file(self, source_file, destination_file):
    """
    Copy a file from one location to another in the Azure File Share.

    Args:
        source_file: The file to copy.
        destination_file: The destination file.

    Returns:
        The ``destination_file`` value that was passed in.
    """
    get_client = self.share_client.get_file_client
    source_url = get_client(source_file).url
    target_client = get_client(destination_file)
    await target_client.start_copy_from_url(source_url)
    return destination_file
156+

cbsurge/exposure/population/__init__.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import click
44

5-
from cbsurge.exposure.population.worldpop import download_data
5+
from cbsurge.exposure.population.worldpop import population_sync, process_aggregates, download
66

77

88
@click.group()
@@ -15,5 +15,24 @@ def population():
1515
@click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True)
1616
@click.option('--country', help='The ISO3 code of the country to process the data for')
1717
@click.option('--download-path', help='Download data locally', required=False)
18-
def run_download(force_reprocessing, country, download_path):
19-
asyncio.run(download_data(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path))
18+
def sync(force_reprocessing, country, download_path):
19+
asyncio.run(population_sync(force_reprocessing=force_reprocessing, country_code=country, download_path=download_path))
20+
21+
22+
@population.command()
@click.option('--country', help='The ISO3 code of the country to process the data for')
@click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True)
@click.option('--download-path', help='Download data locally', required=False)
@click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly']))
@click.option('--sex', help='The sex to process the data for', type=click.Choice(['male', 'female']))
def download(country, force_reprocessing, download_path, age_group, sex):
    """Download population data, optionally limited to a country, age group and sex."""
    # This command is itself named `download`, which shadows the coroutine
    # imported from the worldpop module at file level. Import it under an
    # alias so the worldpop coroutine is what gets run, not this command.
    from cbsurge.exposure.population.worldpop import download as download_population

    asyncio.run(
        download_population(
            force_reprocessing=force_reprocessing,
            country_code=country,
            download_path=download_path,
            age_group=age_group,
            sex=sex,
        )
    )
30+
31+
@population.command()
@click.option('--country', help='The ISO3 code of the country to process the data for')
@click.option('--age-group', help='The age group to process the data for', type=click.Choice(['child', 'active', 'elderly']))
@click.option('--sex', help='The sex to process the data for', type=click.Choice(['male', 'female']))
@click.option('--download-path', help='Download data locally', required=False)
@click.option('--force-reprocessing', help='Force reprocessing of data', is_flag=True)
def run_aggregate(country, age_group, sex, download_path, force_reprocessing):
    """Run the population aggregation for the selected country, age group and sex."""
    asyncio.run(
        process_aggregates(
            country_code=country,
            age_group=age_group,
            sex=sex,
            download_path=download_path,
            force_reprocessing=force_reprocessing,
        )
    )

cbsurge/exposure/population/constants.py

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,27 @@
33
AZ_ROOT_FILE_PATH = "worldpop"
44
CONTAINER_NAME = "stacdata"
55
# Age-group name -> [first, last] pair of ages.
# NOTE(review): bounds look inclusive - confirm against the code that consumes them.
WORLDPOP_AGE_MAPPING = {
    "child": [0, 14],
    "active": [15, 64],
    "elderly": [65, 100],
}

# Single-letter sex code -> sex name used in the combination labels below.
SEX_MAPPING = {
    "M": "male",
    "F": "female",
}

DATA_YEAR = 2020

# Each entry names one sex / age-group combination; "age_group": None means
# no age group is selected. The list order is kept exactly as committed.
AGESEX_STRUCTURE_COMBINATIONS = [
    # Per-sex totals
    {"sexes": ["male"], "age_group": None, "label": "male_total"},
    {"sexes": ["female"], "age_group": None, "label": "female_total"},
    # Per-sex, per-age-group
    {"sexes": ["male"], "age_group": "active", "label": "male_active"},
    {"sexes": ["female"], "age_group": "active", "label": "female_active"},
    {"sexes": ["male"], "age_group": "child", "label": "male_child"},
    {"sexes": ["female"], "age_group": "child", "label": "female_child"},
    {"sexes": ["male"], "age_group": "elderly", "label": "male_elderly"},
    {"sexes": ["female"], "age_group": "elderly", "label": "female_elderly"},
    # Both sexes, per-age-group
    {"sexes": ["male", "female"], "age_group": "elderly", "label": "elderly_total"},
    {"sexes": ["male", "female"], "age_group": "child", "label": "child_total"},
    {"sexes": ["male", "female"], "age_group": "active", "label": "active_total"},
]

0 commit comments

Comments
 (0)