Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1342,6 +1342,22 @@ pub unsafe extern "C" fn ddog_get_agent_info_env<'a>(
.unwrap_or(ffi::CharSlice::empty())
}

/// Gets the container tags hash from agent info (or empty if not existing)
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_get_agent_info_container_tags_hash<'a>(
reader: &'a mut AgentInfoReader,
changed: &mut bool,
) -> ffi::CharSlice<'a> {
let (has_changed, info) = reader.read();
*changed = has_changed;

info.as_ref()
.and_then(|i| i.container_tags_hash.as_ref())
.map(|s| ffi::CharSlice::from(s.as_str()))
.unwrap_or(ffi::CharSlice::empty())
}

#[macro_export]
macro_rules! check {
($failable:expr, $msg:expr) => {
Expand Down
96 changes: 85 additions & 11 deletions libdd-data-pipeline/src/agent_info/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
//! Provides utilities to fetch the agent /info endpoint and an automatic fetcher to keep info
//! up-to-date

use super::{schema::AgentInfo, AGENT_INFO_CACHE};
use super::{
schema::{AgentInfo, AgentInfoStruct},
AGENT_INFO_CACHE,
};
use anyhow::{anyhow, Result};
use http::header::HeaderName;
use http_body_util::BodyExt;
Expand All @@ -26,28 +29,47 @@ pub enum FetchInfoStatus {
NewState(Box<AgentInfo>),
}

/// Fetch info from the given info_endpoint and compare its state to the current state hash.
/// Fetch info from the given endpoint and compare state-related hashes.
///
/// If the state hash is different from the current one:
/// If either the agent state hash or container tags hash is different from the current one:
/// - Return a `FetchInfoStatus::NewState` of the info struct
/// - Else return `FetchInfoStatus::SameState`
pub async fn fetch_info_with_state(
async fn fetch_info_with_state_and_container_tags(
info_endpoint: &Endpoint,
current_state_hash: Option<&str>,
current_container_tags_hash: Option<&str>,
) -> Result<FetchInfoStatus> {
let (new_state_hash, body_data) = fetch_and_hash_response(info_endpoint).await?;
let (new_state_hash, body_data, container_tags_hash) =
fetch_and_hash_response(info_endpoint).await?;

if current_state_hash.is_some_and(|state| state == new_state_hash) {
if current_state_hash.is_some_and(|state| state == new_state_hash)
&& current_container_tags_hash == container_tags_hash.as_deref()
{
return Ok(FetchInfoStatus::SameState);
}

let mut info_struct: AgentInfoStruct = serde_json::from_slice(&body_data)?;
info_struct.container_tags_hash = container_tags_hash;

let info = Box::new(AgentInfo {
state_hash: new_state_hash,
info: serde_json::from_slice(&body_data)?,
info: info_struct,
});
Ok(FetchInfoStatus::NewState(info))
}

/// Fetch info from the given info_endpoint and compare its state to the current state hash.
///
/// If the state hash is different from the current one:
/// - Return a `FetchInfoStatus::NewState` of the info struct
/// - Else return `FetchInfoStatus::SameState`
pub async fn fetch_info_with_state(
info_endpoint: &Endpoint,
current_state_hash: Option<&str>,
) -> Result<FetchInfoStatus> {
fetch_info_with_state_and_container_tags(info_endpoint, current_state_hash, None).await
}

/// Fetch the info endpoint once and return the info.
///
/// Can be used for one-time access to the agent's info. If you need to access the info several
Expand Down Expand Up @@ -78,21 +100,30 @@ pub async fn fetch_info(info_endpoint: &Endpoint) -> Result<Box<AgentInfo>> {

/// Fetch and hash the response from the agent info endpoint.
///
/// Returns a tuple of (state_hash, response_body_bytes).
/// Returns a tuple of (state_hash, response_body_bytes, container_tags_hash).
/// The hash is calculated using SHA256 to match the agent's calculation method.
async fn fetch_and_hash_response(info_endpoint: &Endpoint) -> Result<(String, bytes::Bytes)> {
async fn fetch_and_hash_response(
info_endpoint: &Endpoint,
) -> Result<(String, bytes::Bytes, Option<String>)> {
let req = info_endpoint
.to_request_builder(concat!("Libdatadog/", env!("CARGO_PKG_VERSION")))?
.method(http::Method::GET)
.body(http_common::Body::empty());
let client = http_common::new_default_client();
let res = client.request(req?).await?;

// Extract the Datadog-Container-Tags-Hash header
let container_tags_hash = res
.headers()
.get("Datadog-Container-Tags-Hash")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());

let body_bytes = res.into_body().collect().await?;
let body_data = body_bytes.to_bytes();
let hash = format!("{:x}", Sha256::digest(&body_data));

Ok((hash, body_data))
Ok((hash, body_data, container_tags_hash))
}

/// Fetch the info endpoint and update an ArcSwap keeping it up-to-date.
Expand Down Expand Up @@ -224,7 +255,15 @@ impl AgentInfoFetcher {
async fn fetch_and_update(&self) {
let current_info = AGENT_INFO_CACHE.load();
let current_hash = current_info.as_ref().map(|info| info.state_hash.as_str());
let res = fetch_info_with_state(&self.info_endpoint, current_hash).await;
let current_container_tags_hash = current_info
.as_ref()
.and_then(|info| info.info.container_tags_hash.as_deref());
let res = fetch_info_with_state_and_container_tags(
&self.info_endpoint,
current_hash,
current_container_tags_hash,
)
.await;
match res {
Ok(FetchInfoStatus::NewState(new_info)) => {
debug!("New /info state received");
Expand Down Expand Up @@ -406,6 +445,41 @@ mod single_threaded_tests {
assert!(matches!(same_state_info_status, FetchInfoStatus::SameState));
}

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn test_fetch_info_with_same_state_but_different_container_tags_hash() {
let server = MockServer::start();
let mock = server
.mock_async(|when, then| {
when.path("/info");
then.status(200)
.header("content-type", "application/json")
.header("Datadog-Container-Tags-Hash", "new-container-hash")
.body(TEST_INFO);
})
.await;
let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());

let info_status = fetch_info_with_state_and_container_tags(
&endpoint,
Some(TEST_INFO_HASH),
Some("old-container-hash"),
)
.await
.unwrap();

mock.assert();
assert!(
matches!(info_status, FetchInfoStatus::NewState(info) if *info == AgentInfo {
state_hash: TEST_INFO_HASH.to_string(),
info: AgentInfoStruct {
container_tags_hash: Some("new-container-hash".to_string()),
..serde_json::from_str(TEST_INFO).unwrap()
},
})
);
}

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn test_fetch_info() {
Expand Down
2 changes: 2 additions & 0 deletions libdd-data-pipeline/src/agent_info/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct AgentInfoStruct {
pub peer_tags: Option<Vec<String>>,
/// List of span kinds eligible for stats computation
pub span_kinds_stats_computed: Option<Vec<String>>,
/// Container tags hash from HTTP response header
pub container_tags_hash: Option<String>,
}

#[allow(missing_docs)]
Expand Down
Loading