From 97519456303e4155379666e66b760e9967c15be5 Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Mon, 24 Nov 2025 10:52:56 -0500 Subject: [PATCH] Proto-marimo notebook --- .../marimo-server/notebooks/build_runner.py | 363 ++++++++++++++ .../notebooks/connector_management.py | 277 +++++++++++ .../notebooks/data_distribution.py | 459 ++++++++++++++++++ .../marimo-server/notebooks/distribution.py | 22 + .../notebooks/version_checker.py | 295 +++++++++++ .../apps/marimo-server/notebooks/versions.py | 22 + .../apps/marimo-server/requirements.txt | 27 ++ experimental/apps/marimo-server/server.py | 163 +++++++ experimental/apps/marimo-server/start.sh | 66 +++ 9 files changed, 1694 insertions(+) create mode 100644 experimental/apps/marimo-server/notebooks/build_runner.py create mode 100644 experimental/apps/marimo-server/notebooks/connector_management.py create mode 100644 experimental/apps/marimo-server/notebooks/data_distribution.py create mode 100644 experimental/apps/marimo-server/notebooks/distribution.py create mode 100644 experimental/apps/marimo-server/notebooks/version_checker.py create mode 100644 experimental/apps/marimo-server/notebooks/versions.py create mode 100644 experimental/apps/marimo-server/requirements.txt create mode 100644 experimental/apps/marimo-server/server.py create mode 100755 experimental/apps/marimo-server/start.sh diff --git a/experimental/apps/marimo-server/notebooks/build_runner.py b/experimental/apps/marimo-server/notebooks/build_runner.py new file mode 100644 index 0000000000..a8c245c75e --- /dev/null +++ b/experimental/apps/marimo-server/notebooks/build_runner.py @@ -0,0 +1,363 @@ +import marimo + +__generated_with = "0.9.0" +app = marimo.App(width="medium") + + +@app.cell +def __(): + import marimo as mo + + mo.md( + r""" + # đŸ—ī¸ Data Pipeline Build Runnersss + + This notebook provides an interface to execute and monitor data pipeline builds + for NYC DCP data products. + """ + ) + return (mo,) + + +@app.cell +def __(mo): + import subprocess + import sys + from pathlib import Path + import json + from datetime import datetime + + # Add dcpy to path + repo_root = Path(__file__).parents[3] # Go up to data-engineering root + products_dir = repo_root / "products" + + mo.md(f"📁 **Products Directory**: `{products_dir}`") + return Path, datetime, json, products_dir, repo_root, subprocess, sys + + +@app.cell +def __(mo, products_dir): + # Discover available products + available_products = [] + if products_dir.exists(): + for product_dir in products_dir.iterdir(): + if product_dir.is_dir() and (product_dir / "dbt_project.yml").exists(): + available_products.append(product_dir.name) + + if available_products: + product_selector = mo.ui.dropdown( + options=sorted(available_products), label="Select Product to Build:" + ) + + mo.md(f""" + ## Available Products ({len(available_products)}) + + {product_selector} + """) + else: + product_selector = None + mo.md("❌ No dbt products found in products directory") + return available_products, product_selector + + +@app.cell +def __(mo, product_selector): + # Build configuration options + if product_selector and product_selector.value: + # Build mode selector + build_mode = mo.ui.dropdown( + options=[ + ("Full Build", "run"), + ("Test Only", "test"), + ("Compile Only", "compile"), + ("Parse Only", "parse"), + ], + value="run", + label="Build Mode:", + ) + + # Model selector + model_selector = mo.ui.text( + placeholder="e.g., +model_name, tag:staging, --exclude test_type:unit", + label="Model Selection (optional):", + full_width=True, + ) + + # Environment selector + profile_selector = mo.ui.dropdown( + options=[ + ("Local Development", "local"), + ("Development", "development"), + ("Staging", "staging"), + ("Production", "production"), + ], + value="local", + label="Target Environment:", + ) + + mo.md(f""" + ### Build Configuration for {product_selector.value} + + {build_mode} + {model_selector} + {profile_selector} + """) + else: + build_mode = model_selector = profile_selector = None + mo.md("") + return build_mode, model_selector, profile_selector + + +@app.cell +def __(mo): + # Build execution status + build_status = { + "running": False, + "last_run": None, + "output": "", + "return_code": None, + } + + mo.md("## Build Execution") + return (build_status,) + + +@app.cell +def __( + build_mode, + build_status, + datetime, + mo, + model_selector, + product_selector, + products_dir, + profile_selector, + subprocess, +): + # Build execution function + def run_dbt_command(): + """Execute the dbt command based on selected options.""" + if not product_selector or not product_selector.value: + return "❌ No product selected" + + product_path = products_dir / product_selector.value + if not product_path.exists(): + return f"❌ Product directory not found: {product_path}" + + # Build the dbt command + cmd = ["dbt", build_mode.value if build_mode else "run"] + + # Add model selection if specified + if model_selector and model_selector.value.strip(): + cmd.extend(["--select", model_selector.value.strip()]) + + # Add profile directory (assumes profiles.yml in product dir) + cmd.extend(["--profiles-dir", str(product_path)]) + + # Add target environment + if profile_selector and profile_selector.value != "local": + cmd.extend(["--target", profile_selector.value]) + + build_status["running"] = True + build_status["last_run"] = datetime.now() + + try: + # Execute the command + result = subprocess.run( + cmd, + cwd=product_path, + capture_output=True, + text=True, + timeout=300, # 5 minute timeout + ) + + build_status["output"] = ( + f"STDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}" + ) + build_status["return_code"] = result.returncode + + if result.returncode == 0: + status_msg = f"✅ Build completed successfully!" + else: + status_msg = f"❌ Build failed with code {result.returncode}" + + except subprocess.TimeoutExpired: + build_status["output"] = "❌ Build timed out after 5 minutes" + build_status["return_code"] = -1 + status_msg = "⏰ Build timed out" + + except Exception as e: + build_status["output"] = f"❌ Error executing build: {str(e)}" + build_status["return_code"] = -1 + status_msg = f"đŸ’Ĩ Build error: {str(e)}" + + finally: + build_status["running"] = False + + return status_msg + + # Build button and status display + if product_selector and product_selector.value and build_mode: + # Show current configuration + config_info = f""" + **Product**: {product_selector.value} + **Mode**: {build_mode.value} + **Target**: {profile_selector.value if profile_selector else "local"} + """ + + if model_selector and model_selector.value.strip(): + config_info += f" \n**Selection**: `{model_selector.value.strip()}`" + + # Run button + run_button = mo.ui.run_button(label="🚀 Execute Build") + + if run_button.value: + if not build_status["running"]: + result_msg = run_dbt_command() + mo.md(f""" + ### Build Configuration + {config_info} + + {run_button} + + ### Build Result + {result_msg} + + **Started**: {build_status["last_run"].strftime("%Y-%m-%d %H:%M:%S") if build_status["last_run"] else "Never"} + **Status**: {"🔄 Running" if build_status["running"] else "✅ Complete" if build_status["return_code"] == 0 else "❌ Failed"} + """) + else: + mo.md(f""" + ### Build Configuration + {config_info} + + {run_button} + + ### Build Status + 🔄 **Build is currently running...** + + Started: {build_status["last_run"].strftime("%Y-%m-%d %H:%M:%S")} + """) + else: + mo.md(f""" + ### Build Configuration + {config_info} + + {run_button} + """) + else: + run_button = None + mo.md("Complete the configuration above to execute a build.") + return config_info, result_msg, run_button, run_dbt_command + + +@app.cell +def __(build_status, mo): + # Build output display + if build_status["output"]: + mo.md(f""" + ### Build Output + + **Return Code**: {build_status["return_code"]} + + ``` + {build_status["output"][:2000]}{"..." if len(build_status["output"]) > 2000 else ""} + ``` + """) + else: + mo.md("") + return + + +@app.cell +def __(mo, products_dir): + # Quick product info + def get_product_info(product_name): + """Get basic info about a product.""" + product_path = products_dir / product_name + + info = { + "path": str(product_path), + "has_dbt_project": (product_path / "dbt_project.yml").exists(), + "has_profiles": (product_path / "profiles.yml").exists(), + "has_readme": (product_path / "README.md").exists(), + } + + # Try to read dbt_project.yml for more info + try: + import yaml + + with open(product_path / "dbt_project.yml") as f: + dbt_config = yaml.safe_load(f) + info["dbt_version"] = dbt_config.get("require-dbt-version", "unknown") + info["models"] = list(dbt_config.get("models", {}).keys()) + except: + info["dbt_version"] = "unknown" + info["models"] = [] + + return info + + if product_selector and product_selector.value: + product_info = get_product_info(product_selector.value) + + mo.md(f""" + --- + + ### Product Information: {product_selector.value} + + - **Path**: `{product_info["path"]}` + - **dbt Project**: {"✅" if product_info["has_dbt_project"] else "❌"} + - **Local Profiles**: {"✅" if product_info["has_profiles"] else "❌"} + - **README**: {"✅" if product_info["has_readme"] else "❌"} + - **dbt Version**: {product_info["dbt_version"]} + """) + else: + mo.md("") + return get_product_info, product_info + + +@app.cell +def __(mo): + mo.md( + r""" + --- + + ## 💡 Usage Guide + + ### Build Modes + - **Full Build**: Runs `dbt run` - executes models and tests + - **Test Only**: Runs `dbt test` - executes data quality tests + - **Compile Only**: Runs `dbt compile` - validates SQL without execution + - **Parse Only**: Runs `dbt parse` - validates project structure + + ### Model Selection + Use dbt selection syntax: + - `+model_name` - model and all upstream dependencies + - `model_name+` - model and all downstream dependents + - `tag:staging` - all models with "staging" tag + - `--exclude test_type:unit` - exclude unit tests + + ### Environment Targets + - **Local**: Uses local development settings + - **Development**: Remote development environment + - **Staging**: Pre-production testing environment + - **Production**: Live production environment + + ### Prerequisites + - dbt CLI must be installed and configured + - Product must have `dbt_project.yml` + - Database connections must be properly configured + - Appropriate permissions for selected target environment + + ### Troubleshooting + - Check build output for detailed error messages + - Verify database connectivity and permissions + - Ensure all required environment variables are set + - Check that profiles.yml exists and is properly configured + """ + ) + return + + +if __name__ == "__main__": + app.run() diff --git a/experimental/apps/marimo-server/notebooks/connector_management.py b/experimental/apps/marimo-server/notebooks/connector_management.py new file mode 100644 index 0000000000..af42dc80c7 --- /dev/null +++ b/experimental/apps/marimo-server/notebooks/connector_management.py @@ -0,0 +1,277 @@ +import marimo as mo +import pandas as pd +from typing import Any, Dict, List +import random +from datetime import datetime, timedelta + +app = mo.App(width="medium") + + +@app.cell +def __(): + """Import and setup the connector registry""" + try: + from dcpy.lifecycle.connector_registry import connectors + from dcpy.connectors.registry import VersionedConnector, VersionSearch + + mo.md(""" + # 🔌 Connector Management Dashboard + + This dashboard provides an interface to explore registered connectors and manage their versions. + """) + except ImportError as e: + mo.md(f""" + # âš ī¸ Import Error + + Could not import dcpy connectors: {e} + + Make sure dcpy is installed as an editable package: + ```bash + pip install -e . + ``` + """) + return + + +@app.cell +def __(connectors): + """Get list of registered connectors""" + registered_connectors = connectors.list_registered() + + mo.md(f""" + ## Available Connectors ({len(registered_connectors)}) + + The following connectors are currently registered in the system: + """) + return (registered_connectors,) + + +@app.cell +def __(mo, registered_connectors): + """Create dropdown for connector selection""" + if registered_connectors: + connector_dropdown = mo.ui.dropdown( + options=registered_connectors, + value=registered_connectors[0] if registered_connectors else None, + label="Select a connector:", + ) + else: + connector_dropdown = mo.ui.dropdown( + options=[], value=None, label="No connectors available" + ) + + connector_dropdown + return (connector_dropdown,) + + +@app.cell +def __(mo, connector_dropdown, connectors): + """Display connector info and setup version interface""" + if not connector_dropdown.value: + mo.md("👆 Select a connector to view its details") + else: + selected_connector_name = connector_dropdown.value + + try: + # Get the connector instance + connector_instance = connectors[selected_connector_name] + connector_type = type(connector_instance).__name__ + + # Check if it supports versioning + supports_versioning = hasattr(connector_instance, "list_versions") + + mo.md(f""" + ## 📋 Connector Details: `{selected_connector_name}` + + - **Type**: {connector_type} + - **Supports Versioning**: {"✅ Yes" if supports_versioning else "❌ No"} + - **Module**: {type(connector_instance).__module__} + """) + + except Exception as e: + mo.md(f""" + ## ❌ Error loading connector: `{selected_connector_name}` + + ``` + {str(e)} + ``` + """) + return + + +@app.cell +def __(mo, connector_dropdown, connectors): + """Version management interface""" + if not connector_dropdown.value: + mo.stop() + + selected_connector_name = connector_dropdown.value + + try: + connector_instance = connectors[selected_connector_name] + supports_versioning = hasattr(connector_instance, "list_versions") + + if supports_versioning: + # Create input for dataset key + dataset_key_input = mo.ui.text( + placeholder="Enter dataset key (e.g., 'abc123' for Socrata)", + label="Dataset Key:", + full_width=True, + ) + + # Refresh button + refresh_button = mo.ui.button(label="🔄 Refresh Versions", kind="success") + + mo.md(f""" + ## 📊 Version Management + + This connector supports version operations. Enter a dataset key to explore versions: + """) + + mo.vstack([dataset_key_input, refresh_button]) + else: + mo.md(f""" + ## â„šī¸ Version Management Not Available + + The `{selected_connector_name}` connector does not support version operations. + """) + except Exception as e: + mo.md(f"Error setting up version interface: {str(e)}") + return + + +@app.cell +def __(mo, connector_dropdown, connectors, dataset_key_input, refresh_button): + """Display versions table""" + if not connector_dropdown.value: + mo.stop() + + if ( + not hasattr(locals().get("dataset_key_input"), "value") + or not dataset_key_input.value + ): + mo.md("👆 Enter a dataset key above to see versions") + mo.stop() + + selected_connector_name = connector_dropdown.value + dataset_key = dataset_key_input.value.strip() + + if not dataset_key: + mo.stop() + + try: + connector_instance = connectors[selected_connector_name] + + if hasattr(connector_instance, "list_versions"): + # Mock version data for demonstration + # In reality, we'd call: versions = connector_instance.list_versions(dataset_key) + mock_versions = [f"v1.{i}.{random.randint(0, 9)}" for i in range(10, 0, -1)] + + # Create mock metadata for each version + version_data = [] + for i, version in enumerate(mock_versions): + created_date = datetime.now() - timedelta( + days=i * 7 + random.randint(0, 6) + ) + size_mb = round(random.uniform(0.1, 150.0), 2) + status = random.choice( + ["Active", "Active", "Active", "Deprecated", "Draft"] + ) + + version_data.append( + { + "Version": version, + "Created": created_date.strftime("%Y-%m-%d %H:%M"), + "Size (MB)": size_mb, + "Status": status, + "Records": random.randint(100, 50000), + } + ) + + df = pd.DataFrame(version_data) + + # Style the dataframe + styled_df = df.style.apply( + lambda x: [ + "background-color: #e8f5e8" + if v == "Active" + else "background-color: #fff2e8" + if v == "Draft" + else "background-color: #f5e8e8" + if v == "Deprecated" + else "" + for v in x + ], + subset=["Status"], + ) + + mo.md(f""" + ## 📈 Versions for Dataset: `{dataset_key}` + + Found **{len(mock_versions)}** versions for connector `{selected_connector_name}`: + + > âš ī¸ **Note**: This is mock data for demonstration. In production, this would call: + > ```python + > connector_instance.list_versions("{dataset_key}") + > ``` + """) + + mo.ui.table(df, selection=None) + + else: + mo.md("This connector does not support version listing.") + + except Exception as e: + mo.md(f""" + ## ❌ Error fetching versions + + ``` + {str(e)} + ``` + + **Possible reasons:** + - Dataset key not found + - Network connectivity issues + - Connector configuration problems + - Authentication required + """) + return + + +@app.cell +def __(mo, connector_dropdown, connectors): + """Connector capabilities summary""" + if not connector_dropdown.value: + mo.stop() + + selected_connector_name = connector_dropdown.value + + try: + connector_instance = connectors[selected_connector_name] + + # Check capabilities + capabilities = { + "Pull Data": hasattr(connector_instance, "pull"), + "Push Data": hasattr(connector_instance, "push"), + "List Versions": hasattr(connector_instance, "list_versions"), + "Get Latest Version": hasattr(connector_instance, "get_latest_version"), + "Version Exists Check": hasattr(connector_instance, "version_exists"), + } + + capability_rows = [] + for capability, supported in capabilities.items(): + status = "✅ Supported" if supported else "❌ Not Available" + capability_rows.append({"Capability": capability, "Status": status}) + + capabilities_df = pd.DataFrame(capability_rows) + + mo.md("## đŸ› ī¸ Connector Capabilities") + mo.ui.table(capabilities_df, selection=None) + + except Exception as e: + mo.md(f"Error checking capabilities: {str(e)}") + return + + +if __name__ == "__main__": + app.run() diff --git a/experimental/apps/marimo-server/notebooks/data_distribution.py b/experimental/apps/marimo-server/notebooks/data_distribution.py new file mode 100644 index 0000000000..961556c115 --- /dev/null +++ b/experimental/apps/marimo-server/notebooks/data_distribution.py @@ -0,0 +1,459 @@ +import marimo + +__generated_with = "0.9.0" +app = marimo.App(width="medium") + + +@app.cell +def __(): + import marimo as mo + + mo.md( + r""" + # 📊 Data Distribution Dashboard + + This notebook provides an interface to view and manage data distribution tasks + for NYC DCP data products, including publishing to open data portals and + internal distribution channels. + """ + ) + return (mo,) + + +@app.cell +def __(mo): + import json + import subprocess + import sys + from datetime import datetime, timedelta + from pathlib import Path + import os + + # Repository paths + repo_root = Path(__file__).parents[3] + products_dir = repo_root / "products" + + mo.md(f"📁 **Repository Root**: `{repo_root}`") + return Path, datetime, json, os, products_dir, repo_root, subprocess, sys, timedelta + + +@app.cell +def __(mo, products_dir): + # Discover products with distribution configs + def find_distributable_products(): + """Find products that have distribution configurations.""" + products = [] + + if not products_dir.exists(): + return products + + for product_dir in products_dir.iterdir(): + if not product_dir.is_dir(): + continue + + product_info = { + "name": product_dir.name, + "path": product_dir, + "has_dbt": (product_dir / "dbt_project.yml").exists(), + "has_recipe": (product_dir / "recipe.yml").exists(), + "has_build_config": (product_dir / "build.yml").exists(), + "distribution_channels": [], + } + + # Check for common distribution indicators + if (product_dir / "publish").exists(): + product_info["distribution_channels"].append("Custom Publisher") + if (product_dir / "socrata").exists(): + product_info["distribution_channels"].append("Socrata/Open Data") + if (product_dir / "s3").exists(): + product_info["distribution_channels"].append("S3") + if (product_dir / "ftp").exists(): + product_info["distribution_channels"].append("FTP") + + products.append(product_info) + + return sorted(products, key=lambda x: x["name"]) + + distributable_products = find_distributable_products() + + if distributable_products: + mo.md(f""" + ## Available Products for Distribution ({len(distributable_products)}) + """) + else: + mo.md("❌ No products found with distribution configurations") + return distributable_products, find_distributable_products + + +@app.cell +def __(distributable_products, mo): + # Product selector with distribution info + if distributable_products: + # Create options with distribution channel info + product_options = [] + for product in distributable_products: + channels = ( + ", ".join(product["distribution_channels"]) + if product["distribution_channels"] + else "None configured" + ) + label = f"{product['name']} ({channels})" + product_options.append((label, product["name"])) + + product_selector = mo.ui.dropdown( + options=product_options, label="Select Product:" + ) + + mo.md(f"{product_selector}") + else: + product_selector = None + return product_options, product_selector + + +@app.cell +def __(distributable_products, mo, product_selector): + # Show selected product details + if product_selector and product_selector.value: + selected_product = next( + (p for p in distributable_products if p["name"] == product_selector.value), + None, + ) + + if selected_product: + channels_list = ( + "\n".join( + [f"- {ch}" for ch in selected_product["distribution_channels"]] + ) + or "- No distribution channels configured" + ) + + mo.md(f""" + ### Selected Product: {selected_product["name"]} + + **Path**: `{selected_product["path"]}` + **dbt Project**: {"✅" if selected_product["has_dbt"] else "❌"} + **Recipe Config**: {"✅" if selected_product["has_recipe"] else "❌"} + **Build Config**: {"✅" if selected_product["has_build_config"] else "❌"} + + **Distribution Channels**: + {channels_list} + """) + else: + selected_product = None + mo.md("❌ Product not found") + else: + selected_product = None + mo.md("") + return channels_list, selected_product + + +@app.cell +def __(mo, selected_product): + # Distribution action selector + if selected_product: + action_selector = mo.ui.dropdown( + options=[ + ("Check Status", "status"), + ("Publish Latest", "publish"), + ("Validate Data", "validate"), + ("View Distribution Log", "log"), + ("Force Republish", "force_publish"), + ], + label="Distribution Action:", + ) + + # Environment selector + env_selector = mo.ui.dropdown( + options=[ + ("Development", "dev"), + ("Staging", "staging"), + ("Production", "prod"), + ], + value="dev", + label="Target Environment:", + ) + + mo.md(f""" + ### Distribution Actions + + {action_selector} + {env_selector} + """) + else: + action_selector = env_selector = None + return action_selector, env_selector + + +@app.cell +def __(mo): + # Mock distribution status data + def get_mock_distribution_status(product_name, environment): + """Generate mock distribution status for demonstration.""" + from datetime import datetime, timedelta + import random + + # Mock data for different products and environments + base_time = datetime.now() - timedelta(hours=random.randint(1, 48)) + + status = { + "product": product_name, + "environment": environment, + "last_published": base_time.strftime("%Y-%m-%d %H:%M:%S"), + "version": f"2024v{random.randint(1, 5)}.{random.randint(0, 10)}", + "status": random.choice( + ["success", "success", "success", "warning", "error"] + ), + "channels": [], + } + + # Mock channel statuses + channels = ["Socrata Open Data", "S3 Bucket", "Internal FTP", "API Endpoint"] + for channel in random.sample(channels, random.randint(1, 3)): + channel_status = { + "name": channel, + "status": random.choice( + ["published", "published", "pending", "failed"] + ), + "last_update": ( + base_time + timedelta(minutes=random.randint(0, 60)) + ).strftime("%Y-%m-%d %H:%M:%S"), + "records_count": random.randint(1000, 100000), + "file_size": f"{random.randint(1, 500)} MB", + } + status["channels"].append(channel_status) + + return status + + mo.md("## Distribution Status") + return (get_mock_distribution_status,) + + +@app.cell +def __( + action_selector, env_selector, get_mock_distribution_status, mo, selected_product +): + # Execute distribution actions + if selected_product and action_selector and action_selector.value: + if action_selector.value == "status": + # Show distribution status + status = get_mock_distribution_status( + selected_product["name"], env_selector.value if env_selector else "dev" + ) + + # Status emoji mapping + status_emoji = { + "success": "✅", + "warning": "âš ī¸", + "error": "❌", + "published": "✅", + "pending": "🔄", + "failed": "❌", + } + + channels_info = "" + for channel in status["channels"]: + emoji = status_emoji.get(channel["status"], "❓") + channels_info += f""" + **{channel["name"]}** {emoji} + - Status: {channel["status"]} + - Last Update: {channel["last_update"]} + - Records: {channel["records_count"]:,} + - Size: {channel["file_size"]} + + """ + + overall_emoji = status_emoji.get(status["status"], "❓") + + mo.md(f""" + ### Distribution Status {overall_emoji} + + **Product**: {status["product"]} + **Environment**: {status["environment"]} + **Version**: {status["version"]} + **Last Published**: {status["last_published"]} + **Overall Status**: {status["status"]} {overall_emoji} + + #### Distribution Channels + + {channels_info} + """) + + elif action_selector.value == "publish": + # Mock publish action + execute_button = mo.ui.run_button(label="🚀 Execute Publish") + + if execute_button.value: + mo.md(f""" + ### Publishing {selected_product["name"]} + + {execute_button} + + **Environment**: {env_selector.value if env_selector else "dev"} + + 🔄 **Publishing in progress...** + + 1. ✅ Validating data quality + 2. ✅ Preparing distribution packages + 3. 🔄 Publishing to Socrata Open Data + 4. âŗ Uploading to S3 bucket + 5. âŗ Updating API endpoints + + *This is a mock execution for demonstration purposes.* + """) + else: + mo.md(f""" + ### Publish {selected_product["name"]} + + **Target Environment**: {env_selector.value if env_selector else "dev"} + + This will publish the latest version of the data product to all configured distribution channels. + + {execute_button} + """) + + elif action_selector.value == "validate": + # Mock validation + mo.md(f""" + ### Data Validation Results + + **Product**: {selected_product["name"]} + **Environment**: {env_selector.value if env_selector else "dev"} + + #### Validation Checks + + ✅ **Schema Validation** - All required columns present + ✅ **Data Quality** - No null values in required fields + ✅ **Row Count** - Expected number of records (47,392) + âš ī¸ **Freshness Check** - Data is 2 days old (acceptable) + ❌ **Geometry Validation** - 12 invalid geometries found + + #### Recommendations + - Review and fix invalid geometries before publishing + - Consider data refresh if freshness is critical + + *Mock validation results for demonstration.* + """) + + elif action_selector.value == "log": + # Mock distribution log + mo.md(f""" + ### Distribution Log + + **Product**: {selected_product["name"]} + **Environment**: {env_selector.value if env_selector else "dev"} + + ``` + 2024-11-13 14:30:15 [INFO] Starting distribution process + 2024-11-13 14:30:16 [INFO] Validating data quality... + 2024-11-13 14:30:18 [INFO] Quality checks passed + 2024-11-13 14:30:19 [INFO] Publishing to Socrata (dataset: abc-123) + 2024-11-13 14:31:45 [INFO] Socrata publish completed (47,392 records) + 2024-11-13 14:31:46 [INFO] Uploading to S3: s3://bucket/product/latest/ + 2024-11-13 14:32:12 [INFO] S3 upload completed + 2024-11-13 14:32:13 [INFO] Updating API metadata + 2024-11-13 14:32:15 [INFO] Distribution process completed successfully + 2024-11-13 14:32:16 [INFO] Notification sent to subscribers + ``` + + *Mock log output for demonstration.* + """) + + else: + mo.md("Action not implemented yet.") + + else: + mo.md("Select a product and action to proceed.") + return channels_info, execute_button, overall_emoji, status, status_emoji + + +@app.cell +def __(mo): + # Distribution schedule and automation info + mo.md( + r""" + --- + + ## 📅 Distribution Schedule + + ### Automated Distributions + + | Product | Environment | Schedule | Last Run | Status | + |---------|-------------|----------|----------|---------| + | PLUTO | Production | Daily 6AM | 2024-11-13 06:15 | ✅ Success | + | LION | Production | Weekly Mon | 2024-11-11 06:30 | ✅ Success | + | Facilities | Staging | On-demand | 2024-11-12 14:20 | âš ī¸ Warning | + | ZAP | Production | Monthly | 2024-11-01 07:00 | ✅ Success | + + ### Distribution Channels + + - **Socrata Open Data**: NYC Open Data portal for public datasets + - **S3 Buckets**: Internal and external file distribution + - **FTP Servers**: Legacy system integration + - **API Endpoints**: Real-time data access + - **Internal Databases**: Cross-system data sharing + + *Mock data for demonstration purposes.* + """ + ) + return + + +@app.cell +def __(mo): + mo.md( + r""" + --- + + ## 💡 Usage Guide + + ### Distribution Actions + + - **Check Status**: View current publication status across all channels + - **Publish Latest**: Distribute the most recent version of data + - **Validate Data**: Run quality checks before distribution + - **View Distribution Log**: See detailed execution logs + - **Force Republish**: Override and republish existing data + + ### Distribution Channels + + #### Socrata/Open Data Portal + - Public-facing datasets for transparency + - Automated metadata and schema updates + - Version control and change tracking + + #### S3 Distribution + - Bulk file downloads (CSV, GeoJSON, Shapefile) + - Automated backup and archival + - Cross-region replication + + #### API Endpoints + - Real-time data access + - Authentication and rate limiting + - Standardized REST/GraphQL interfaces + + ### Environment Management + + - **Development**: Testing and validation + - **Staging**: Pre-production verification + - **Production**: Live public distribution + + ### Monitoring and Alerts + + - Automated quality validation + - Publication failure notifications + - Performance monitoring and metrics + - Data freshness alerts + + ### Prerequisites + + - Product must have completed successful build + - Distribution channels must be configured + - Appropriate permissions for target environment + - Valid data quality validation results + """ + ) + return + + +if __name__ == "__main__": + app.run() diff --git a/experimental/apps/marimo-server/notebooks/distribution.py b/experimental/apps/marimo-server/notebooks/distribution.py new file mode 100644 index 0000000000..bf770dd101 --- /dev/null +++ b/experimental/apps/marimo-server/notebooks/distribution.py @@ -0,0 +1,22 @@ +import marimo + +__generated_with = "0.17.1" +app = marimo.App(width="medium") + + +@app.cell +def _(): + from dcpy.lifecycle.scripts import version_compare + + versions = version_compare.run() + return (versions,) + + +@app.cell +def _(versions): + versions + return + + +if __name__ == "__main__": + app.run() diff --git a/experimental/apps/marimo-server/notebooks/version_checker.py b/experimental/apps/marimo-server/notebooks/version_checker.py new file mode 100644 index 0000000000..a0643ade7a --- /dev/null +++ b/experimental/apps/marimo-server/notebooks/version_checker.py @@ -0,0 +1,295 @@ +import marimo + +__generated_with = "0.9.0" +app = marimo.App(width="medium") + + +@app.cell +def __(): + import marimo as mo + + mo.md( + r""" + # 🔍 Data Product Version Checker + + This notebook provides an interface to check available versions of data products + across the NYC DCP data engineering pipeline. + """ + ) + return (mo,) + + +@app.cell +def __(mo): + # Import version utilities from dcpy + import sys + from pathlib import Path + + # Add dcpy to path if needed + repo_root = Path(__file__).parents[ + 3 + ] # Go up from marimo-server/notebooks to data-engineering root + dcpy_path = repo_root / "dcpy" + if str(dcpy_path) not in sys.path: + sys.path.insert(0, str(dcpy_path)) + + try: + from dcpy.utils.versions import ( + Version, + Date, + MajorMinor, + CapitalBudget, + DateVersionFormat, + ) + from dcpy.utils.versions import CapitalBudgetRelease + + mo.md("✅ Successfully imported version utilities from dcpy") + except ImportError as e: + mo.md(f"❌ Failed to import from dcpy: {e}") + return ( + CapitalBudget, + CapitalBudgetRelease, + Date, + DateVersionFormat, + MajorMinor, + Path, + Version, + dcpy_path, + repo_root, + sys, + ) + + +@app.cell +def __(mo): + # Product selector + product_selector = mo.ui.dropdown( + options=[ + "lion", + "facilities", + "zap", + "ceqr", + "pluto", + "developments", + "sca_capacity_projects", + ], + value="lion", + label="Select Data Product:", + ) + + mo.md(f"## Select Product\n{product_selector}") + return (product_selector,) + + +@app.cell +def __(mo, product_selector): + # Version format selector + format_selector = mo.ui.dropdown( + options=[ + ("Date", "date"), + ("Month", "month"), + ("Quarter", "quarter"), + ("Fiscal Year", "fiscal_year"), + ("Major/Minor", "major_minor"), + ("Capital Budget", "capital_budget"), + ], + value="date", + label="Version Format:", + ) + + if product_selector.value: + mo.md(f"### {product_selector.value.title()} Versions\n{format_selector}") + else: + mo.md("Please select a product first.") + return (format_selector,) + + +@app.cell +def __( + CapitalBudget, + CapitalBudgetRelease, + Date, + DateVersionFormat, + MajorMinor, + format_selector, + mo, + product_selector, +): + # Generate example versions based on selected format + from datetime import date, timedelta + import random + + def generate_example_versions(format_type, product_name, count=5): + """Generate example versions for demonstration.""" + versions = [] + + if format_type == "date": + base_date = date.today() + for i in range(count): + version_date = base_date - timedelta(days=i * 30) # Monthly releases + versions.append(Date(version_date, DateVersionFormat.date)) + + elif format_type == "month": + base_date = date.today().replace(day=1) + for i in range(count): + version_date = base_date - timedelta(days=i * 32) # Monthly + versions.append(Date(version_date, DateVersionFormat.month)) + + elif format_type == "quarter": + base_date = date.today() + for i in range(count): + # Go back by quarters + quarter_date = date(base_date.year, max(1, base_date.month - i * 3), 1) + versions.append(Date(quarter_date, DateVersionFormat.quarter)) + + elif format_type == "fiscal_year": + current_year = date.today().year + for i in range(count): + fy_date = date(current_year - i, 7, 1) # July 1st start of FY + versions.append(Date(fy_date, DateVersionFormat.fiscal_year)) + + elif format_type == "major_minor": + current_year = date.today().year % 100 # 2-digit year + for i in range(count): + major = random.randint(1, 5) + minor = random.randint(0, 3) if random.random() > 0.3 else 0 + versions.append(MajorMinor(current_year, major, minor)) + + elif format_type == "capital_budget": + current_year = date.today().year % 100 # 2-digit year + for i in range(count): + year = current_year - (i // 3) # New year every 3 versions + release_idx = (2 - (i % 3)) + 1 # Cycle through releases backwards + release = CapitalBudgetRelease(min(3, max(1, release_idx))) + versions.append(CapitalBudget(year, release)) + + return sorted(versions, reverse=True) # Most recent first + + if product_selector.value and format_selector.value: + example_versions = generate_example_versions( + format_selector.value, product_selector.value + ) + + # Display versions in a table + version_data = [ + {"Version": v.label, "Type": type(v).__name__, "Sort Order": i + 1} + for i, v in enumerate(example_versions) + ] + + mo.md(f""" + ### Available Versions for {product_selector.value.title()} + + **Format**: {format_selector.value.replace("_", " ").title()} + + {mo.ui.table(version_data, selection=None)} + + *Note: These are example versions for demonstration. In production, + these would be fetched from your actual data product registry.* + """) + else: + mo.md("Please select both a product and version format.") + return ( + date, + example_versions, + generate_example_versions, + random, + timedelta, + version_data, + ) + + +@app.cell +def __(example_versions, mo, product_selector): + # Version comparison tool + if len(example_versions) >= 2: + version1_selector = mo.ui.dropdown( + options=[(v.label, i) for i, v in enumerate(example_versions)], + label="First Version:", + ) + + version2_selector = mo.ui.dropdown( + options=[(v.label, i) for i, v in enumerate(example_versions)], + value=1 if len(example_versions) > 1 else 0, + label="Second Version:", + ) + + mo.md(f""" + ### Version Comparison Tool + + Compare two versions of **{product_selector.value}**: + + {version1_selector} + {version2_selector} + """) + else: + version1_selector = None + version2_selector = None + mo.md("") + return version1_selector, version2_selector + + +@app.cell +def __(example_versions, mo, version1_selector, version2_selector): + # Perform version comparison + if ( + version1_selector + and version2_selector + and version1_selector.value is not None + and version2_selector.value is not None + ): + v1 = example_versions[version1_selector.value] + v2 = example_versions[version2_selector.value] + + comparison_result = "equal" if v1 == v2 else ("newer" if v1 > v2 else "older") + + emoji_map = {"newer": "📈", "older": "📉", "equal": "🟰"} + + mo.md(f""" + ### Comparison Result + + {emoji_map[comparison_result]} **{v1.label}** is **{comparison_result}** than **{v2.label}** + + - Version 1: `{v1.label}` ({type(v1).__name__}) + - Version 2: `{v2.label}` ({type(v2).__name__}) + """) + else: + mo.md("") + return comparison_result, emoji_map, v1, v2 + + +@app.cell +def __(mo): + # Instructions and tips + mo.md( + r""" + --- + + ## 💡 Usage Tips + + - **Version Selection**: Choose different products and formats to see how versions are structured + - **Comparison Tool**: Compare any two versions to understand chronological ordering + - **Integration**: This interface can be extended to connect to actual data product registries + + ### Supported Version Types + + - **Date**: Full date versions (YYYY-MM-DD) + - **Month**: Monthly versions (YYYY-MM) + - **Quarter**: Quarterly versions (YYQ#) + - **Fiscal Year**: Annual fiscal year versions (FYYYY) + - **Major/Minor**: Semantic versioning (YYv#.#.#) + - **Capital Budget**: NYC capital budget releases (YYprelim/exec/adopt) + + ### Next Steps + + To make this production-ready: + 1. Connect to actual data product metadata + 2. Add version creation/tagging functionality + 3. Integrate with build and deployment pipelines + 4. Add version deprecation and lifecycle management + """ + ) + return + + +if __name__ == "__main__": + app.run() diff --git a/experimental/apps/marimo-server/notebooks/versions.py b/experimental/apps/marimo-server/notebooks/versions.py new file mode 100644 index 0000000000..bf770dd101 --- /dev/null +++ b/experimental/apps/marimo-server/notebooks/versions.py @@ -0,0 +1,22 @@ +import marimo + +__generated_with = "0.17.1" +app = marimo.App(width="medium") + + +@app.cell +def _(): + from dcpy.lifecycle.scripts import version_compare + + versions = version_compare.run() + return (versions,) + + +@app.cell +def _(versions): + versions + return + + +if __name__ == "__main__": + app.run() diff --git a/experimental/apps/marimo-server/requirements.txt b/experimental/apps/marimo-server/requirements.txt new file mode 100644 index 0000000000..3d20a787fb --- /dev/null +++ b/experimental/apps/marimo-server/requirements.txt @@ -0,0 +1,27 @@ +# Data Engineering Marimo Server +# Dependencies for operational notebooks + +# Core marimo and web serving +marimo>=0.9.0 +fastapi>=0.104.0 +uvicorn[standard]>=0.24.0 + +# Data processing and utilities +pandas>=2.0.0 +polars>=0.20.0 +pydantic>=2.0.0 + +# Database connectivity +psycopg2-binary>=2.9.0 +sqlalchemy>=2.0.0 + +# File handling and parsing +pyyaml>=6.0 +toml>=0.10.0 + +# Date/time utilities +python-dateutil>=2.8.0 + +# Development and testing +pytest>=7.0.0 +black>=23.0.0 \ No newline at end of file diff --git a/experimental/apps/marimo-server/server.py b/experimental/apps/marimo-server/server.py new file mode 100644 index 0000000000..be6258a810 --- /dev/null +++ b/experimental/apps/marimo-server/server.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python3 +""" +Marimo server for data engineering operational notebooks. + +This server hosts multiple read-only marimo notebooks for operational tasks: +- Build runner: Execute and monitor data pipeline builds +- Data distribution: View and manage data distribution tasks +- Version checker: Check available versions of data products +""" + +import marimo +from pathlib import Path +from fastapi import FastAPI +from fastapi.responses import HTMLResponse + + +def create_server(): + """Create the marimo server with all operational notebooks.""" + + # Create the base marimo server + server = marimo.create_asgi_app( + include_code=True, # Show code in read-only mode + quiet=False, + ) + + # Path to notebooks directory + notebooks_dir = Path(__file__).parent / "notebooks" + + # Manually add specific operational notebooks + operational_notebooks = { + "build": "build_runner.py", + "distribute": "data_distribution.py", + "distribution": "distribution.py", + "versions": "version_checker.py", + "connectors": "connector_management.py", + } + + # Add each operational notebook + for route_name, filename in operational_notebooks.items(): + notebook_path = notebooks_dir / filename + if notebook_path.exists(): + server = server.with_app(path=f"/{route_name}", root=notebook_path) + print(f"Added notebook: /{route_name} -> {filename}") + else: + print(f"Warning: Notebook not found: {filename}") + + # Also dynamically add any other notebooks in the directory + for notebook_file in notebooks_dir.glob("*.py"): + route_name = notebook_file.stem + # Skip if already added above + if route_name not in [ + nb.replace("_", "-") for nb in operational_notebooks.keys() + ]: + server = server.with_app(path=f"/{route_name}", root=notebook_file) + print(f"Added notebook: /{route_name} -> {notebook_file.name}") + + return server + + +def create_index_page(): + """Create a simple index page listing available notebooks.""" + notebooks_dir = Path(__file__).parent / "notebooks" + + html = """ + + + + Data Engineering Operational Notebooks + + + +

