|
| 1 | +import os |
| 2 | +import time |
| 3 | +from typing import Dict, Any |
| 4 | + |
| 5 | +from prefect import flow, task, get_run_logger |
| 6 | +from prefect.artifacts import create_markdown_artifact |
| 7 | +from minio import Minio |
| 8 | +from minio.error import S3Error |
| 9 | + |
| 10 | + |
@task
def plan_task(recipe_config: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve a raw recipe config into a fully-planned recipe.

    Args:
        recipe_config: Raw recipe; must contain "product" and "version".

    Returns:
        A new dict: the original config plus resolved version, resolved
        inputs, and a planning timestamp.
    """
    logger = get_run_logger()
    logger.info(f"Planning build for {recipe_config['product']}")

    # Simulate the time a real planning/resolution step would take.
    time.sleep(2)

    resolved_inputs = {
        "bpl_libraries": {
            "dataset": "dcp_libraries",
            "version": "20241101",
            "file_type": "csv",
            "destination": "file",
        }
    }

    plan = dict(recipe_config)
    plan["resolved_version"] = recipe_config["version"]
    plan["inputs_resolved"] = resolved_inputs
    plan["plan_timestamp"] = time.time()

    logger.info("Planning completed successfully")
    return plan
| 34 | + |
| 35 | + |
@task
def load_task(planned_recipe: Dict[str, Any]) -> str:
    """Write mock source data to a local scratch directory.

    Args:
        planned_recipe: Planned recipe; supplies "product" and
            "resolved_version" used to build the data path.

    Returns:
        Path of the directory the mock CSV was written into.
    """
    logger = get_run_logger()
    logger.info("Loading source data (mocked)")

    # Simulate the time a real data pull would take.
    time.sleep(3)

    product = planned_recipe["product"]
    version = planned_recipe["resolved_version"]
    data_path = f"/tmp/data/{product}/{version}"
    os.makedirs(data_path, exist_ok=True)

    # Same rows/bytes as a real extract would produce in this mock.
    rows = [
        "id,name,address,borough\n",
        "1,Central Library,Grand Army Plaza,Brooklyn\n",
        "2,Jefferson Market Library,425 Avenue of the Americas,Manhattan\n",
    ]
    with open(f"{data_path}/libraries.csv", "w") as f:
        f.writelines(rows)

    logger.info(f"Mock data loaded to {data_path}")
    return data_path
| 56 | + |
| 57 | + |
@task
def build_task(data_path: str, planned_recipe: Dict[str, Any]) -> str:
    """Simulate a DBT build and write the processed CSV output.

    Args:
        data_path: Directory containing the loaded source data.
        planned_recipe: Planned recipe (unused by the mock transform,
            kept for interface parity with a real build).

    Returns:
        Path of the directory containing the build outputs.
    """
    logger = get_run_logger()
    logger.info("Running build process (mocked DBT)")

    # Simulate the time a real transformation would take.
    time.sleep(4)

    build_output_path = f"{data_path}/build_outputs"
    os.makedirs(build_output_path, exist_ok=True)

    # Same rows/bytes a real transform would emit in this mock.
    processed_rows = [
        "library_id,library_name,full_address,borough_code\n",
        "BPL001,Central Library,Grand Army Plaza Brooklyn NY,BK\n",
        "NYPL002,Jefferson Market Library,425 Avenue of the Americas Manhattan NY,MN\n",
    ]
    with open(f"{build_output_path}/bpl_libraries_processed.csv", "w") as f:
        f.writelines(processed_rows)

    logger.info(f"Build completed, outputs at {build_output_path}")
    return build_output_path
| 78 | + |
| 79 | + |
@task
def package_task(build_output_path: str, planned_recipe: Dict[str, Any]) -> str:
    """Upload the processed CSV to MinIO and return its s3:// URI.

    Args:
        build_output_path: Directory containing the build outputs.
        planned_recipe: Planned recipe; supplies "product" and
            "resolved_version" used to build the object key.

    Returns:
        s3://-style path of the uploaded object.

    Raises:
        S3Error: If the bucket cannot be created/checked or the upload fails.
    """
    logger = get_run_logger()
    logger.info("Packaging build outputs")

    # Simulate the time a real packaging step would take.
    time.sleep(2)

    # NOTE(review): secure=False assumes a plaintext local MinIO endpoint —
    # confirm before pointing this at anything non-local.
    minio_client = Minio(
        os.getenv("MINIO_ENDPOINT", "localhost:9000"),
        access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
        secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"),
        secure=False,
    )

    bucket_name = os.getenv("MINIO_BUCKET", "builds")

    try:
        if not minio_client.bucket_exists(bucket_name):
            minio_client.make_bucket(bucket_name)
            logger.info(f"Created bucket {bucket_name}")
    except S3Error as e:
        # Fail fast: if the bucket can't be ensured, the upload below is
        # guaranteed to fail with a less informative error. The original code
        # swallowed this exception and continued; propagate it instead.
        logger.error(f"Error creating bucket: {e}")
        raise

    object_key = f"{planned_recipe['product']}/{planned_recipe['resolved_version']}/bpl_libraries_processed.csv"
    file_path = f"{build_output_path}/bpl_libraries_processed.csv"

    try:
        minio_client.fput_object(bucket_name, object_key, file_path)
        logger.info(f"Uploaded {file_path} to {bucket_name}/{object_key}")
    except S3Error as e:
        logger.error(f"Error uploading file: {e}")
        raise

    package_path = f"s3://{bucket_name}/{object_key}"
    logger.info(f"Packaging completed: {package_path}")
    return package_path
| 116 | + |
| 117 | + |
@flow(name="Build Lifecycle")
def build_lifecycle_flow(recipe_path: str = "/app/config/recipe.yml"):
    """Run the full mocked build lifecycle: plan -> load -> build -> package.

    Args:
        recipe_path: Path to a recipe YAML. Currently NOT read — the recipe
            below is hardcoded. The parameter is kept for forward
            compatibility with a real recipe loader.

    Returns:
        The s3:// path of the packaged artifact.
    """
    logger = get_run_logger()
    logger.info("Starting build lifecycle")
    # Surface the fact that the recipe_path argument is ignored, so a caller
    # passing a custom recipe file isn't silently surprised.
    logger.warning(f"recipe_path={recipe_path!r} is not read yet; using hardcoded recipe config")

    recipe_config = {
        "product": "bpl_libraries",
        "version": "2024.12.8",
        "inputs": {
            "bpl_libraries": {
                "dataset": "dcp_libraries",
                "version": "20241101",
                "destination": "file",
            }
        },
    }

    # Each stage consumes the previous stage's output, forming the pipeline.
    planned_recipe = plan_task(recipe_config)
    data_path = load_task(planned_recipe)
    build_output_path = build_task(data_path, planned_recipe)
    package_path = package_task(build_output_path, planned_recipe)

    # Publish a human-readable run summary to the Prefect UI.
    create_markdown_artifact(
        key="build-summary",
        markdown=f"""# Build Summary

## Product: {planned_recipe["product"]}
## Version: {planned_recipe["resolved_version"]}

### Stages Completed:
- ✅ **Plan**: Recipe resolved successfully
- ✅ **Load**: Mock data loaded to `{data_path}`
- ✅ **Build**: DBT build simulated, outputs created
- ✅ **Package**: Artifacts uploaded to `{package_path}`

### Timeline:
- Total execution time: ~11 seconds (mocked)
- Plan: 2s | Load: 3s | Build: 4s | Package: 2s
    """,
    )

    logger.info(f"Build lifecycle completed successfully: {package_path}")
    return package_path
| 161 | + |
| 162 | + |
# Allow running the flow directly as a script (outside a Prefect deployment).
if __name__ == "__main__":
    build_lifecycle_flow()
0 commit comments