diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index 4b5174a3bf3ca..14726e3ecc064 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -433,6 +433,22 @@ def get_otel_logger(
         )
         readers.append(export_to_console)
 
+    # Reset the OTel SDK's Once() guard (and the cached global provider) so that the
+    # set_meter_provider() call below can succeed after a process fork. The child inherits
+    # the parent's _METER_PROVIDER_SET_ONCE._done = True, so set_meter_provider() is rejected
+    # with "Overriding of current MeterProvider is not allowed" and the child keeps the
+    # parent's stale provider, whose PeriodicExportingMetricReader thread is dead after fork.
+    # On first call (no fork), _done is already False so this is a no-op.
+    # Both names are private SDK internals; if the module or guard is missing the reset is skipped.
+    # See: https://github.com/apache/airflow/issues/64690
+    try:
+        import opentelemetry.metrics._internal as _metrics_internal
+
+        _metrics_internal._METER_PROVIDER_SET_ONCE._done = False
+        _metrics_internal._METER_PROVIDER = None
+    except (ImportError, AttributeError):
+        pass
+
     metrics.set_meter_provider(
         MeterProvider(
             resource=resource,
diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py
index c27c372996968..f7b348354d7c9 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -442,7 +442,50 @@ def test_atexit_flush_on_process_exit(self):
             f"stderr:\n{proc.stderr}"
         )
 
+    def test_reinit_after_fork_exports_metrics(self):
+        """Calling get_otel_logger() twice (simulating post-fork re-init) should still export metrics.
+
+        Reproduces https://github.com/apache/airflow/issues/64690: the OTel SDK's Once()
+        guard on set_meter_provider() survives fork, preventing the child from setting a
+        fresh MeterProvider. The fix resets the guard before each set_meter_provider() call.
+        """
+        test_module_name = "tests.observability.metrics.test_otel_logger"
+        function_call_str = f"import {test_module_name} as m; m.mock_service_run_reinit()"
+
+        proc = subprocess.run(
+            [sys.executable, "-c", function_call_str],
+            check=False,
+            env=os.environ.copy(),
+            capture_output=True,
+            text=True,
+            timeout=20,
+        )
+
+        assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"
+
+        assert "post_fork_stat" in proc.stdout, (
+            "Expected 'post_fork_stat' in stdout after re-initialization but it wasn't found. "
+            "This suggests set_meter_provider() failed due to the Once() guard.\n"
+            f"stdout:\n{proc.stdout}\n"
+            f"stderr:\n{proc.stderr}"
+        )
+
 
 def mock_service_run():
     logger = get_otel_logger(debug=True)
     logger.incr("my_test_stat")
+
+
+def mock_service_run_reinit():
+    """Simulate re-initialization after fork by calling get_otel_logger() twice.
+
+    The first call sets the global MeterProvider and the Once() guard.
+    The second call simulates what happens in a forked child: stats.py detects
+    a PID mismatch and calls the factory again. Without the fix, the second
+    set_meter_provider() silently fails and the child uses a stale provider.
+    """
+    # First init — sets Once._done = True
+    get_otel_logger(debug=True)
+    # Second init — simulates post-fork re-initialization
+    logger = get_otel_logger(debug=True)
+    logger.incr("post_fork_stat")