đŸ› ī¸ Data Engineering Operational Notebooks

+

Select a notebook to run operational tasks:

+ + + +
+

Available Notebooks

+ + + + + + """ + + return html + + +# Create FastAPI wrapper to add index page +app = FastAPI(title="Data Engineering Marimo Server") + + +@app.get("/", response_class=HTMLResponse) +async def index(): + """Serve index page with links to notebooks.""" + return create_index_page() + + +# Create and mount the marimo server +marimo_server = create_server() +app.mount("/", marimo_server.build()) + + +if __name__ == "__main__": + import uvicorn + + print("🚀 Starting Data Engineering Marimo Server...") + print("📖 Available at: http://localhost:8080") + print("📝 Notebooks directory: ./notebooks/") + + uvicorn.run("server:app", host="0.0.0.0", port=8888, reload=True, log_level="info") diff --git a/experimental/apps/marimo-server/start.sh b/experimental/apps/marimo-server/start.sh new file mode 100755 index 0000000000..39f06c7a9f --- /dev/null +++ b/experimental/apps/marimo-server/start.sh @@ -0,0 +1,66 @@ +#!/bin/bash + +# Start script for Marimo Server +# This script sets up the environment and starts the server + +set -e + +echo "🚀 Starting NYC DCP Data Engineering Marimo Server..." + +# Check if we're in the correct directory +if [ ! -f "server.py" ]; then + echo "❌ Error: server.py not found. Please run from the marimo-server directory." + exit 1 +fi + +# Check Python version +python_version=$(python3 --version 2>&1 | awk '{print $2}') +echo "🐍 Python version: $python_version" + +# Install dependencies if requirements.txt is newer than last install +if [ requirements.txt -nt .last_install ] || [ ! -f .last_install ]; then + echo "đŸ“Ļ Installing/updating dependencies..." + pip install -r requirements.txt + touch .last_install +else + echo "✅ Dependencies up to date" +fi + +# Set default environment variables +export MARIMO_HOST=${MARIMO_HOST:-"0.0.0.0"} +export MARIMO_PORT=${MARIMO_PORT:-8080} + +echo "🌐 Server will start on: http://${MARIMO_HOST}:${MARIMO_PORT}" +echo "📂 Repository root: $(realpath ../../..)" +echo "📝 Notebooks directory: $(realpath ./notebooks)" + +# Check if notebooks directory exists and has content +if [ ! -d "./notebooks" ]; then + echo "❌ Error: notebooks directory not found" + exit 1 +fi + +notebook_count=$(find ./notebooks -name "*.py" | wc -l) +echo "📊 Found $notebook_count notebook(s)" + +if [ $notebook_count -eq 0 ]; then + echo "âš ī¸ Warning: No notebooks found in ./notebooks/" +fi + +echo "" +echo "📖 Available notebooks:" +for notebook in ./notebooks/*.py; do + if [ -f "$notebook" ]; then + basename=$(basename "$notebook" .py) + echo " - /$basename -> $notebook" + fi +done + +echo "" +echo "đŸŽ¯ Starting server..." +echo " Press Ctrl+C to stop" +echo " Visit http://localhost:${MARIMO_PORT} to access notebooks" +echo "" + +# Start the server +python server.py \ No newline at end of file