Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,22 @@ def get_otel_logger(
)
readers.append(export_to_console)

# Reset the OTel SDK's Once() guard so set_meter_provider() can succeed.
# This is necessary when get_otel_logger() is called after a process fork:
# the parent's _METER_PROVIDER_SET_ONCE._done = True is inherited by the child,
# causing set_meter_provider() to silently fail with "Overriding of current
# MeterProvider is not allowed". The child then uses the parent's stale provider
# whose PeriodicExportingMetricReader thread is dead after fork.
# On first call (no fork), _done is already False so this is a no-op.
# See: https://github.com/apache/airflow/issues/64690
try:
import opentelemetry.metrics._internal as _metrics_internal

_metrics_internal._METER_PROVIDER_SET_ONCE._done = False
_metrics_internal._METER_PROVIDER = None
except (ImportError, AttributeError):
pass

metrics.set_meter_provider(
MeterProvider(
resource=resource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,50 @@ def test_atexit_flush_on_process_exit(self):
f"stderr:\n{proc.stderr}"
)

def test_reinit_after_fork_exports_metrics(self):
    """Verify that a second get_otel_logger() call still yields exported metrics.

    Regression test for https://github.com/apache/airflow/issues/64690.
    A child interpreter runs ``mock_service_run_reinit()`` from this test
    module, which calls ``get_otel_logger()`` twice (standing in for the
    re-initialization done after a fork) and then increments
    ``post_fork_stat``. The test passes when the child exits with status 0
    and ``post_fork_stat`` shows up in its captured stdout.
    """
    test_module_name = "tests.observability.metrics.test_otel_logger"
    child_code = f"import {test_module_name} as m; m.mock_service_run_reinit()"

    # Run in a separate interpreter so the child starts from a fresh process
    # and its stdout/stderr can be captured and inspected.
    command = [sys.executable, "-c", child_code]
    child_env = os.environ.copy()
    proc = subprocess.run(
        command,
        check=False,  # the return code is asserted explicitly below
        env=child_env,
        capture_output=True,
        text=True,
        timeout=20,
    )

    # A non-zero exit means the child crashed before or while emitting the stat.
    assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"

    # The stat name must be present in what the child printed to stdout.
    assert "post_fork_stat" in proc.stdout, (
        "Expected 'post_fork_stat' in stdout after re-initialization but it wasn't found. "
        "This suggests set_meter_provider() failed due to the Once() guard.\n"
        f"stdout:\n{proc.stdout}\n"
        f"stderr:\n{proc.stderr}"
    )


def mock_service_run():
    """Create an OTel logger with ``debug=True`` and increment ``my_test_stat`` once.

    Takes no arguments and returns nothing.
    """
    get_otel_logger(debug=True).incr("my_test_stat")


def mock_service_run_reinit():
    """Initialize the OTel logger twice in one process, then emit ``post_fork_stat``.

    The back-to-back ``get_otel_logger()`` calls stand in for what a forked
    child does: the first call installs the global MeterProvider and trips
    the SDK's Once() guard, and the second mirrors stats.py invoking the
    factory again after noticing a PID mismatch. If the guard is not reset,
    the second ``set_meter_provider()`` silently fails and the stat would be
    recorded against a stale provider.
    """
    # Initial setup: leaves Once._done = True behind.
    get_otel_logger(debug=True)

    # Repeat initialization (the simulated post-fork path) and record the
    # stat through the logger returned by this second call.
    get_otel_logger(debug=True).incr("post_fork_stat")
Loading