Skip to content

Commit 51581f4

Browse files
authored
Pr 31 fix (#39)
* update examples
1 parent 12d5495 commit 51581f4

File tree

5 files changed

+107
-39
lines changed

5 files changed

+107
-39
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,6 @@ jobs:
9898
with:
9999
python-version: 3.11
100100
- name: Install code quality tools
101-
run: pip install black
101+
run: pip install black==25.1.0
102102
- name: Check code formatting with black
103103
run: black --check .

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ coverage.xml
1515
gde/
1616
upg/
1717
docker/
18+
/examples/output/output-*

examples/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
FROM nycplanning/docker-geosupport:latest
22

3-
RUN pip install pandas nyc-parser
3+
RUN pip install pandas nyc-parser tqdm
44

55
WORKDIR /examples
66

examples/pandas_multiprocessing.py

Lines changed: 103 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,42 @@
1-
# from: https://gist.github.com/ishiland/824ddd386fcd0b90fc55aea573a28b22
2-
# written by ishiland: https://github.com/ishiland
3-
# Minor edits by torreyma: https://github.com/torreyma
4-
#
5-
from geosupport import Geosupport, GeosupportError
6-
from nycparser import Parser
1+
"""
2+
Example of how to use python-geosupport, Pandas and Multiprocessing to speed up geocoding workflows.
3+
"""
4+
5+
import os
76
import pandas as pd
7+
from typing import Callable
88
from multiprocessing import Pool, cpu_count
99
from functools import partial
10-
import numpy as np
10+
from tqdm import tqdm # Progress bar
1111

12-
"""
13-
Example of how to use python-geosupport, Pandas and Multiprocessing to speed up geocoding workflows.
14-
"""
12+
from geosupport import Geosupport, GeosupportError
13+
from nycparser import Parser
14+
15+
# Determine reasonable CPU count (avoid memory issues)
16+
cpus = min(cpu_count(), 8)
17+
18+
# Proper path handling relative to script location
19+
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
20+
INPUT_CSV = os.path.join(SCRIPT_DIR, "data/input.csv")
21+
OUTPUT_CSV = os.path.join(SCRIPT_DIR, "data/output-pandas-multiprocessing.csv")
1522

23+
24+
# Create a single global instance
1625
g = Geosupport()
1726
p = Parser()
1827

19-
cpus = cpu_count()
2028

21-
INPUT_CSV = "/examples/data/input.csv"
22-
OUTPUT_CSV = "/examples/data/output-pandas-multiprocessing.csv"
29+
def geo_by_address(row: pd.Series) -> pd.Series:
30+
"""
31+
Geocodes a pandas row containing address attributes.
2332
33+
Args:
34+
row: Pandas Series with 'Address' and 'Borough' columns
2435
25-
def geo_by_address(row):
36+
Returns:
37+
Pandas Series with lat, lon & Geosupport message.
2638
"""
27-
Geocodes a pandas row containing address atributes.
2839

29-
:param row: Pandas Series
30-
:return: Pandas Series with lat, lon & Geosupport message.
31-
"""
3240
try:
3341
# parse the address to separate PHN and street
3442
parsed = p.address(row["Address"])
@@ -40,38 +48,97 @@ def geo_by_address(row):
4048
)
4149
lat = result.get("Latitude")
4250
lon = result.get("Longitude")
43-
msg = result.get("Message")
51+
msg = result.get("Message", "")
4452
except GeosupportError as ge:
4553
lat = ""
4654
lon = ""
4755
msg = str(ge)
56+
except Exception as e:
57+
lat = ""
58+
lon = ""
59+
msg = f"Error: {str(e)}"
60+
4861
return pd.Series([lat, lon, msg])
4962

5063

51-
def parallelize(data, func, num_of_processes=cpus):
52-
data_split = np.array_split(data, num_of_processes)
53-
pool = Pool(num_of_processes)
54-
data = pd.concat(pool.map(func, data_split))
55-
pool.close()
56-
pool.join()
57-
return data
64+
def run_on_subset(func: Callable, data_subset: pd.DataFrame) -> pd.DataFrame:
65+
"""Apply a function to each row of a dataframe subset"""
66+
return data_subset.apply(func, axis=1)
5867

5968

60-
def run_on_subset(func, data_subset):
61-
return data_subset.apply(func, axis=1)
69+
def parallelize(
70+
data: pd.DataFrame, func: Callable, num_of_processes: int = cpus
71+
) -> pd.DataFrame:
72+
"""
73+
Split dataframe and apply function in parallel
74+
75+
Args:
76+
data: Input DataFrame
77+
func: Function to apply to each chunk
78+
num_of_processes: Number of parallel processes
6279
80+
Returns:
81+
DataFrame with results
82+
"""
83+
# Create roughly equal sized chunks using pandas methods
84+
splits = []
85+
chunk_size = max(1, len(data) // num_of_processes)
86+
87+
# Create chunks without using numpy arrays
88+
for i in range(0, len(data), chunk_size):
89+
splits.append(data.iloc[i : min(i + chunk_size, len(data))].copy())
90+
91+
# Use tqdm for progress tracking
92+
with Pool(num_of_processes) as pool:
93+
results = list(
94+
tqdm(pool.imap(func, splits), total=len(splits), desc="Geocoding")
95+
)
6396

64-
def parallelize_on_rows(data, func, num_of_processes=cpus):
65-
return parallelize(data, partial(run_on_subset, func), num_of_processes)
97+
return pd.concat(results)
6698

6799

68100
if __name__ == "__main__":
101+
print(f"Starting geocoding with {cpus} processes")
69102

70-
# read in csv
71-
df = pd.read_csv(INPUT_CSV)
103+
# Process in batches for large datasets
104+
batch_size = 100000
105+
106+
# Check if input file exists
107+
if not os.path.exists(INPUT_CSV):
108+
print(f"Error: Input file not found: {INPUT_CSV}")
109+
exit(1)
72110

73-
# add 3 Geosupport columns - Latitude, Longitude and Geosupport message
74-
df[["lat", "lon", "msg"]] = parallelize_on_rows(df, geo_by_address)
111+
# Create output directory if it doesn't exist
112+
os.makedirs(os.path.dirname(OUTPUT_CSV), exist_ok=True)
113+
114+
# Read the input csv
115+
df = pd.read_csv(INPUT_CSV)
116+
total_rows = len(df)
117+
print(f"Processing {total_rows} addresses")
118+
119+
# Process in batches if large dataset
120+
if total_rows > batch_size:
121+
for i in range(0, total_rows, batch_size):
122+
print(
123+
f"Processing batch {i//batch_size + 1}/{(total_rows-1)//batch_size + 1}"
124+
)
125+
batch = df.iloc[i : i + batch_size].copy()
126+
127+
# Geocode the batch
128+
batch[["lat", "lon", "msg"]] = parallelize(
129+
batch, partial(run_on_subset, geo_by_address), num_of_processes=cpus
130+
)
131+
132+
# Write each batch (append mode after first batch)
133+
mode = "w" if i == 0 else "a"
134+
header = i == 0
135+
batch.to_csv(OUTPUT_CSV, mode=mode, header=header, index=False)
136+
print(f"Batch {i//batch_size + 1} complete")
137+
else:
138+
# For small datasets, process all at once
139+
df[["lat", "lon", "msg"]] = parallelize(
140+
df, partial(run_on_subset, geo_by_address), num_of_processes=cpus
141+
)
142+
df.to_csv(OUTPUT_CSV, index=False)
75143

76-
# output to csv with the 3 new columns.
77-
df.to_csv(OUTPUT_CSV)
144+
print(f"Geocoding complete! Results saved to {OUTPUT_CSV}")

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
extras_require={
5252
"dev": [
5353
"coverage",
54-
"black",
54+
"black==25.1.0",
5555
]
5656
},
5757
)

0 commit comments

Comments
 (0)