diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 78bab1d2e6..ec5bd10bbd 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -1342,6 +1342,22 @@ pub unsafe extern "C" fn ddog_get_agent_info_env<'a>( .unwrap_or(ffi::CharSlice::empty()) } +/// Gets the container tags hash from agent info (or empty if not existing) +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_get_agent_info_container_tags_hash<'a>( + reader: &'a mut AgentInfoReader, + changed: &mut bool, +) -> ffi::CharSlice<'a> { + let (has_changed, info) = reader.read(); + *changed = has_changed; + + info.as_ref() + .and_then(|i| i.container_tags_hash.as_ref()) + .map(|s| ffi::CharSlice::from(s.as_str())) + .unwrap_or(ffi::CharSlice::empty()) +} + #[macro_export] macro_rules! check { ($failable:expr, $msg:expr) => { diff --git a/libdd-data-pipeline/src/agent_info/fetcher.rs b/libdd-data-pipeline/src/agent_info/fetcher.rs index 9bd7200288..fa2304df1b 100644 --- a/libdd-data-pipeline/src/agent_info/fetcher.rs +++ b/libdd-data-pipeline/src/agent_info/fetcher.rs @@ -3,7 +3,10 @@ //! Provides utilities to fetch the agent /info endpoint and an automatic fetcher to keep info //! up-to-date -use super::{schema::AgentInfo, AGENT_INFO_CACHE}; +use super::{ + schema::{AgentInfo, AgentInfoStruct}, + AGENT_INFO_CACHE, +}; use anyhow::{anyhow, Result}; use http::header::HeaderName; use http_body_util::BodyExt; @@ -26,28 +29,47 @@ pub enum FetchInfoStatus { NewState(Box), } -/// Fetch info from the given info_endpoint and compare its state to the current state hash. +/// Fetch info from the given endpoint and compare state-related hashes. /// -/// If the state hash is different from the current one: +/// If either the agent state hash or container tags hash is different from the current one: /// - Return a `FetchInfoStatus::NewState` of the info struct /// - Else return `FetchInfoStatus::SameState` -pub async fn fetch_info_with_state( +async fn fetch_info_with_state_and_container_tags( info_endpoint: &Endpoint, current_state_hash: Option<&str>, + current_container_tags_hash: Option<&str>, ) -> Result { - let (new_state_hash, body_data) = fetch_and_hash_response(info_endpoint).await?; + let (new_state_hash, body_data, container_tags_hash) = + fetch_and_hash_response(info_endpoint).await?; - if current_state_hash.is_some_and(|state| state == new_state_hash) { + if current_state_hash.is_some_and(|state| state == new_state_hash) + && current_container_tags_hash == container_tags_hash.as_deref() + { return Ok(FetchInfoStatus::SameState); } + let mut info_struct: AgentInfoStruct = serde_json::from_slice(&body_data)?; + info_struct.container_tags_hash = container_tags_hash; + let info = Box::new(AgentInfo { state_hash: new_state_hash, - info: serde_json::from_slice(&body_data)?, + info: info_struct, }); Ok(FetchInfoStatus::NewState(info)) } +/// Fetch info from the given info_endpoint and compare its state to the current state hash. +/// +/// If the state hash is different from the current one: +/// - Return a `FetchInfoStatus::NewState` of the info struct +/// - Else return `FetchInfoStatus::SameState` +pub async fn fetch_info_with_state( + info_endpoint: &Endpoint, + current_state_hash: Option<&str>, +) -> Result { + fetch_info_with_state_and_container_tags(info_endpoint, current_state_hash, None).await +} + /// Fetch the info endpoint once and return the info. /// /// Can be used for one-time access to the agent's info. If you need to access the info several @@ -78,9 +100,11 @@ pub async fn fetch_info(info_endpoint: &Endpoint) -> Result> { /// Fetch and hash the response from the agent info endpoint. /// -/// Returns a tuple of (state_hash, response_body_bytes). +/// Returns a tuple of (state_hash, response_body_bytes, container_tags_hash). /// The hash is calculated using SHA256 to match the agent's calculation method. -async fn fetch_and_hash_response(info_endpoint: &Endpoint) -> Result<(String, bytes::Bytes)> { +async fn fetch_and_hash_response( + info_endpoint: &Endpoint, +) -> Result<(String, bytes::Bytes, Option)> { let req = info_endpoint .to_request_builder(concat!("Libdatadog/", env!("CARGO_PKG_VERSION")))? .method(http::Method::GET) @@ -88,11 +112,18 @@ async fn fetch_and_hash_response(info_endpoint: &Endpoint) -> Result<(String, by let client = http_common::new_default_client(); let res = client.request(req?).await?; + // Extract the Datadog-Container-Tags-Hash header + let container_tags_hash = res + .headers() + .get("Datadog-Container-Tags-Hash") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + let body_bytes = res.into_body().collect().await?; let body_data = body_bytes.to_bytes(); let hash = format!("{:x}", Sha256::digest(&body_data)); - Ok((hash, body_data)) + Ok((hash, body_data, container_tags_hash)) } /// Fetch the info endpoint and update an ArcSwap keeping it up-to-date. @@ -224,7 +255,15 @@ impl AgentInfoFetcher { async fn fetch_and_update(&self) { let current_info = AGENT_INFO_CACHE.load(); let current_hash = current_info.as_ref().map(|info| info.state_hash.as_str()); - let res = fetch_info_with_state(&self.info_endpoint, current_hash).await; + let current_container_tags_hash = current_info + .as_ref() + .and_then(|info| info.info.container_tags_hash.as_deref()); + let res = fetch_info_with_state_and_container_tags( + &self.info_endpoint, + current_hash, + current_container_tags_hash, + ) + .await; match res { Ok(FetchInfoStatus::NewState(new_info)) => { debug!("New /info state received"); @@ -406,6 +445,41 @@ mod single_threaded_tests { assert!(matches!(same_state_info_status, FetchInfoStatus::SameState)); } + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_fetch_info_with_same_state_but_different_container_tags_hash() { + let server = MockServer::start(); + let mock = server + .mock_async(|when, then| { + when.path("/info"); + then.status(200) + .header("content-type", "application/json") + .header("Datadog-Container-Tags-Hash", "new-container-hash") + .body(TEST_INFO); + }) + .await; + let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap()); + + let info_status = fetch_info_with_state_and_container_tags( + &endpoint, + Some(TEST_INFO_HASH), + Some("old-container-hash"), + ) + .await + .unwrap(); + + mock.assert(); + assert!( + matches!(info_status, FetchInfoStatus::NewState(info) if *info == AgentInfo { + state_hash: TEST_INFO_HASH.to_string(), + info: AgentInfoStruct { + container_tags_hash: Some("new-container-hash".to_string()), + ..serde_json::from_str(TEST_INFO).unwrap() + }, + }) + ); + } + #[cfg_attr(miri, ignore)] #[tokio::test] async fn test_fetch_info() { diff --git a/libdd-data-pipeline/src/agent_info/schema.rs b/libdd-data-pipeline/src/agent_info/schema.rs index accfaa0719..6023bef01a 100644 --- a/libdd-data-pipeline/src/agent_info/schema.rs +++ b/libdd-data-pipeline/src/agent_info/schema.rs @@ -35,6 +35,8 @@ pub struct AgentInfoStruct { pub peer_tags: Option>, /// List of span kinds eligible for stats computation pub span_kinds_stats_computed: Option>, + /// Container tags hash from HTTP response header + pub container_tags_hash: Option, } #[allow(missing_docs)]