Skip to content

Commit 09f1458

Browse files
committed
Add Prefect
1 parent 7d70482 commit 09f1458

File tree

8 files changed

+583
-0
lines changed

8 files changed

+583
-0
lines changed

apps/prefect/Dockerfile.worker

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Image used by the Prefect workers and the one-shot setup/deploy container.
#
# The Prefect major version must match the API server the worker talks to;
# docker-compose.yml runs the server from prefecthq/prefect:3-python3.11.
FROM prefecthq/prefect:3-python3.11

WORKDIR /app

# Install dependencies before copying the sources so this layer stays cached
# when only application code changes. --no-cache-dir keeps the image smaller.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# COPY dcpy ./dcpy
# RUN pip install -e .

COPY . .

# Default command only; docker-compose.yml overrides it with the dev/prod pools.
CMD ["prefect", "worker", "start", "--pool", "default-work-pool", "--type", "process"]

apps/prefect/config/recipe.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
product: bpl_libraries
2+
version: "2024.12.8"
3+
inputs:
4+
bpl_libraries:
5+
dataset: dcp_libraries
6+
version: "20241101"
7+
destination: file
8+
stage_config:
9+
builds.plan:
10+
connector_args: {}
11+
builds.load:
12+
connector_args: {}
13+
builds.build:
14+
build_command: echo "Mock DBT build completed"
15+
connector_args: {}
16+
builds.package:
17+
destination: minio
18+
destination_key: bpl_libraries
19+
connector_args:
20+
bucket: builds

apps/prefect/docker-compose.yml

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# Local Prefect stack: Postgres (Prefect database), MinIO (artifact storage),
# the Prefect API server, a one-shot setup/deploy job and one worker per pool.
version: "3.8"

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: prefect
      POSTGRES_USER: prefect
      POSTGRES_PASSWORD: prefect
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U prefect"]
      interval: 5s
      timeout: 5s
      retries: 5

  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data
    healthcheck:
      # Current minio/minio images do not ship curl, so a curl-based probe
      # always fails; use the bundled MinIO client readiness check instead.
      test: ["CMD", "mc", "ready", "local"]
      interval: 30s
      timeout: 20s
      retries: 3

  prefect-server:
    image: prefecthq/prefect:3-python3.11
    command: prefect server start --host 0.0.0.0
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect
    ports:
      - "4200:4200"
    depends_on:
      postgres:
        condition: service_healthy
    healthcheck:
      test:
        [
          "CMD",
          "python",
          "-c",
          "import urllib.request; urllib.request.urlopen('http://localhost:4200/api/health')",
        ]
      interval: 5s
      timeout: 5s
      retries: 10

  # Combined setup - creates work pools and deployments
  prefect-setup-and-deploy:
    build:
      context: .
      dockerfile: Dockerfile.worker
    command: ./setup_and_deploy.sh
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
    depends_on:
      prefect-server:
        condition: service_healthy
    volumes:
      - ./flows:/app/flows
      - ./setup_and_deploy.sh:/app/setup_and_deploy.sh
      - ./prefect.yaml:/app/prefect.yaml
    # One-shot job: the workers wait for it to exit successfully.
    restart: "no"

  # Dev worker - processes dev work pool
  prefect-worker-dev:
    build:
      context: .
      dockerfile: Dockerfile.worker
    command: prefect worker start --pool data-engineering-dev-pool --type process
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
      MINIO_ENDPOINT: minio:9000
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
      MINIO_BUCKET: builds
      ENV: dev
    depends_on:
      prefect-setup-and-deploy:
        condition: service_completed_successfully
    volumes:
      - ./flows:/app/flows
      - ./data:/app/data

  # Prod worker - processes prod work pool
  prefect-worker-prod:
    build:
      context: .
      dockerfile: Dockerfile.worker
    command: prefect worker start --pool data-engineering-prod-pool --type process
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
      MINIO_ENDPOINT: minio:9000
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
      MINIO_BUCKET: builds
      ENV: prod
    depends_on:
      prefect-setup-and-deploy:
        condition: service_completed_successfully
    volumes:
      - ./flows:/app/flows
      - ./data:/app/data

