Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,20 @@ def get_otel_logger(
)
readers.append(export_to_console)

# Reset the OTel SDK's MeterProvider state before setting a new one.
# This is necessary because the SDK uses a Once() guard that only allows
# set_meter_provider() to succeed once per process. When Airflow forks a
# subprocess for task execution, the child inherits the parent's Once._done=True
# state, causing set_meter_provider() to silently fail. The child then uses the
# parent's stale MeterProvider whose export thread is dead after fork, and
# task-level metrics (e.g. ti.finish) are lost.
# stats.py already detects the PID mismatch and only calls this function in
# forked children that need a fresh provider, so this reset is safe.
import opentelemetry.metrics._internal as _metrics_internal

_metrics_internal._METER_PROVIDER_SET_ONCE._done = False
_metrics_internal._METER_PROVIDER = None

metrics.set_meter_provider(
MeterProvider(
resource=resource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,30 @@ def test_atexit_flush_on_process_exit(self):
def mock_service_run():
    """Build an OTel logger in debug mode and increment ``my_test_stat`` once."""
    # NOTE(review): presumably run in a separate process by the atexit flush test — confirm.
    get_otel_logger(debug=True).incr("my_test_stat")


class TestOtelForkSafety:
    """Tests for ``get_otel_logger()`` when the OTel meter-provider guard is already set."""

    def test_get_otel_logger_resets_meter_provider_guard(self):
        """
        Verify that get_otel_logger() can set a new MeterProvider even when the
        OTel SDK's Once() guard has already been triggered (simulating a forked child
        process that inherited the parent's state).

        The module-level guard flag and provider are snapshotted before the test
        touches them and restored afterwards, so the process-wide OTel state seen
        by other tests is exactly what it was before this test ran.
        """
        import opentelemetry.metrics._internal as _metrics_internal

        set_once_guard = _metrics_internal._METER_PROVIDER_SET_ONCE

        # Snapshot the global state as found, so it can be put back verbatim.
        original_done = set_once_guard._done
        original_provider = _metrics_internal._METER_PROVIDER

        # Simulate the state a forked child inherits: Once._done=True from parent
        set_once_guard._done = True

        try:
            logger = get_otel_logger(debug=True)

            # The guard should have been reset and a new provider set
            assert _metrics_internal._METER_PROVIDER is not None
            assert _metrics_internal._METER_PROVIDER is not original_provider
            # The logger should be functional
            assert isinstance(logger, SafeOtelLogger)
        finally:
            # Clean up: restore the pre-test state rather than forcing fixed
            # values, so this test does not leak altered globals to other tests.
            set_once_guard._done = original_done
            _metrics_internal._METER_PROVIDER = original_provider
Loading