Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct TraceExporterConfig {
process_tags: Option<String>,
test_session_token: Option<String>,
connection_timeout: Option<u64>,
otlp_endpoint: Option<String>,
}

#[no_mangle]
Expand Down Expand Up @@ -414,8 +415,37 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_connection_timeout(
)
}

/// Enables OTLP HTTP/JSON export and sets the endpoint URL.
///
/// When set, traces are sent to this URL in OTLP HTTP/JSON format instead of the Datadog
/// agent. The host language is responsible for resolving the endpoint from its configuration
/// (e.g. `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`) before calling this function.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint(
config: Option<&mut TraceExporterConfig>,
url: CharSlice,
) -> Option<Box<ExporterError>> {
catch_panic!(
if let Some(handle) = config {
handle.otlp_endpoint = match sanitize_string(url) {
Ok(s) => Some(s),
Err(e) => return Some(e),
};
None
} else {
gen_error!(ErrorCode::InvalidArgument)
},
gen_error!(ErrorCode::Panic)
)
}

/// Create a new TraceExporter instance.
///
/// When `OTEL_TRACES_EXPORTER=otlp` is set in the environment, the exporter sends traces in
/// OTLP HTTP/JSON to the configured OTLP endpoint instead of the Datadog agent. The same
/// payload (e.g. MessagePack) is passed to `ddog_trace_exporter_send`; the library decodes
/// and converts to OTLP when OTLP is enabled.
///
/// # Arguments
///
/// * `out_handle` - The handle to write the TraceExporter instance in.
Expand Down Expand Up @@ -467,6 +497,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
builder.enable_health_metrics();
}

if let Some(ref url) = config.otlp_endpoint {
builder.set_otlp_endpoint(url);
}

match builder.build() {
Ok(exporter) => {
out_handle.as_ptr().write(Box::new(exporter));
Expand Down
1 change: 1 addition & 0 deletions libdd-data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

pub mod agent_info;
mod health_metrics;
pub mod otlp;
mod pausable_worker;
#[allow(missing_docs)]
pub mod stats_exporter;
Expand Down
35 changes: 35 additions & 0 deletions libdd-data-pipeline/src/otlp/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
Comment on lines +1 to +2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should parse the configurations from the host language, and not in libdatadg.

The way we would configure the TraceExporter for agent_url for instance would be

1) language_configuration resolves agent_url  from different sources
2) when the tracer wants to instantiate a TraceExporter it first creates a TraceExporterBuilder and calls setters on it (set_agent_url in this case) 
3) TraceExporterBuilder::build gives a TraceExporter containing the agent_url 
4) In TraceExporter::send we can just read self.agent_url


//! OTLP trace export configuration.

use std::time::Duration;

/// OTLP trace export protocol. HTTP/JSON is currently supported.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum OtlpProtocol {
/// HTTP with JSON body (Content-Type: application/json). Default for HTTP.
#[default]
HttpJson,
/// HTTP with protobuf body. (Not supported yet)
HttpProtobuf,
/// gRPC. (Not supported yet)
Grpc,
}

/// Default timeout for OTLP export requests.
pub const DEFAULT_OTLP_TIMEOUT: Duration = Duration::from_secs(10);

/// Parsed OTLP trace exporter configuration.
#[derive(Clone, Debug)]
pub struct OtlpTraceConfig {
/// Full URL to POST traces to (e.g. `http://localhost:4318/v1/traces`).
pub endpoint_url: String,
/// Optional HTTP headers (key-value pairs).
pub headers: Vec<(String, String)>,
/// Request timeout.
pub timeout: Duration,
/// Protocol (for future use; currently only HttpJson is supported).
#[allow(dead_code)]
pub(crate) protocol: OtlpProtocol,
}
78 changes: 78 additions & 0 deletions libdd-data-pipeline/src/otlp/exporter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! OTLP HTTP/JSON trace exporter.

use super::config::OtlpTraceConfig;
use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError};
use libdd_common::{http_common, Endpoint, HttpClient};
use libdd_trace_utils::send_with_retry::{
send_with_retry, RetryBackoffType, RetryStrategy, SendWithRetryError,
};
use std::collections::HashMap;

/// Max total attempts for OTLP export (1 initial + up to 4 retries on transient failures).
const OTLP_MAX_ATTEMPTS: u32 = 5;
/// Initial backoff between retries (milliseconds).
const OTLP_RETRY_DELAY_MS: u64 = 100;

/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries.
///
/// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters.
///
/// Note: dynamic OTLP headers from `OTEL_EXPORTER_OTLP_HEADERS` are not forwarded because
/// [`send_with_retry`] requires `&'static str` header keys. Support for arbitrary OTEL headers
/// would require the API to accept `HashMap<String, String>`.
Comment on lines +23 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm true 🤔
I'll refactor send_with_retry so it's possible