volumes:
  postgres_data:
  minio_data:
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import os
2+
import time
3+
from typing import Dict, Any
4+
5+
from prefect import flow, task, get_run_logger
6+
from prefect.artifacts import create_markdown_artifact
7+
from minio import Minio
8+
from minio.error import S3Error
9+
10+
11+
@task
def plan_task(recipe_config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a "planned" copy of the recipe (mocked planning stage).

    Args:
        recipe_config: Recipe mapping; ``product`` and ``version`` are read.

    Returns:
        A new dict with every key of ``recipe_config`` plus
        ``resolved_version`` (copied from ``version``), ``inputs_resolved``
        and ``plan_timestamp`` (``time.time()`` at planning).

    Raises:
        KeyError: If ``product`` or ``version`` is missing from the recipe.
    """
    log = get_run_logger()
    log.info(f"Planning build for {recipe_config['product']}")

    # Stand-in for the real planning work.
    time.sleep(2)

    plan = dict(recipe_config)
    plan["resolved_version"] = recipe_config["version"]
    # NOTE: fixed mock values; the recipe's own "inputs" section is not read.
    plan["inputs_resolved"] = {
        "bpl_libraries": {
            "dataset": "dcp_libraries",
            "version": "20241101",
            "file_type": "csv",
            "destination": "file",
        }
    }
    plan["plan_timestamp"] = time.time()

    log.info("Planning completed successfully")
    return plan
34+
35+
36+
@task
def load_task(planned_recipe: Dict[str, Any]) -> str:
    """Write a small mock source CSV and return the directory holding it.

    Args:
        planned_recipe: Planned recipe; ``product`` and ``resolved_version``
            are used to build the target directory under ``/tmp/data``.

    Returns:
        The directory path containing ``libraries.csv``.

    Raises:
        KeyError: If ``product`` or ``resolved_version`` is missing.
    """
    log = get_run_logger()
    log.info("Loading source data (mocked)")

    # Stand-in for the real load work.
    time.sleep(3)

    product = planned_recipe["product"]
    version = planned_recipe["resolved_version"]
    data_path = f"/tmp/data/{product}/{version}"
    os.makedirs(data_path, exist_ok=True)

    rows = (
        "id,name,address,borough\n",
        "1,Central Library,Grand Army Plaza,Brooklyn\n",
        "2,Jefferson Market Library,425 Avenue of the Americas,Manhattan\n",
    )
    with open(f"{data_path}/libraries.csv", "w") as out:
        out.writelines(rows)

    log.info(f"Mock data loaded to {data_path}")
    return data_path
56+
57+
58+
@task
def build_task(data_path: str, planned_recipe: Dict[str, Any]) -> str:
    """Write a mock "processed" CSV under ``data_path`` (mocked DBT build).

    Args:
        data_path: Directory under which ``build_outputs`` is created.
        planned_recipe: Planned recipe; not read by this mock implementation.

    Returns:
        The ``build_outputs`` directory containing
        ``bpl_libraries_processed.csv``.
    """
    log = get_run_logger()
    log.info("Running build process (mocked DBT)")

    # Stand-in for the real build work.
    time.sleep(4)

    build_output_path = f"{data_path}/build_outputs"
    os.makedirs(build_output_path, exist_ok=True)

    rows = (
        "library_id,library_name,full_address,borough_code\n",
        "BPL001,Central Library,Grand Army Plaza Brooklyn NY,BK\n",
        "NYPL002,Jefferson Market Library,425 Avenue of the Americas Manhattan NY,MN\n",
    )
    with open(f"{build_output_path}/bpl_libraries_processed.csv", "w") as out:
        out.writelines(rows)

    log.info(f"Build completed, outputs at {build_output_path}")
    return build_output_path
78+
79+
80+
@task
def package_task(build_output_path: str, planned_recipe: Dict[str, Any]) -> str:
    """Upload the processed CSV from ``build_output_path`` to MinIO.

    Connection settings come from the environment: ``MINIO_ENDPOINT``,
    ``MINIO_ACCESS_KEY``, ``MINIO_SECRET_KEY``, ``MINIO_BUCKET`` and
    ``MINIO_SECURE``. TLS is off unless ``MINIO_SECURE`` is one of
    ``1``/``true``/``yes``/``on`` (case-insensitive), which preserves the
    previous plain-HTTP behaviour by default.

    Args:
        build_output_path: Directory containing ``bpl_libraries_processed.csv``.
        planned_recipe: Planned recipe; ``product`` and ``resolved_version``
            form the object key prefix.

    Returns:
        The uploaded object's location as ``s3://<bucket>/<object key>``.

    Raises:
        KeyError: If ``product`` or ``resolved_version`` is missing.
        S3Error: If the upload fails (logged, then re-raised).
    """
    logger = get_run_logger()
    logger.info("Packaging build outputs")

    # Stand-in for the real packaging work.
    time.sleep(2)

    secure_flag = os.getenv("MINIO_SECURE", "false").strip().lower()
    minio_client = Minio(
        os.getenv("MINIO_ENDPOINT", "localhost:9000"),
        access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
        secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"),
        secure=secure_flag in {"1", "true", "yes", "on"},
    )

    bucket_name = os.getenv("MINIO_BUCKET", "builds")

    try:
        if not minio_client.bucket_exists(bucket_name):
            minio_client.make_bucket(bucket_name)
            logger.info(f"Created bucket {bucket_name}")
    except S3Error as e:
        # Deliberately best-effort: the upload below re-raises its own
        # S3Error if it fails, so this error is only logged here.
        logger.error(f"Error creating bucket: {e}")

    file_name = "bpl_libraries_processed.csv"
    object_key = (
        f"{planned_recipe['product']}/{planned_recipe['resolved_version']}/{file_name}"
    )
    file_path = f"{build_output_path}/{file_name}"

    try:
        minio_client.fput_object(bucket_name, object_key, file_path)
        logger.info(f"Uploaded {file_path} to {bucket_name}/{object_key}")
    except S3Error as e:
        logger.error(f"Error uploading file: {e}")
        raise

    package_path = f"s3://{bucket_name}/{object_key}"
    logger.info(f"Packaging completed: {package_path}")
    return package_path
116+
117+
118+
@flow(name="Build Lifecycle")
def build_lifecycle_flow(recipe_path: str = "/app/config/recipe.yml"):
    """Run the plan -> load -> build -> package stages and publish a summary.

    Args:
        recipe_path: Path to a recipe file.
            NOTE(review): currently unused; the recipe is hard-coded below.

    Returns:
        The package location returned by ``package_task``.
    """
    log = get_run_logger()
    log.info("Starting build lifecycle")

    library_input = {
        "dataset": "dcp_libraries",
        "version": "20241101",
        "destination": "file",
    }
    recipe_config = {
        "product": "bpl_libraries",
        "version": "2024.12.8",
        "inputs": {"bpl_libraries": library_input},
    }

    # Each stage consumes the previous stage's result, so they run in order.
    planned_recipe = plan_task(recipe_config)
    data_path = load_task(planned_recipe)
    build_output_path = build_task(data_path, planned_recipe)
    package_path = package_task(build_output_path, planned_recipe)

    summary_markdown = f"""# Build Summary

## Product: {planned_recipe["product"]}
## Version: {planned_recipe["resolved_version"]}

### Stages Completed:
- ✅ **Plan**: Recipe resolved successfully
- ✅ **Load**: Mock data loaded to `{data_path}`
- ✅ **Build**: DBT build simulated, outputs created
- ✅ **Package**: Artifacts uploaded to `{package_path}`

### Timeline:
- Total execution time: ~11 seconds (mocked)
- Plan: 2s | Load: 3s | Build: 4s | Package: 2s
"""
    create_markdown_artifact(key="build-summary", markdown=summary_markdown)

    log.info(f"Build lifecycle completed successfully: {package_path}")
    return package_path
161+
162+
163+
# Run the flow once with its default arguments when executed as a script.
if __name__ == "__main__":
    build_lifecycle_flow()

0 commit comments

Comments
 (0)