From 846d3cd9e4926922c2e7538549a520cf99a7f096 Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Fri, 3 Apr 2026 11:29:01 -0400 Subject: [PATCH] WIP: enable rust endpoints for baggagE --- manifests/rust.yml | 32 +-- .../docker/rust/parametric/src/datadog/mod.rs | 210 +++++++++++++++--- 2 files changed, 197 insertions(+), 45 deletions(-) diff --git a/manifests/rust.yml b/manifests/rust.yml index 3782b3a2aa5..ed13b94bce0 100644 --- a/manifests/rust.yml +++ b/manifests/rust.yml @@ -118,22 +118,22 @@ manifest: tests/parametric/test_headers_b3multi.py::Test_Headers_B3multi::test_headers_b3multi_propagate_invalid: missing_feature # Created by easy win activation script tests/parametric/test_headers_b3multi.py::Test_Headers_B3multi::test_headers_b3multi_propagate_valid: missing_feature # Created by easy win activation script tests/parametric/test_headers_b3multi.py::Test_Headers_B3multi::test_headers_b3multi_single_key_propagate_valid: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage: '>=0.2.1' # Modified by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_extract_header_D005: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_get_D008: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_get_all_D009: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_inject_header_D004: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_malformed_headers_D012: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_malformed_headers_D013: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_malformed_headers_D014: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_malformed_headers_D015: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_remove_D010: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_remove_all_D011: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_set_D006: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggageheader_maxbytes_inject_D017: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggageheader_maxitems_inject_D016: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_headers_baggage_default_D001: missing_feature # Created by easy win activation script - tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_headers_baggage_only_D002: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage: '>=0.2.1' # Modified by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_extract_header_D005: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_get_D008: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_get_all_D009: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_inject_header_D004: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_malformed_headers_D012: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_malformed_headers_D013: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_malformed_headers_D014: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_malformed_headers_D015: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_remove_D010: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_remove_all_D011: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggage_set_D006: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggageheader_maxbytes_inject_D017: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_baggageheader_maxitems_inject_D016: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_headers_baggage_default_D001: missing_feature # Created by easy win activation script + # tests/parametric/test_headers_baggage.py::Test_Headers_Baggage::test_headers_baggage_only_D002: missing_feature # Created by easy win activation script tests/parametric/test_headers_baggage.py::Test_Headers_Baggage_Span_Tags: '>=0.2.1' # Modified by easy win activation script tests/parametric/test_headers_baggage.py::Test_Headers_Baggage_Span_Tags::test_baggage_span_tags_all: missing_feature # Created by easy win activation script tests/parametric/test_headers_baggage.py::Test_Headers_Baggage_Span_Tags::test_baggage_span_tags_config_with_empty_keys: missing_feature # Created by easy win activation script diff --git a/utils/build/docker/rust/parametric/src/datadog/mod.rs b/utils/build/docker/rust/parametric/src/datadog/mod.rs index 4e3c0d2621f..c5176abbdea 100644 --- a/utils/build/docker/rust/parametric/src/datadog/mod.rs +++ b/utils/build/docker/rust/parametric/src/datadog/mod.rs @@ -8,15 +8,99 @@ use axum::{ }; use dto::*; use opentelemetry::{ + baggage::BaggageExt, trace::{Span, TraceContextExt, Tracer}, Context, }; use opentelemetry_http::HeaderExtractor; -use std::{collections::HashMap, sync::Arc, vec}; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, OnceLock, + }, + vec, +}; use tracing::debug; use crate::{get_tracer, AppState, ContextWithParent}; +/// Returns true if `b` is a valid HTTP token char (tchar per RFC 9110). +/// tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." / +/// "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA +fn is_tchar(b: u8) -> bool { + matches!( + b, + b'!' | b'#' + | b'$' + | b'%' + | b'&' + | b'\'' + | b'*' + | b'+' + | b'-' + | b'.' + | b'^' + | b'_' + | b'`' + | b'|' + | b'~' + | b'0'..=b'9' + | b'a'..=b'z' + | b'A'..=b'Z' + ) +} + +/// Percent-encode any byte in `key` that is not a valid tchar, so the resulting +/// string passes OTel's `Baggage` key-validity check. +fn encode_baggage_key(key: &str) -> String { + let mut encoded = String::with_capacity(key.len()); + for b in key.bytes() { + if is_tchar(b) { + encoded.push(b as char); + } else { + encoded.push_str(&format!("%{:02X}", b)); + } + } + encoded +} + +/// Percent-decode a baggage key back to its original form. +fn decode_baggage_key(key: &str) -> String { + let bytes = key.as_bytes(); + let mut result: Vec = Vec::with_capacity(bytes.len()); + let mut i = 0; + while i < bytes.len() { + if bytes[i] == b'%' && i + 2 < bytes.len() { + if let Ok(byte) = u8::from_str_radix( + std::str::from_utf8(&bytes[i + 1..i + 3]).unwrap_or("XX"), + 16, + ) { + result.push(byte); + i += 3; + continue; + } + } + result.push(bytes[i]); + i += 1; + } + String::from_utf8_lossy(&result).into_owned() +} + +/// Stored in the OTel `Context` to signal that the baggage size limit has been +/// reached; subsequent `set_baggage` calls for the same span are silently dropped. +#[derive(Clone)] +struct BaggageOverflowed; + +/// Generate a synthetic span ID for baggage-only extracted contexts (no valid trace context). +/// Counts down from u64::MAX/2 to avoid collision with real span IDs (which are random 64-bit values). +fn next_synthetic_span_id() -> u64 { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER + .get_or_init(|| AtomicU64::new(u64::MAX / 2)) + .fetch_sub(1, Ordering::Relaxed) +} + pub fn app() -> Router { Router::new() .route("/span/start", post(start_span)) @@ -32,11 +116,11 @@ pub fn app() -> Router { .route("/span/extract_headers", post(extract_headers)) .route("/span/flush", post(flush_spans)) .route("/stats/flush", post(flush_stats)) - // .route("/span/set_baggage", post(set_baggage)) - // .route("/span/get_baggage", get(get_baggage)) - // .route("/span/get_all_baggage", get(get_all_baggage)) - // .route("/span/remove_baggage", post(remove_baggage)) - // .route("/span/remove_all_baggage", post(remove_all_baggage)) + .route("/span/set_baggage", post(set_baggage)) + .route("/span/get_baggage", get(get_baggage)) + .route("/span/get_all_baggage", get(get_all_baggage)) + .route("/span/remove_baggage", post(remove_baggage)) + .route("/span/remove_all_baggage", post(remove_all_baggage)) } // Handler implementations @@ -133,7 +217,10 @@ async fn start_span( let span_id = u64::from_be_bytes(id.to_bytes()); let trace_id = u128::from_be_bytes(span.span_context().trace_id().to_bytes()); - let ctx = Context::current_with_span(span); + let ctx = match parent_ctx { + Some(ref p) => p.with_span(span), + None => Context::current_with_span(span), + }; let ctx_with_parent = Arc::new(ContextWithParent::new(ctx, parent_ctx)); *state.current_context.lock().unwrap() = ctx_with_parent.clone(); @@ -285,16 +372,12 @@ async fn inject_headers( ) -> Json { let contexts = state.contexts.lock().unwrap(); if let Some(ctx) = contexts.get(&args.span_id) { - let span = ctx.context.span(); opentelemetry::global::get_text_map_propagator(|propagator| { let mut injector = HashMap::new(); - // TODO: review! - let context = Context::new().with_remote_span_context(span.span_context().clone()); - - debug!("inject_headers: context: {:#?}", context); + debug!("inject_headers: context: {:#?}", ctx.context); - propagator.inject_context(&context, &mut injector); + propagator.inject_context(&ctx.context, &mut injector); debug!( "inject_headers: span {} found: {:#?}", @@ -340,6 +423,22 @@ async fn extract_headers( let context = propagator.extract(&HeaderExtractor(&extractor)); if !context.span().span_context().is_valid() { + // Even without a valid span context, there may be baggage. Store the context + // under a synthetic ID so start_span can inherit the baggage. + if !context.baggage().is_empty() { + let synthetic_id = next_synthetic_span_id(); + debug!( + "extract_headers: no valid span context but has baggage, using synthetic id {synthetic_id}" + ); + state + .extracted_span_contexts + .lock() + .unwrap() + .insert(synthetic_id, context); + return Json(SpanExtractHeadersResult { + span_id: Some(synthetic_id), + }); + } debug!("extract_headers: no valid context. Returning empty result"); return Json(SpanExtractHeadersResult { span_id: None }); } @@ -383,12 +482,43 @@ async fn flush_stats(State(_): State) -> StatusCode { StatusCode::OK } -/* async fn set_baggage(State(state): State, Json(args): Json) { let mut contexts = state.contexts.lock().unwrap(); - if let Some(span) = spans.get_mut(&args.span_id) { + if let Some(ctx) = contexts.get(&args.span_id).cloned() { debug!("set_baggage: span {} found", args.span_id); - span.set_baggage_item(args.key.clone(), Some(args.value.clone())); + + // If the size limit was previously reached, silently drop new items. + if ctx.context.get::().is_some() { + debug!("set_baggage: baggage overflow flag set, skipping"); + return; + } + + let encoded_key = encode_baggage_key(&args.key); + + let mut new_baggage = opentelemetry::baggage::Baggage::new(); + for (k, (v, _)) in ctx.context.baggage().iter() { + new_baggage.insert(k.clone(), v.as_str().to_string()); + } + new_baggage.insert(encoded_key.clone(), args.value.clone()); + + // Detect whether the insert succeeded: the key must now hold the expected value. + let insert_succeeded = new_baggage + .iter() + .any(|(k, (v, _))| k.as_str() == encoded_key && v.as_str() == args.value.as_str()); + + let new_context = ctx.context.with_baggage(new_baggage); + // On failure, mark the context so no subsequent items are accepted. + let new_context = if insert_succeeded { + new_context + } else { + debug!("set_baggage: insert failed (overflow), setting overflow flag"); + new_context.with_value(BaggageOverflowed) + }; + + contexts.insert( + args.span_id, + Arc::new(ContextWithParent::new(new_context, ctx.parent.clone())), + ); } else { debug!("set_baggage: span {} NOT found", args.span_id); } @@ -399,11 +529,17 @@ async fn get_baggage( Json(args): Json, ) -> Json { let contexts = state.contexts.lock().unwrap(); - if let Some(span) = spans.get(&args.span_id) { + if let Some(ctx) = contexts.get(&args.span_id) { debug!("get_baggage: span {} found", args.span_id); - Json(SpanGetBaggageResult { - baggage: span.get_baggage_item(&args.key), - }) + // Keys may be stored in percent-encoded form; encode the lookup key accordingly. + let encoded_key = encode_baggage_key(&args.key); + let value = ctx + .context + .baggage() + .iter() + .find(|(k, _)| k.as_str() == encoded_key.as_str()) + .map(|(_, (v, _))| v.as_str().to_string()); + Json(SpanGetBaggageResult { baggage: value }) } else { debug!("get_baggage: span {} NOT found", args.span_id); Json(SpanGetBaggageResult { baggage: None }) @@ -415,11 +551,16 @@ async fn get_all_baggage( Json(args): Json, ) -> Json { let contexts = state.contexts.lock().unwrap(); - if let Some(span) = spans.get(&args.span_id) { + if let Some(ctx) = contexts.get(&args.span_id) { debug!("get_all_baggage: span {} found", args.span_id); - Json(SpanGetAllBaggageResult { - baggage: Some(span.baggage.clone()), - }) + // Decode keys back to their original (possibly non-tchar) form. + let baggage: HashMap = ctx + .context + .baggage() + .iter() + .map(|(k, (v, _))| (decode_baggage_key(k.as_str()), v.as_str().to_string())) + .collect(); + Json(SpanGetAllBaggageResult { baggage: Some(baggage) }) } else { debug!("get_all_baggage: span {} NOT found", args.span_id); Json(SpanGetAllBaggageResult { baggage: None }) @@ -428,9 +569,20 @@ async fn get_all_baggage( async fn remove_baggage(State(state): State, Json(args): Json) { let mut contexts = state.contexts.lock().unwrap(); - if let Some(span) = spans.get_mut(&args.span_id) { + if let Some(ctx) = contexts.get(&args.span_id).cloned() { debug!("remove_baggage: span {} found", args.span_id); - span.set_baggage_item(args.key.clone(), None); + let encoded_key = encode_baggage_key(&args.key); + let mut new_baggage = opentelemetry::baggage::Baggage::new(); + for (k, (v, _)) in ctx.context.baggage().iter() { + if k.as_str() != encoded_key.as_str() { + new_baggage.insert(k.clone(), v.as_str().to_string()); + } + } + let new_context = ctx.context.with_baggage(new_baggage); + contexts.insert( + args.span_id, + Arc::new(ContextWithParent::new(new_context, ctx.parent.clone())), + ); } else { debug!("remove_baggage: span {} NOT found", args.span_id); } @@ -441,11 +593,11 @@ async fn remove_all_baggage( Json(args): Json, ) { let mut contexts = state.contexts.lock().unwrap(); - if let Some(span) = spans.get_mut(&args.span_id) { + if let Some(ctx) = contexts.get(&args.span_id).cloned() { debug!("remove_all_baggage: span {} found", args.span_id); - span.baggage.clear(); + let new_context = ctx.context.with_cleared_baggage(); + contexts.insert(args.span_id, Arc::new(ContextWithParent::new(new_context, ctx.parent.clone()))); } else { debug!("remove_all_baggage: span {} NOT found", args.span_id); } } -*/