-
Notifications
You must be signed in to change notification settings - Fork 15
WIP: [rust]: enable rust endpoints for baggage #6686
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,15 +8,99 @@ use axum::{ | |
| }; | ||
| use dto::*; | ||
| use opentelemetry::{ | ||
| baggage::BaggageExt, | ||
| trace::{Span, TraceContextExt, Tracer}, | ||
| Context, | ||
| }; | ||
| use opentelemetry_http::HeaderExtractor; | ||
| use std::{collections::HashMap, sync::Arc, vec}; | ||
| use std::{ | ||
| collections::HashMap, | ||
| sync::{ | ||
| atomic::{AtomicU64, Ordering}, | ||
| Arc, OnceLock, | ||
| }, | ||
| vec, | ||
| }; | ||
| use tracing::debug; | ||
|
|
||
| use crate::{get_tracer, AppState, ContextWithParent}; | ||
|
|
||
| /// Returns true if `b` is a valid HTTP token char (tchar per RFC 9110). | ||
| /// tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." / | ||
| /// "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA | ||
| fn is_tchar(b: u8) -> bool { | ||
| matches!( | ||
| b, | ||
| b'!' | b'#' | ||
| | b'$' | ||
| | b'%' | ||
| | b'&' | ||
| | b'\'' | ||
| | b'*' | ||
| | b'+' | ||
| | b'-' | ||
| | b'.' | ||
| | b'^' | ||
| | b'_' | ||
| | b'`' | ||
| | b'|' | ||
| | b'~' | ||
| | b'0'..=b'9' | ||
| | b'a'..=b'z' | ||
| | b'A'..=b'Z' | ||
| ) | ||
| } | ||
|
|
||
| /// Percent-encode any byte in `key` that is not a valid tchar, so the resulting | ||
| /// string passes OTel's `Baggage` key-validity check. | ||
| fn encode_baggage_key(key: &str) -> String { | ||
| let mut encoded = String::with_capacity(key.len()); | ||
| for b in key.bytes() { | ||
| if is_tchar(b) { | ||
| encoded.push(b as char); | ||
| } else { | ||
| encoded.push_str(&format!("%{:02X}", b)); | ||
| } | ||
| } | ||
| encoded | ||
| } | ||
|
|
||
| /// Percent-decode a baggage key back to its original form. | ||
| fn decode_baggage_key(key: &str) -> String { | ||
| let bytes = key.as_bytes(); | ||
| let mut result: Vec<u8> = Vec::with_capacity(bytes.len()); | ||
| let mut i = 0; | ||
| while i < bytes.len() { | ||
| if bytes[i] == b'%' && i + 2 < bytes.len() { | ||
| if let Ok(byte) = u8::from_str_radix( | ||
| std::str::from_utf8(&bytes[i + 1..i + 3]).unwrap_or("XX"), | ||
| 16, | ||
| ) { | ||
| result.push(byte); | ||
| i += 3; | ||
| continue; | ||
| } | ||
| } | ||
| result.push(bytes[i]); | ||
| i += 1; | ||
| } | ||
| String::from_utf8_lossy(&result).into_owned() | ||
| } | ||
|
|
||
| /// Stored in the OTel `Context` to signal that the baggage size limit has been | ||
| /// reached; subsequent `set_baggage` calls for the same span are silently dropped. | ||
| #[derive(Clone)] | ||
| struct BaggageOverflowed; | ||
|
|
||
| /// Generate a synthetic span ID for baggage-only extracted contexts (no valid trace context). | ||
| /// Counts down from u64::MAX/2 to avoid collision with real span IDs (which are random 64-bit values). | ||
| fn next_synthetic_span_id() -> u64 { | ||
| static COUNTER: OnceLock<AtomicU64> = OnceLock::new(); | ||
| COUNTER | ||
| .get_or_init(|| AtomicU64::new(u64::MAX / 2)) | ||
| .fetch_sub(1, Ordering::Relaxed) | ||
| } | ||
|
|
||
| pub fn app() -> Router<AppState> { | ||
| Router::new() | ||
| .route("/span/start", post(start_span)) | ||
|
|
@@ -32,11 +116,11 @@ pub fn app() -> Router<AppState> { | |
| .route("/span/extract_headers", post(extract_headers)) | ||
| .route("/span/flush", post(flush_spans)) | ||
| .route("/stats/flush", post(flush_stats)) | ||
| // .route("/span/set_baggage", post(set_baggage)) | ||
| // .route("/span/get_baggage", get(get_baggage)) | ||
| // .route("/span/get_all_baggage", get(get_all_baggage)) | ||
| // .route("/span/remove_baggage", post(remove_baggage)) | ||
| // .route("/span/remove_all_baggage", post(remove_all_baggage)) | ||
| .route("/span/set_baggage", post(set_baggage)) | ||
| .route("/span/get_baggage", get(get_baggage)) | ||
| .route("/span/get_all_baggage", get(get_all_baggage)) | ||
| .route("/span/remove_baggage", post(remove_baggage)) | ||
| .route("/span/remove_all_baggage", post(remove_all_baggage)) | ||
| } | ||
|
|
||
| // Handler implementations | ||
|
|
@@ -133,7 +217,10 @@ async fn start_span( | |
| let span_id = u64::from_be_bytes(id.to_bytes()); | ||
| let trace_id = u128::from_be_bytes(span.span_context().trace_id().to_bytes()); | ||
|
|
||
| let ctx = Context::current_with_span(span); | ||
| let ctx = match parent_ctx { | ||
| Some(ref p) => p.with_span(span), | ||
| None => Context::current_with_span(span), | ||
| }; | ||
|
|
||
| let ctx_with_parent = Arc::new(ContextWithParent::new(ctx, parent_ctx)); | ||
| *state.current_context.lock().unwrap() = ctx_with_parent.clone(); | ||
|
|
@@ -285,16 +372,12 @@ async fn inject_headers( | |
| ) -> Json<SpanInjectHeadersResult> { | ||
| let contexts = state.contexts.lock().unwrap(); | ||
| if let Some(ctx) = contexts.get(&args.span_id) { | ||
| let span = ctx.context.span(); | ||
| opentelemetry::global::get_text_map_propagator(|propagator| { | ||
| let mut injector = HashMap::new(); | ||
|
|
||
| // TODO: review! | ||
| let context = Context::new().with_remote_span_context(span.span_context().clone()); | ||
|
|
||
| debug!("inject_headers: context: {:#?}", context); | ||
| debug!("inject_headers: context: {:#?}", ctx.context); | ||
|
|
||
| propagator.inject_context(&context, &mut injector); | ||
| propagator.inject_context(&ctx.context, &mut injector); | ||
|
|
||
| debug!( | ||
| "inject_headers: span {} found: {:#?}", | ||
|
|
@@ -340,6 +423,22 @@ async fn extract_headers( | |
| let context = propagator.extract(&HeaderExtractor(&extractor)); | ||
|
|
||
| if !context.span().span_context().is_valid() { | ||
| // Even without a valid span context, there may be baggage. Store the context | ||
| // under a synthetic ID so start_span can inherit the baggage. | ||
| if !context.baggage().is_empty() { | ||
| let synthetic_id = next_synthetic_span_id(); | ||
| debug!( | ||
| "extract_headers: no valid span context but has baggage, using synthetic id {synthetic_id}" | ||
| ); | ||
| state | ||
| .extracted_span_contexts | ||
| .lock() | ||
| .unwrap() | ||
| .insert(synthetic_id, context); | ||
| return Json(SpanExtractHeadersResult { | ||
| span_id: Some(synthetic_id), | ||
| }); | ||
| } | ||
| debug!("extract_headers: no valid context. Returning empty result"); | ||
| return Json(SpanExtractHeadersResult { span_id: None }); | ||
| } | ||
|
|
@@ -383,12 +482,43 @@ async fn flush_stats(State(_): State<AppState>) -> StatusCode { | |
| StatusCode::OK | ||
| } | ||
|
|
||
| /* | ||
| async fn set_baggage(State(state): State<AppState>, Json(args): Json<SpanSetBaggageArgs>) { | ||
| let mut contexts = state.contexts.lock().unwrap(); | ||
| if let Some(span) = spans.get_mut(&args.span_id) { | ||
| if let Some(ctx) = contexts.get(&args.span_id).cloned() { | ||
| debug!("set_baggage: span {} found", args.span_id); | ||
| span.set_baggage_item(args.key.clone(), Some(args.value.clone())); | ||
|
|
||
| // If the size limit was previously reached, silently drop new items. | ||
| if ctx.context.get::<BaggageOverflowed>().is_some() { | ||
| debug!("set_baggage: baggage overflow flag set, skipping"); | ||
| return; | ||
|
Comment on lines
+491
to
+493
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
After one overflow, Useful? React with 👍 / 👎. |
||
| } | ||
|
|
||
| let encoded_key = encode_baggage_key(&args.key); | ||
|
|
||
| let mut new_baggage = opentelemetry::baggage::Baggage::new(); | ||
| for (k, (v, _)) in ctx.context.baggage().iter() { | ||
| new_baggage.insert(k.clone(), v.as_str().to_string()); | ||
| } | ||
| new_baggage.insert(encoded_key.clone(), args.value.clone()); | ||
|
|
||
| // Detect whether the insert succeeded: the key must now hold the expected value. | ||
| let insert_succeeded = new_baggage | ||
| .iter() | ||
| .any(|(k, (v, _))| k.as_str() == encoded_key && v.as_str() == args.value.as_str()); | ||
|
|
||
| let new_context = ctx.context.with_baggage(new_baggage); | ||
| // On failure, mark the context so no subsequent items are accepted. | ||
| let new_context = if insert_succeeded { | ||
| new_context | ||
| } else { | ||
| debug!("set_baggage: insert failed (overflow), setting overflow flag"); | ||
| new_context.with_value(BaggageOverflowed) | ||
| }; | ||
|
|
||
| contexts.insert( | ||
| args.span_id, | ||
| Arc::new(ContextWithParent::new(new_context, ctx.parent.clone())), | ||
| ); | ||
| } else { | ||
| debug!("set_baggage: span {} NOT found", args.span_id); | ||
| } | ||
|
|
@@ -399,11 +529,17 @@ async fn get_baggage( | |
| Json(args): Json<SpanGetBaggageArgs>, | ||
| ) -> Json<SpanGetBaggageResult> { | ||
| let contexts = state.contexts.lock().unwrap(); | ||
| if let Some(span) = spans.get(&args.span_id) { | ||
| if let Some(ctx) = contexts.get(&args.span_id) { | ||
| debug!("get_baggage: span {} found", args.span_id); | ||
| Json(SpanGetBaggageResult { | ||
| baggage: span.get_baggage_item(&args.key), | ||
| }) | ||
| // Keys may be stored in percent-encoded form; encode the lookup key accordingly. | ||
| let encoded_key = encode_baggage_key(&args.key); | ||
| let value = ctx | ||
| .context | ||
| .baggage() | ||
| .iter() | ||
| .find(|(k, _)| k.as_str() == encoded_key.as_str()) | ||
| .map(|(_, (v, _))| v.as_str().to_string()); | ||
| Json(SpanGetBaggageResult { baggage: value }) | ||
| } else { | ||
| debug!("get_baggage: span {} NOT found", args.span_id); | ||
| Json(SpanGetBaggageResult { baggage: None }) | ||
|
|
@@ -415,11 +551,16 @@ async fn get_all_baggage( | |
| Json(args): Json<SpanGetAllBaggageArgs>, | ||
| ) -> Json<SpanGetAllBaggageResult> { | ||
| let contexts = state.contexts.lock().unwrap(); | ||
| if let Some(span) = spans.get(&args.span_id) { | ||
| if let Some(ctx) = contexts.get(&args.span_id) { | ||
| debug!("get_all_baggage: span {} found", args.span_id); | ||
| Json(SpanGetAllBaggageResult { | ||
| baggage: Some(span.baggage.clone()), | ||
| }) | ||
| // Decode keys back to their original (possibly non-tchar) form. | ||
| let baggage: HashMap<String, String> = ctx | ||
| .context | ||
| .baggage() | ||
| .iter() | ||
| .map(|(k, (v, _))| (decode_baggage_key(k.as_str()), v.as_str().to_string())) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||
| .collect(); | ||
| Json(SpanGetAllBaggageResult { baggage: Some(baggage) }) | ||
| } else { | ||
| debug!("get_all_baggage: span {} NOT found", args.span_id); | ||
| Json(SpanGetAllBaggageResult { baggage: None }) | ||
|
|
@@ -428,9 +569,20 @@ async fn get_all_baggage( | |
|
|
||
| async fn remove_baggage(State(state): State<AppState>, Json(args): Json<SpanRemoveBaggageArgs>) { | ||
| let mut contexts = state.contexts.lock().unwrap(); | ||
| if let Some(span) = spans.get_mut(&args.span_id) { | ||
| if let Some(ctx) = contexts.get(&args.span_id).cloned() { | ||
| debug!("remove_baggage: span {} found", args.span_id); | ||
| span.set_baggage_item(args.key.clone(), None); | ||
| let encoded_key = encode_baggage_key(&args.key); | ||
| let mut new_baggage = opentelemetry::baggage::Baggage::new(); | ||
| for (k, (v, _)) in ctx.context.baggage().iter() { | ||
| if k.as_str() != encoded_key.as_str() { | ||
| new_baggage.insert(k.clone(), v.as_str().to_string()); | ||
| } | ||
| } | ||
| let new_context = ctx.context.with_baggage(new_baggage); | ||
| contexts.insert( | ||
| args.span_id, | ||
| Arc::new(ContextWithParent::new(new_context, ctx.parent.clone())), | ||
| ); | ||
| } else { | ||
| debug!("remove_baggage: span {} NOT found", args.span_id); | ||
| } | ||
|
|
@@ -441,11 +593,11 @@ async fn remove_all_baggage( | |
| Json(args): Json<SpanRemoveAllBaggageArgs>, | ||
| ) { | ||
| let mut contexts = state.contexts.lock().unwrap(); | ||
| if let Some(span) = spans.get_mut(&args.span_id) { | ||
| if let Some(ctx) = contexts.get(&args.span_id).cloned() { | ||
| debug!("remove_all_baggage: span {} found", args.span_id); | ||
| span.baggage.clear(); | ||
| let new_context = ctx.context.with_cleared_baggage(); | ||
| contexts.insert(args.span_id, Arc::new(ContextWithParent::new(new_context, ctx.parent.clone()))); | ||
| } else { | ||
| debug!("remove_all_baggage: span {} NOT found", args.span_id); | ||
| } | ||
| } | ||
| */ | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If those line are not needed, could you remove them ?