pub async fn send_otlp_traces_http(
client: &HttpClient,
config: &OtlpTraceConfig,
json_body: Vec<u8>,
) -> Result<(), TraceExporterError> {
let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| {
TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!(
"Invalid OTLP endpoint URL: {}",
e
)))
})?;

let target = Endpoint {
url,
timeout_ms: config.timeout.as_millis() as u64,
..Endpoint::default()
};

let headers: HashMap<&'static str, String> =
HashMap::from([("Content-Type", "application/json".to_string())]);

let retry_strategy = RetryStrategy::new(
OTLP_MAX_ATTEMPTS,
OTLP_RETRY_DELAY_MS,
RetryBackoffType::Exponential,
None,
);

match send_with_retry(client, &target, json_body, &headers, &retry_strategy).await {
Ok(_) => Ok(()),
Err(e) => Err(map_send_error(e).await),
}
}

async fn map_send_error(err: SendWithRetryError) -> TraceExporterError {
match err {
SendWithRetryError::Http(response, _) => {
let status = response.status();
let body_bytes = http_common::collect_response_bytes(response)
.await
.unwrap_or_default();
let body_str = String::from_utf8_lossy(&body_bytes);
TraceExporterError::Request(RequestError::new(status, &body_str))
}
SendWithRetryError::Timeout(_) => {
TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut))
}
SendWithRetryError::Network(error, _) => TraceExporterError::from(error),
SendWithRetryError::Build(_) => TraceExporterError::Internal(
InternalErrorKind::InvalidWorkerState("Failed to build OTLP request".to_string()),
),
}
}
29 changes: 29 additions & 0 deletions libdd-data-pipeline/src/otlp/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! OTLP trace export for libdatadog.
//!
//! When an OTLP endpoint is configured via [`crate::trace_exporter::TraceExporterBuilder::set_otlp_endpoint`],
//! the trace exporter sends traces in OTLP HTTP/JSON format to that endpoint instead of the
//! Datadog agent. The host language is responsible for resolving the endpoint from its own
//! configuration (e.g. `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`).
//!
//! ## Sampling
//!
//! By default, the exporter does not apply its own sampling: it exports every trace it receives
//! from the tracer. The tracer (e.g. dd-trace-py) is responsible for inheriting the sampling
//! decision from the distributed trace context; when no decision is present, the tracer typically
//! uses 100% (always on).
//!
//! ## Partial flush
//!
//! For the POC, partial flush is disabled. The tracer should only invoke the exporter when all
//! spans from a local trace are closed (i.e. send complete trace chunks). This crate does not
//! buffer or flush partially—it exports whatever trace chunks it receives.

pub mod config;
pub mod exporter;

pub use config::OtlpTraceConfig;
pub use exporter::send_otlp_traces_http;
pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo};
21 changes: 21 additions & 0 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use crate::agent_info::AgentInfoFetcher;
use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT};
use crate::otlp::OtlpTraceConfig;
use crate::pausable_worker::PausableWorker;
use crate::telemetry::TelemetryClientBuilder;
use crate::trace_exporter::agent_response::AgentResponsePayloadVersion;
Expand Down Expand Up @@ -51,6 +53,7 @@ pub struct TraceExporterBuilder {
test_session_token: Option<String>,
agent_rates_payload_version_enabled: bool,
connection_timeout: Option<u64>,
otlp_endpoint: Option<String>,
}

impl TraceExporterBuilder {
Expand Down Expand Up @@ -223,6 +226,18 @@ impl TraceExporterBuilder {
self
}

/// Enables OTLP HTTP/JSON export and sets the endpoint URL.
///
/// When set, traces are sent to this endpoint in OTLP HTTP/JSON format instead of the
/// Datadog agent. The host language is responsible for resolving the endpoint from its
/// configuration (e.g. `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`) before calling this method.
///
/// Example: `set_otlp_endpoint("http://localhost:4318/v1/traces")`
pub fn set_otlp_endpoint(&mut self, url: &str) -> &mut Self {
self.otlp_endpoint = Some(url.to_owned());
self
}

#[allow(missing_docs)]
pub fn build(self) -> Result<TraceExporter, TraceExporterError> {
if !Self::is_inputs_outputs_formats_compatible(self.input_format, self.output_format) {
Expand Down Expand Up @@ -345,6 +360,12 @@ impl TraceExporterBuilder {
.agent_rates_payload_version_enabled
.then(AgentResponsePayloadVersion::new),
http_client: new_default_client(),
otlp_config: self.otlp_endpoint.map(|url| OtlpTraceConfig {
endpoint_url: url,
headers: vec![],
timeout: DEFAULT_OTLP_TIMEOUT,
protocol: OtlpProtocol::HttpJson,
}),
})
}

Expand Down
Loading
Loading