BLDX-590: restructure observability class hierarchy & per-signal flags #1067
Conversation
Move all apps to write under artifacts/apps/observability/ instead of
per-app artifacts/apps/{app_name}/{deploy_name}/observability/ so MDLH
can scan a single S3 prefix for ingestion.
Encode app identity in the filename:
{timestamp_ns}_{hostname}_{app_name}_{suffix}.parquet
Replace ParquetFileWriter with direct parquet write + explicit dual
upload (deployment store + upstream for SDR).
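A minimal sketch of how such an object key could be assembled; the helper name is hypothetical, and the `{signal}/year=.../` partition layout is taken from the sequence diagram and summary below:

```python
import socket
import time
from datetime import datetime, timezone


def observability_object_key(app_name: str, signal: str, suffix: str) -> str:
    """Hypothetical helper: build an object key under the shared prefix."""
    # App identity lives in the filename rather than the path, so MDLH
    # can scan the single artifacts/apps/observability/ prefix.
    now = datetime.now(timezone.utc)
    filename = f"{time.time_ns()}_{socket.gethostname()}_{app_name}_{suffix}.parquet"
    return (
        f"artifacts/apps/observability/{signal}/"
        f"year={now.year}/month={now.month:02d}/day={now.day:02d}/{filename}"
    )
```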
- Cherry-pick app_name from BLDX-599 (bf1ec61)
- Add correlation_id field (duplicate of trace_id) to LogExtraModel (see the sketch below)
- Change LOG_FILE_NAME default from log.parquet to log.jsonl.gz
- Override _flush_records in AtlanLoggerAdapter to write compressed JSON Lines instead of parquet, using orjson + gzip
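A guess at how the correlation_id duplication could look on LogExtraModel, assuming it is a pydantic model (the field set and defaults shown are assumptions):

```python
from typing import Optional

from pydantic import BaseModel


class LogExtraModel(BaseModel):
    # Subset of fields shown; the real model carries more log metadata.
    trace_id: Optional[str] = None
    # Duplicated from trace_id so consumers that expect a correlation_id
    # column can query it directly without joining on trace_id.
    correlation_id: Optional[str] = None
```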
Extract correlation fields (trace_id, atlan-*) from workflow_args and include them in the Temporal start_workflow args so the CorrelationContextInterceptor can propagate them to activities and logs.

BLDX-644

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
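A rough sketch of the extraction step; the function name and the exact merge into the start_workflow args are assumptions, only the trace_id and atlan-* field names come from the PR:

```python
from typing import Any


def extract_correlation_fields(workflow_args: dict[str, Any]) -> dict[str, Any]:
    """Pull trace_id and any atlan-* keys out of workflow_args so they can
    be forwarded in the Temporal start_workflow args for the interceptor."""
    correlation: dict[str, Any] = {}
    if "trace_id" in workflow_args:
        correlation["trace_id"] = workflow_args["trace_id"]
    correlation.update(
        {k: v for k, v in workflow_args.items() if k.startswith("atlan-")}
    )
    return correlation
```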
✅ Snyk checks have passed. No issues have been found so far.
📜 Docstring Coverage Report: PASSED (minimum: 30.0%, actual: 79.5%)
📦 Trivy Vulnerability Scan Results
Report summary could not be generated (data length mismatch: 9 vs 8). Scanned: requirements.txt, uv.lock
📦 Trivy Secret Scan Results
✅ No secrets found during the scan.
🛠 Docs available at: https://k.atlan.dev/application-sdk/BLDX-590-clean
☂️ Python Coverage

No new covered files.

🛠 Full Test Coverage Report: https://k.atlan.dev/coverage/application-sdk/pr/1067
Greptile Summary

This PR implements three coordinated changes to SDK observability: writing all apps under the shared artifacts/apps/observability/ prefix with app identity encoded in filenames, switching the log sink from parquet to gzip-compressed JSON Lines with a correlation_id field, and propagating correlation fields (trace_id, atlan-*) through the Temporal start_workflow args.

Confidence Score: 4/5
| Filename | Overview |
|---|---|
| application_sdk/clients/temporal.py | Extracts trace_id and atlan-* fields from workflow args and forwards them to Temporal start_workflow for correlation context propagation via CorrelationContextInterceptor |
| application_sdk/constants.py | Changes log file format from log.parquet to log.jsonl.gz and simplifies OBSERVABILITY_DIR to shared prefix artifacts/apps/observability; documentation needs update |
| application_sdk/observability/logger_adaptor.py | Adds app_name and correlation_id fields, overrides _flush_records to write jsonl.gz instead of parquet; good implementation but potential performance impact from gzip per-record write pattern |
| application_sdk/observability/observability.py | Refactored to add hostname and timestamp_ns to parquet filenames for uniqueness, simplified from ParquetFileWriter to direct df.to_parquet() call |
| application_sdk/observability/utils.py | Simplified get_observability_dir() to remove application_name and deployment_name formatting, now returns static shared prefix path |
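A sketch of what the simplified helper plausibly reduces to (the function name is from the table above; the body is an assumption based on the described change):

```python
def get_observability_dir() -> str:
    # Previously formatted per-app as
    # artifacts/apps/{application_name}/{deployment_name}/observability;
    # now every app shares one static prefix.
    return "artifacts/apps/observability"
```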
Sequence Diagram
```mermaid
sequenceDiagram
    participant Client as Workflow Client
    participant Temporal as Temporal Server
    participant WF as Workflow
    participant Interceptor as CorrelationContextInterceptor
    participant Activity as Activity
    participant Logger as AtlanLoggerAdapter
    participant S3 as Object Store

    Client->>Temporal: start_workflow(args={workflow_id, trace_id, atlan-*})
    Temporal->>WF: execute_workflow
    WF->>Interceptor: extract correlation fields
    Interceptor->>Interceptor: set correlation_context(trace_id, atlan-*)
    WF->>Activity: start_activity (with headers)
    Interceptor->>Activity: inject trace_id & atlan-* headers
    Activity->>Interceptor: read headers
    Interceptor->>Interceptor: set correlation_context
    Activity->>Logger: log message
    Logger->>Logger: add app_name, correlation_id=trace_id
    Logger->>Logger: buffer & flush to jsonl.gz
    Logger->>S3: upload to artifacts/apps/observability/logs/year=.../month=.../day=.../
```
Last reviewed commit: 1a0557b
```python
with gzip.open(file_path, "wb") as f:
    for record in partition_data:
        f.write(orjson.dumps(record) + b"\n")
```
Inefficient write pattern: each record triggers a separate write call, paying per-call gzip overhead.
Suggested change:

```python
with gzip.open(file_path, "wb") as f:
    # Write all records in one batch instead of one f.write() per record
    jsonl_content = b"".join(orjson.dumps(record) + b"\n" for record in partition_data)
    f.write(jsonl_content)
```
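Worth noting: batching mainly saves per-record Python call overhead; GzipFile compresses the stream incrementally either way, so the compressed output is essentially identical.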
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Force-pushed from ecf5eef to 775abac
…, int64 timestamps

- Add AtlanParquetObservability intermediate class: parquet flush logic moves out of the ABC helper into a proper class that metrics, traces, and logs all inherit from.
- Per-signal sink flags: ENABLE_LOG_SINK, ENABLE_METRICS_SINK, ENABLE_TRACES_SINK (default to ENABLE_OBSERVABILITY_DAPR_SINK).
- Convert timestamp from float (DOUBLE) to int64 nanoseconds (INT64) in parquet files for OTel/S3-Iceberg compatibility (see the sketch after this list).
- Revert logs back to parquet format (drop jsonl.gz experiment).
- S3 path simplified to artifacts/apps/observability/ (app identity encoded in filename).
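A minimal sketch of the timestamp conversion, assuming the records land in a pandas DataFrame before the direct df.to_parquet() call (the column name and sample values are hypothetical):

```python
import pandas as pd

# Records arrive with float-seconds timestamps (DOUBLE in parquet).
df = pd.DataFrame({"timestamp": [1717000000.123456, 1717000001.5]})

# Scale to nanoseconds and cast to int64 (INT64 in parquet), which
# OTel pipelines and S3/Iceberg table schemas handle cleanly.
df["timestamp"] = (df["timestamp"] * 1_000_000_000).astype("int64")

df.to_parquet("records.parquet")  # requires pyarrow or fastparquet
```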
Force-pushed from 775abac to ec53bec
Summary
- `AtlanParquetObservability` intermediate class: parquet flush logic moves out of the ABC helper into a proper class. All three signals (logs, metrics, traces) inherit from it.
- Per-signal sink flags (`ENABLE_LOG_SINK`, `ENABLE_METRICS_SINK`, `ENABLE_TRACES_SINK`) that default to the existing `ENABLE_OBSERVABILITY_DAPR_SINK` master toggle (see the sketch below).
- Convert `timestamp` from `float` (DOUBLE in parquet) to `int64` nanoseconds (INT64) for OTel/S3-Iceberg compatibility.
- S3 path simplified to `artifacts/apps/observability/{signal}/year=.../` with app identity encoded in the filename.
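A plausible shape for those flags in constants.py; the env-var parsing helper and defaults are assumptions, only the flag names and fallback relationship come from the PR:

```python
import os


def _env_flag(name: str, default: bool) -> bool:
    # Hypothetical parser; the SDK's actual env handling may differ.
    return os.getenv(name, str(default)).lower() in ("1", "true", "yes")


# Master toggle, then per-signal flags that fall back to it.
ENABLE_OBSERVABILITY_DAPR_SINK = _env_flag("ENABLE_OBSERVABILITY_DAPR_SINK", False)
ENABLE_LOG_SINK = _env_flag("ENABLE_LOG_SINK", ENABLE_OBSERVABILITY_DAPR_SINK)
ENABLE_METRICS_SINK = _env_flag("ENABLE_METRICS_SINK", ENABLE_OBSERVABILITY_DAPR_SINK)
ENABLE_TRACES_SINK = _env_flag("ENABLE_TRACES_SINK", ENABLE_OBSERVABILITY_DAPR_SINK)
```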
Test plan

- `pytest tests/unit/observability/ -v`: 43 passed, 1 skipped

🤖 Generated with Claude Code