fix: graceful shutdown and lazy-load heavy SDK dependencies#1064
fix: graceful shutdown and lazy-load heavy SDK dependencies#1064fyzanshaik-atlan wants to merge 4 commits intomainfrom
Conversation
✅ Snyk checks have passed. No issues have been found so far.
💻 Catch issues earlier using the plugins for VS Code, JetBrains IDEs, Visual Studio, and Eclipse. |
📜 Docstring Coverage ReportRESULT: PASSED (minimum: 30.0%, actual: 77.1%) Detailed Coverage Report |
📦 Trivy Vulnerability Scan Results
Report SummaryCould not generate summary table (data length mismatch: 9 vs 8). Scan Result Detailsrequirements.txtuv.lock |
📦 Trivy Secret Scan Results
Report Summary
Scan Result Details✅ No secrets found during the scan for |
|
🛠 Docs available at: https://k.atlan.dev/application-sdk/fix/local-sigint-graceful-shutdown |
☂️ Python Coverage
Overall Coverage
New FilesNo new covered files... Modified FilesNo covered modified files...
|
|
🛠 Full Test Coverage Report: https://k.atlan.dev/coverage/application-sdk/pr/1064 |
📦 Example workflows test results
|
📦 Example workflows test results
|
📦 Example workflows test results
|
Greptile SummaryThis PR addresses two concerns: graceful shutdown coordination for local-mode runs and lazy-loading of heavy dependencies to reduce startup cost.
Confidence Score: 4/5
|
| Filename | Overview |
|---|---|
| application_sdk/application/init.py | Adds lazy proxy classes for metrics/traces, wraps server lifecycle in try/finally to stop worker on exit in LOCAL mode, and adds _stop_worker() method. Duplicate proxy classes with sql.py. |
| application_sdk/application/metadata_extraction/sql.py | Adds identical lazy proxy classes for metrics/traces. Duplicates the pattern from application/__init__.py instead of sharing a common implementation. |
| application_sdk/worker.py | Adds stop() method with cross-thread shutdown support and early-exit check for pre-startup shutdown requests. Race conditions between daemon thread and stop() are handled via _shutdown_initiated flag. |
| application_sdk/server/fastapi/init.py | Adds KeyboardInterrupt and CancelledError handling around server.serve() to ensure uvicorn shutdown. CancelledError is correctly re-raised to preserve cancellation semantics. |
| application_sdk/server/fastapi/middleware/metrics.py | Removes module-level eager get_metrics() call, moving it inside the dispatch method. Clean change that defers metrics initialization to first request. |
| application_sdk/observability/logger_adaptor.py | Lazy-loads OpenTelemetry imports inside conditional blocks, adds _DefaultLoggerProxy for backward-compatible default_logger symbol. TYPE_CHECKING import for OTelLogRecord type alias is correct. |
| application_sdk/observability/metrics_adaptor.py | Moves OpenTelemetry imports inside _setup_otel_metrics() method. Renames metrics to otel_metrics to avoid shadowing the local import. Clean change. |
| application_sdk/observability/traces_adaptor.py | Moves OpenTelemetry imports to function-level for lazy loading. Duplicates imports across _setup_otel_traces, _setup_console_only_traces, _str_to_span_kind, and _send_to_otel. Return type of _str_to_span_kind changed to Any. |
| application_sdk/observability/observability.py | Moves pandas and duckdb imports to function-level for lazy loading. Clean change that only imports these heavy dependencies when actually needed. |
| application_sdk/common/aws_utils.py | Adds _get_boto3() helper for lazy boto3 import. Uses from __future__ import annotations with TYPE_CHECKING for type hints. Clean approach. |
| application_sdk/common/file_converter.py | Replaces top-level import pandas as pd with a _PandasProxy class. Every pd.* access now goes through __getattr__, meaning repeated accesses have proxy overhead. The proxy never caches the pandas module reference. |
| application_sdk/transformers/atlas/init.py | Adds _import_daft() helper for lazy daft import. Correctly uses TYPE_CHECKING and from __future__ import annotations for type annotations. |
| application_sdk/transformers/query/init.py | Replaces top-level daft imports with lazy _import_daft(), to_struct(), and when() wrapper functions. Uses TYPE_CHECKING for annotations. Thorough approach. |
Sequence Diagram
sequenceDiagram
participant User as User (Ctrl+C)
participant App as BaseApplication.start()
participant Server as APIServer.start()
participant Uvicorn as uvicorn Server
participant Worker as Worker (daemon thread)
App->>Worker: start(daemon=True)
Worker-->>Worker: Thread: asyncio.run(start(daemon=False))
Worker->>Worker: _worker_loop = get_running_loop()
Worker->>Worker: worker.run()
App->>Server: _start_server()
Server->>Uvicorn: server.serve()
User->>Uvicorn: SIGINT / KeyboardInterrupt
Uvicorn-->>Server: KeyboardInterrupt raised
Server->>Uvicorn: server.should_exit = True
Server->>Uvicorn: await server.shutdown()
Note over App: finally block (LOCAL mode)
App->>Worker: _stop_worker() → worker.stop()
Worker->>Worker: _shutdown_initiated = True
alt Worker loop is different loop (daemon)
Worker->>Worker: run_coroutine_threadsafe(_shutdown_worker(), worker_loop)
Worker->>Worker: workflow_worker.shutdown()
else Worker loop same or unavailable
Worker->>Worker: await _shutdown_worker()
end
Worker-->>App: Worker stopped
Last reviewed commit: 83b06b6
|
@greptile re-review |
Greptile SummaryThis PR fixes graceful shutdown coordination for local-mode runs and lazy-loads several heavyweight dependencies (
Confidence Score: 4/5
|
| Filename | Overview |
|---|---|
| application_sdk/application/init.py | Adds graceful worker shutdown in LOCAL mode via try/finally around server lifecycle, replaces eager metrics/traces singletons with lazy proxies. |
| application_sdk/common/file_converter.py | Introduces _PandasProxy for lazy pandas import. Each attribute access re-imports (Python caches in sys.modules, so minimal overhead). |
| application_sdk/observability/lazy_proxies.py | New shared module providing LazyMetricsProxy and LazyTracesProxy, centralizing the lazy proxy pattern previously duplicated inline. |
| application_sdk/observability/logger_adaptor.py | Moves OpenTelemetry imports to method scope, replaces eager default_logger initialization with _DefaultLoggerProxy for lazy creation. |
| application_sdk/observability/metrics_adaptor.py | Defers all OpenTelemetry imports to _setup_otel_metrics method. Renames local import to otel_metrics to avoid shadowing. |
| application_sdk/observability/traces_adaptor.py | Defers OpenTelemetry imports to method scope across _setup_otel_traces, _setup_console_only_traces, _str_to_span_kind, and _send_to_otel. |
| application_sdk/server/fastapi/init.py | Adds KeyboardInterrupt and CancelledError handling in start() to trigger explicit uvicorn shutdown, correctly re-raising CancelledError. |
| application_sdk/transformers/query/init.py | Defers daft and daft.functions imports; wraps to_struct and when in lazy helper functions that preserve the call-site API. |
| application_sdk/worker.py | Adds stop() method with same-loop and cross-thread shutdown support. Stores _worker_loop and _worker_thread references for coordinated shutdown. Includes early-exit check for _shutdown_initiated before worker.run(). |
Sequence Diagram
sequenceDiagram
participant User as User (Ctrl+C)
participant App as BaseApplication.start()
participant Server as APIServer.start()
participant Uvicorn as uvicorn.Server
participant Worker as Worker (daemon thread)
App->>Worker: start(daemon=True)
Worker-->>Worker: Thread: asyncio.run(start(daemon=False))
Worker->>Worker: _worker_loop = get_running_loop()
Worker->>Worker: workflow_worker = create_worker()
Worker->>Worker: worker.run()
App->>Server: _start_server()
Server->>Uvicorn: await server.serve()
User->>Uvicorn: KeyboardInterrupt / CancelledError
Uvicorn-->>Server: exception raised
Server->>Uvicorn: server.should_exit = True
Server->>Uvicorn: await server.shutdown()
Server-->>App: returns (finally block)
App->>Worker: _stop_worker() → worker.stop()
alt Same loop
Worker->>Worker: await _shutdown_worker()
else Cross-thread (daemon)
Worker->>Worker: run_coroutine_threadsafe(_shutdown_worker, worker_loop)
Worker->>Worker: await wrap_future(future)
end
Worker-->>App: shutdown complete
Last reviewed commit: d8c6750
|
|
||
| await EventStore.publish_event(worker_creation_event) | ||
|
|
||
| self._worker_loop = asyncio.get_running_loop() |
There was a problem hiding this comment.
Missing memory barrier for cross-thread field visibility
self._worker_loop is written here in the daemon thread and read by stop() in the main thread (line 317). While CPython's GIL makes individual attribute writes atomic, there is no guaranteed ordering between _worker_loop and workflow_worker assignments across threads. In practice this works because:
- The
_shutdown_initiatedflag covers the early window. - Python's GIL ensures visibility after the next bytecode boundary.
However, if you ever need stronger guarantees (or run on an alternative Python runtime), consider using a threading.Event or threading.Lock to coordinate the handoff. This is not a blocking issue for CPython but worth noting for future maintainability.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Summary
APIServer.start()shutdown path so Ctrl+C / task cancellation triggers explicit uvicorn shutdown (server.should_exit+await server.shutdown())BaseApplication.start()local-mode lifecycle to always stop the worker when server exits/failsWorker.stop()support for both same-loop and cross-thread (daemon) worker shutdownpandas,duckdb, OpenTelemetry modules,boto3, anddaftget_metrics()/get_traces()only when used)Why
Validation
uv run pytest tests/unit/observability tests/unit/application/test_application.py tests/unit/application/metadata_extraction/test_sql.py tests/unit/common/test_aws_utils.py tests/unit/common/test_file_converter.py tests/unit/transformers -quv run pre-commit run --files application_sdk/application/__init__.py application_sdk/application/metadata_extraction/sql.py application_sdk/common/aws_utils.py application_sdk/common/file_converter.py application_sdk/observability/logger_adaptor.py application_sdk/observability/metrics_adaptor.py application_sdk/observability/observability.py application_sdk/observability/traces_adaptor.py application_sdk/server/fastapi/middleware/metrics.py application_sdk/transformers/atlas/__init__.py application_sdk/transformers/query/__init__.py