diff --git a/Cargo.lock b/Cargo.lock index 8f6f506..6d2f050 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,6 +387,30 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "crypto-auditing-event-monitor" +version = "0.2.4" +dependencies = [ + "anyhow", + "clap", + "crypto-auditing", + "futures", + "hex", + "inotify", + "serde", + "serde_cbor", + "serde_json", + "serde_with", + "tempfile", + "tokio", + "tokio-serde", + "tokio-stream", + "tokio-util", + "toml", + "tracing", + "tracing-subscriber", +] + [[package]] name = "crypto-auditing-log-parser" version = "0.2.4" @@ -395,10 +419,12 @@ dependencies = [ "clap", "crypto-auditing", "hex", + "pager", "serde", "serde_cbor", "serde_json", "serde_with", + "toml", ] [[package]] @@ -510,6 +536,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + [[package]] name = "errno" version = "0.3.14" @@ -520,6 +557,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -1146,6 +1193,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "pager" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2599211a5c97fbbb1061d3dc751fa15f404927e4846e07c643287d6d1f462880" +dependencies = [ + "errno 0.2.8", + "libc", +] + [[package]] name = "pin-project" version = "1.1.10" @@ -1298,7 +1355,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" dependencies = [ "bitflags 2.10.0", - "errno", + "errno 0.3.14", "libc", "linux-raw-sys", "windows-sys 0.61.2", diff --git a/Cargo.toml b/Cargo.toml index 623e422..c926cd8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,8 @@ members = [ "client", "crypto-auditing", "event-broker", - "log-parser" + "log-parser", + "monitor" ] resolver = "2" @@ -32,7 +33,7 @@ openssl = "0.10" page_size = "0.6" probe = "0.5" plain = "0.2" -serde = { version = "1.0", features = ["derive"] } +serde = { version = "1.0", features = ["derive", "rc"] } serde_cbor = "0.11" serde_json = "1.0" serde_with = "3" diff --git a/GNUmakefile b/GNUmakefile index aa946f8..e9dba9e 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -18,12 +18,15 @@ programs = \ ${TARGETDIR}/${PROFILE}/crau-agent \ ${TARGETDIR}/${PROFILE}/crau-client \ ${TARGETDIR}/${PROFILE}/crau-event-broker \ - ${TARGETDIR}/${PROFILE}/crau-log-parser + ${TARGETDIR}/${PROFILE}/crau-query \ + ${TARGETDIR}/${PROFILE}/crau-monitor conffiles = \ dist/conf/agent.conf \ dist/conf/client.conf \ - dist/conf/event-broker.conf + dist/conf/event-broker.conf \ + dist/conf/query.conf \ + dist/conf/monitor.conf .PHONY: all all: $(programs) diff --git a/README.md b/README.md index 32d0fd1..c850141 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ The design documents can be found from the following links: ```console $ git clone --depth=1 -b wip/usdt https://gitlab.com/gnutls/gnutls.git $ ./bootstrap -$ ./configure --prefix=/path/to/installation +$ ./configure --prefix=/path/to/installation --enable-crypto-auditing $ make -j$(nproc) $ sudo make install ``` @@ -40,16 +40,6 @@ $ make $ sudo make install ``` -The first step requires `agent/src/bpf/vmlinux.h` to be populated. By -default it is done through BTF dump from the running kernel with -`bpftool`, but if it is not supported in your system, it is possible -to use `vmlinux.h` included in the `kernel-devel` package: - -```console -$ sudo dnf install kernel-devel -$ cp $(rpm -ql kernel-devel | grep '/vmlinux.h$' | tail -1) agent/src/bpf -``` - ## Running 1. Create dedicated user and group (e.g., crypto-auditing:crypto-auditing) @@ -73,16 +63,14 @@ SocketMode=0660 library = ["/path/to/installation/lib64/libgnutls.so.30"] user = "crypto-auditing:crypto-auditing" ``` -5. Enable agent and event-broker +5. Enable agent ```console $ sudo systemctl daemon-reload $ sudo systemctl start crau-agent.service -$ sudo systemctl start crau-event-broker.socket ``` -6. Connect to event-broker with client +6. Run monitor ```console -$ crau-client --scope tls --format json -$ crau-client --scope tls --format cbor --output audit.cborseq +$ crau-monitor ``` 7. On another terminal, run any commands using the instrumented library ```console diff --git a/crypto-auditing/Cargo.toml b/crypto-auditing/Cargo.toml index 0f8d98d..af232d7 100644 --- a/crypto-auditing/Cargo.toml +++ b/crypto-auditing/Cargo.toml @@ -11,7 +11,7 @@ futures.workspace = true libc.workspace = true serde.workspace = true serde_cbor.workspace = true -serde_with.workspace = true +serde_with = { workspace = true, features = ["hex"] } thiserror.workspace = true tokio = { workspace = true, features = ["net", "rt"] } tokio-serde.workspace = true diff --git a/crypto-auditing/src/types.rs b/crypto-auditing/src/types.rs index e6365f3..095527b 100644 --- a/crypto-auditing/src/types.rs +++ b/crypto-auditing/src/types.rs @@ -1,15 +1,50 @@ // SPDX-License-Identifier: GPL-3.0-or-later // Copyright (C) 2022-2023 The crypto-auditing developers. -use serde::{Deserialize, Serialize}; -use serde_with::serde_as; +use serde::{ + Deserialize, Serialize, + ser::{SerializeSeq, Serializer}, +}; +use serde_with::{hex::Hex, serde_as}; +use std::cell::RefCell; +use std::collections::BTreeMap; use std::ffi::CStr; +use std::rc::Rc; use std::time::Duration; include!(concat!(env!("OUT_DIR"), "/bindings.rs")); pub type ContextID = [u8; 16]; +fn only_values(source: &BTreeMap, serializer: S) -> Result +where + S: Serializer, + V: Serialize, +{ + let mut seq = serializer.serialize_seq(Some(source.len()))?; + for value in source.values() { + seq.serialize_element(value)?; + } + seq.end() +} + +#[serde_as] +#[derive(Debug, Default, Serialize)] +pub struct Context { + #[serde_as(as = "Hex")] + pub context: ContextID, + #[serde_as(as = "Hex")] + pub origin: Vec, + #[serde_as(as = "serde_with::DurationNanoSeconds")] + pub start: Duration, + #[serde_as(as = "serde_with::DurationNanoSeconds")] + pub end: Duration, + pub events: BTreeMap, + #[serde(skip_serializing_if = "BTreeMap::is_empty")] + #[serde(serialize_with = "only_values")] + pub spans: BTreeMap>>, +} + #[serde_as] #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(untagged)] @@ -48,7 +83,7 @@ pub struct EventGroup { events: Vec, } -fn format_context(pid_tgid: u64, context: i64) -> ContextID { +fn format_context_id(pid_tgid: u64, context: i64) -> ContextID { let mut result: ContextID = Default::default(); result[..8].copy_from_slice(&u64::to_le_bytes(pid_tgid)); result[8..].copy_from_slice(&i64::to_le_bytes(context)); @@ -115,13 +150,13 @@ impl EventGroup { pub fn from_bytes(bytes: &[u8]) -> Result> { let header = bytes.as_ptr() as *mut audit_event_header_st; let context = - unsafe { format_context((*header).pid_tgid.into(), (*header).context.into()) }; + unsafe { format_context_id((*header).pid_tgid.into(), (*header).context.into()) }; let ktime = unsafe { Duration::from_nanos((*header).ktime.into()) }; let event = match unsafe { (*header).type_ } { audit_event_type_t::AUDIT_EVENT_NEW_CONTEXT => { let raw_new_context = bytes.as_ptr() as *mut audit_new_context_event_st; let parent = unsafe { - format_context((*header).pid_tgid.into(), (*raw_new_context).parent.into()) + format_context_id((*header).pid_tgid.into(), (*raw_new_context).parent.into()) }; let origin = unsafe { (&(*raw_new_context).origin)[..(*raw_new_context).origin_size as usize].to_vec() diff --git a/dist/conf/monitor.conf b/dist/conf/monitor.conf new file mode 100644 index 0000000..81c10b5 --- /dev/null +++ b/dist/conf/monitor.conf @@ -0,0 +1,2 @@ +# log_file = "/var/log/crypto-auditing/audit.cborseq" +# event_window = 3000 diff --git a/dist/conf/query.conf b/dist/conf/query.conf new file mode 100644 index 0000000..fb4fe91 --- /dev/null +++ b/dist/conf/query.conf @@ -0,0 +1 @@ +# log_file = "/var/log/crypto-auditing/audit.cborseq" diff --git a/log-parser/Cargo.toml b/log-parser/Cargo.toml index 6a7f2be..960c0b1 100644 --- a/log-parser/Cargo.toml +++ b/log-parser/Cargo.toml @@ -11,11 +11,13 @@ anyhow.workspace = true clap = { workspace = true, features=["derive"] } crypto-auditing.workspace = true hex.workspace = true -serde = { workspace = true, features = ["rc"] } +serde.workspace = true serde_cbor.workspace = true serde_json.workspace = true serde_with = { workspace = true, features = ["hex"] } +toml.workspace = true +pager = "0.16" [[bin]] -name = "crau-log-parser" -path = "src/log_parser.rs" +name = "crau-query" +path = "src/query.rs" diff --git a/log-parser/src/config.rs b/log-parser/src/config.rs new file mode 100644 index 0000000..5e6ae1a --- /dev/null +++ b/log-parser/src/config.rs @@ -0,0 +1,89 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +// Copyright (C) 2022-2023 The crypto-auditing developers. + +use anyhow::{Context as _, Result, anyhow}; +use clap::{ArgMatches, arg, command, parser::ValueSource, value_parser}; +use std::fs; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use toml::{Table, Value}; + +const CONFIG: &str = "/etc/crypto-auditing/query.conf"; +const LOG: &str = "/var/log/crypto-auditing/audit.cborseq"; + +#[derive(Debug)] +pub struct Config { + /// Path to output log file + pub log_file: PathBuf, +} + +impl Default for Config { + fn default() -> Self { + Self { + log_file: PathBuf::from(LOG), + } + } +} + +impl Config { + pub fn new() -> Result { + let mut config = Config::default(); + + let matches = command!() + .arg( + arg!( + -c --config "Path to configuration file" + ) + .required(false) + .value_parser(value_parser!(PathBuf)), + ) + .arg( + arg!( + --"log-file" "Path to output log file" + ) + .required(false) + .value_parser(value_parser!(PathBuf)) + .default_value("audit.cborseq"), + ) + .get_matches(); + + if let Some(config_file) = matches.get_one::("config") { + config.merge_config_file(config_file)?; + } else if Path::new(CONFIG).exists() { + config.merge_config_file(CONFIG)?; + } + + config.merge_arg_matches(&matches)?; + + Ok(config) + } + + fn merge_config_file(&mut self, file: impl AsRef) -> Result<()> { + let s = fs::read_to_string(file.as_ref()) + .with_context(|| format!("unable to read config file `{}`", file.as_ref().display()))?; + let config = Table::from_str(&s).with_context(|| { + format!("unable to parse config file `{}`", file.as_ref().display()) + })?; + + if let Some(value) = config.get("log_file") { + self.log_file = pathbuf_from_value(value)?; + } + + Ok(()) + } + + fn merge_arg_matches(&mut self, matches: &ArgMatches) -> Result<()> { + if let Some(ValueSource::CommandLine) = matches.value_source("log-file") { + self.log_file = matches.try_get_one::("log-file")?.unwrap().clone(); + } + + Ok(()) + } +} + +fn pathbuf_from_value(value: &Value) -> Result { + value + .as_str() + .ok_or_else(|| anyhow!("value must be string")) + .map(PathBuf::from) +} diff --git a/log-parser/src/log_parser.rs b/log-parser/src/query.rs similarity index 64% rename from log-parser/src/log_parser.rs rename to log-parser/src/query.rs index b4dbe23..3620063 100644 --- a/log-parser/src/log_parser.rs +++ b/log-parser/src/query.rs @@ -2,59 +2,22 @@ // Copyright (C) 2022-2023 The crypto-auditing developers. use anyhow::{Context as _, Result}; -use clap::Parser; -use crypto_auditing::types::{ContextID, Event, EventData, EventGroup}; -use serde::Serialize; -use serde::ser::{SerializeSeq, Serializer}; +use crypto_auditing::types::{Context, ContextID, Event, EventGroup}; +use pager::Pager; use serde_cbor::de::Deserializer; -use serde_with::{hex::Hex, serde_as}; use std::cell::RefCell; use std::collections::BTreeMap; -use std::path::PathBuf; +use std::io::{self, Write}; use std::rc::Rc; -use std::time::Duration; -fn only_values(source: &BTreeMap, serializer: S) -> Result -where - S: Serializer, - V: Serialize, -{ - let mut seq = serializer.serialize_seq(Some(source.len()))?; - for value in source.values() { - seq.serialize_element(value)?; - } - seq.end() -} - -#[serde_as] -#[derive(Default, Serialize)] -struct Context { - #[serde_as(as = "Hex")] - context: ContextID, - #[serde_as(as = "Hex")] - origin: Vec, - #[serde_as(as = "serde_with::DurationNanoSeconds")] - start: Duration, - #[serde_as(as = "serde_with::DurationNanoSeconds")] - end: Duration, - events: BTreeMap, - #[serde(skip_serializing_if = "BTreeMap::is_empty")] - #[serde(serialize_with = "only_values")] - spans: BTreeMap>>, -} - -#[derive(Parser)] -#[command(author, version, about, long_about = None)] -#[command(about = "Primary log parser for crypto-auditing")] -struct Cli { - /// Path to log file to parse - log_path: PathBuf, -} +mod config; fn main() -> Result<(), Box> { - let cli = Cli::parse(); - let log_file = std::fs::File::open(&cli.log_path) - .with_context(|| format!("unable to read file `{}`", cli.log_path.display()))?; + let config = config::Config::new()?; + Pager::new().setup(); + + let log_file = std::fs::File::open(&config.log_file) + .with_context(|| format!("unable to read file `{}`", config.log_file.display()))?; let mut all_contexts: BTreeMap>> = BTreeMap::new(); let mut root_contexts = Vec::new(); for group in Deserializer::from_reader(&log_file).into_iter::() { @@ -107,6 +70,11 @@ fn main() -> Result<(), Box> { } } } - println!("{}", serde_json::to_string_pretty(&root_contexts).unwrap()); + let content = serde_json::to_string_pretty(&root_contexts)?; + if let Err(e) = io::stdout().write_all(content.as_bytes()) { + if e.kind() != io::ErrorKind::BrokenPipe { + return Err(Box::new(e)); + } + } Ok(()) } diff --git a/monitor/Cargo.toml b/monitor/Cargo.toml new file mode 100644 index 0000000..67e9377 --- /dev/null +++ b/monitor/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "crypto-auditing-event-monitor" +description = "Event monitor for crypto-auditing project" +version.workspace = true +edition.workspace = true +license.workspace = true +authors.workspace = true + +[dependencies] +anyhow.workspace = true +clap = { workspace = true, features = ["cargo", "derive"] } +crypto-auditing.workspace = true +futures.workspace = true +hex.workspace = true +inotify.workspace = true +serde = { workspace = true, features = ["rc"] } +serde_cbor.workspace = true +serde_json.workspace = true +serde_with = { workspace = true, features = ["hex"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } +tokio-serde.workspace = true +tokio-stream.workspace = true +tokio-util.workspace = true +toml.workspace = true +tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing.workspace = true + +[dev-dependencies] +tempfile.workspace = true + +[[bin]] +name = "crau-monitor" +path = "src/monitor.rs" diff --git a/monitor/src/config.rs b/monitor/src/config.rs new file mode 100644 index 0000000..2ff0623 --- /dev/null +++ b/monitor/src/config.rs @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +// Copyright (C) 2022-2025 The crypto-auditing developers. + +use anyhow::{Context as _, Result, anyhow}; +use clap::{ArgAction, ArgMatches, arg, command, parser::ValueSource, value_parser}; +use std::fs; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::time::Duration; +use toml::{Table, Value}; + +const CONFIG: &str = "/etc/crypto-auditing/monitor.conf"; +const LOG: &str = "/var/log/crypto-auditing/audit.cborseq"; + +#[derive(Debug)] +pub struct Config { + /// Path to output log file + pub log_file: PathBuf, + + /// Scope to match + pub scope: Vec, + + /// Event window + pub event_window: Duration, +} + +impl Default for Config { + fn default() -> Self { + Self { + log_file: PathBuf::from(LOG), + scope: Vec::default(), + event_window: Duration::from_secs(3), + } + } +} + +fn parse_duration(arg: &str) -> Result { + let milliseconds = arg.parse()?; + Ok(Duration::from_millis(milliseconds)) +} + +impl Config { + pub fn new() -> Result { + let mut config = Config::default(); + + let matches = command!() + .arg( + arg!( + -c --config "Path to configuration file" + ) + .required(false) + .value_parser(value_parser!(PathBuf)), + ) + .arg( + arg!( + --"log-file" "Path to output log file" + ) + .required(false) + .value_parser(value_parser!(PathBuf)) + .default_value("audit.cborseq"), + ) + .arg( + arg!( + --scope "Scope to restrict matches" + ) + .required(false) + .value_parser(value_parser!(String)) + .action(ArgAction::Append), + ) + .arg( + arg!( + --"event-window" "Event window" + ) + .required(false) + .value_parser(parse_duration), + ) + .get_matches(); + + if let Some(config_file) = matches.get_one::("config") { + config.merge_config_file(config_file)?; + } else if Path::new(CONFIG).exists() { + config.merge_config_file(CONFIG)?; + } + + config.merge_arg_matches(&matches)?; + + Ok(config) + } + + fn merge_config_file(&mut self, file: impl AsRef) -> Result<()> { + let s = fs::read_to_string(file.as_ref()) + .with_context(|| format!("unable to read config file `{}`", file.as_ref().display()))?; + let config = Table::from_str(&s).with_context(|| { + format!("unable to parse config file `{}`", file.as_ref().display()) + })?; + + if let Some(value) = config.get("log_file") { + self.log_file = pathbuf_from_value(value)?; + } + + if let Some(value) = config.get("scope") { + self.scope = string_array_from_value(value)?; + } + + if let Some(value) = config.get("event_window") { + self.event_window = duration_millis_from_value(value)?; + } + + Ok(()) + } + + fn merge_arg_matches(&mut self, matches: &ArgMatches) -> Result<()> { + if let Some(ValueSource::CommandLine) = matches.value_source("log-file") { + self.log_file = matches.try_get_one::("log-file")?.unwrap().clone(); + } + + if let Some(ValueSource::CommandLine) = matches.value_source("event-window") { + self.event_window = *matches.try_get_one::("event-window")?.unwrap(); + } + + if let Some(ValueSource::CommandLine) = matches.value_source("scope") { + self.scope = matches.try_get_many("scope")?.unwrap().cloned().collect(); + } + + Ok(()) + } +} + +fn string_array_from_value(value: &Value) -> Result> { + value + .as_array() + .ok_or_else(|| anyhow!("value must be array")) + .and_then(|array| { + array + .iter() + .map(string_from_value) + .collect::>>() + }) +} + +fn string_from_value(value: &Value) -> Result { + value + .as_str() + .ok_or_else(|| anyhow!("value must be string")) + .map(|v| v.to_string()) +} + +fn pathbuf_from_value(value: &Value) -> Result { + value + .as_str() + .ok_or_else(|| anyhow!("value must be string")) + .map(PathBuf::from) +} + +fn duration_millis_from_value(value: &Value) -> Result { + value + .as_integer() + .ok_or_else(|| anyhow!("value must be duration in milliseconds")) + .map(|v| Duration::from_millis(v as u64)) +} diff --git a/monitor/src/monitor.rs b/monitor/src/monitor.rs new file mode 100644 index 0000000..0c7ea83 --- /dev/null +++ b/monitor/src/monitor.rs @@ -0,0 +1,246 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +// Copyright (C) 2022-2025 The crypto-auditing developers. + +use anyhow::{Context as _, Result}; +use crypto_auditing::types::{Context, ContextID, Event, EventGroup}; +use futures::{Stream, stream::StreamExt, try_join}; +use inotify::{EventMask, EventStream, Inotify, WatchDescriptor, WatchMask}; +use serde_cbor::de::Deserializer; +use std::cell::RefCell; +use std::collections::BTreeMap; +use std::fs; +use std::marker; +use std::path::{Path, PathBuf}; +use std::rc::Rc; +use std::time::{Duration, Instant}; +use tokio::signal; +use tokio::sync::{broadcast, mpsc}; +use tokio::task::JoinSet; +use tokio::time::sleep; +use tokio_stream::wrappers::ReceiverStream; +use tracing::info; +use tracing_subscriber::{EnvFilter, fmt, prelude::*}; + +mod config; + +struct Reader { + log_file: PathBuf, + inotify_stream: EventStream>, + watch_descriptor: WatchDescriptor, +} + +impl Reader { + fn new(log_file: impl AsRef) -> Result { + let log_file = log_file.as_ref().to_path_buf(); + let inotify = + Inotify::init().with_context(|| "unable to initialize inotify".to_string())?; + let buffer = vec![0; 1024]; + let inotify_stream = inotify.into_event_stream(buffer)?; + let watch_descriptor = inotify_stream + .watches() + .add(&log_file, WatchMask::MODIFY | WatchMask::CREATE) + .with_context(|| format!("unable to start monitoring {}", log_file.display()))?; + Ok(Self { + log_file, + inotify_stream, + watch_descriptor, + }) + } + + async fn read( + &mut self, + event_sender: &mpsc::Sender, + shutdown_receiver: &mut broadcast::Receiver<()>, + ) -> Result<()> { + let mut file = fs::File::open(&self.log_file) + .with_context(|| format!("unable to open {}", self.log_file.display()))?; + + loop { + tokio::select! { + Some(event_or_error) = self.inotify_stream.next() => { + let event = event_or_error?; + if event.mask.contains(EventMask::CREATE) { + file = fs::File::open(&self.log_file).with_context(|| { + format!("unable to read file `{}`", self.log_file.display()) + })?; + } + for group in Deserializer::from_reader(&mut file).into_iter::() { + event_sender.send(group?).await? + } + }, + _ = shutdown_receiver.recv() => break, + } + } + + Ok(()) + } +} + +impl Drop for Reader { + fn drop(&mut self) { + if let Err(e) = self + .inotify_stream + .watches() + .remove(self.watch_descriptor.clone()) + { + info!(error = %e, "unable to stop monitoring {}", self.log_file.display()); + } else { + info!("disabled monitoring of {}", self.log_file.display()); + } + } +} + +#[derive(Debug)] +struct Writer { + all_contexts: BTreeMap>>, + root_contexts: Vec<(Instant, Rc>)>, + event_window: Duration, + scopes: Vec, + timeouts: JoinSet<()>, +} + +impl Writer { + fn new(event_window: Duration, scopes: &Vec) -> Self { + let all_contexts: BTreeMap>> = BTreeMap::new(); + let root_contexts = Vec::new(); + Self { + all_contexts, + root_contexts, + event_window, + scopes: scopes.to_owned(), + timeouts: JoinSet::new(), + } + } + + async fn handle_event_group(&mut self, group: &EventGroup) -> Result<()> { + let mut group = group.clone(); + if !self.scopes.is_empty() { + group.events_filtered(&self.scopes); + } + for event in group.events() { + match event { + Event::NewContext { + parent: parent_context, + origin, + } => { + let context = Rc::new(RefCell::new(Context { + context: *group.context(), + origin: origin.to_owned(), + start: group.start(), + end: group.end(), + ..Default::default() + })); + if let Some(parent) = self.all_contexts.get(&parent_context[..]) { + parent + .borrow_mut() + .spans + .insert(*group.context(), context.clone()); + } else { + self.root_contexts.push((Instant::now(), context.clone())); + self.timeouts.spawn(sleep(self.event_window)); + } + self.all_contexts.insert(*group.context(), context); + } + Event::Data { key, value } => { + if !self.all_contexts.contains_key(group.context()) { + // Either this library did not do a new_context for this context, or the + // log we have is truncated at the beginning. Just assume that this context + // has no parent and create a new one so we don't loose the information in + // this message. + let context_obj = Rc::new(RefCell::new(Context { + context: *group.context(), + start: group.start(), + end: group.end(), + ..Default::default() + })); + self.root_contexts + .push((Instant::now(), context_obj.clone())); + self.all_contexts.insert(*group.context(), context_obj); + } + if let Some(parent) = self.all_contexts.get(group.context()) { + parent + .borrow_mut() + .events + .insert(key.to_string(), value.clone()); + } + } + } + } + + Ok(()) + } + + async fn write( + &mut self, + mut event_stream: impl Stream + marker::Unpin, + shutdown_receiver: &mut broadcast::Receiver<()>, + ) -> Result<()> { + loop { + tokio::select! { + Some(ref group) = event_stream.next() => { + self.handle_event_group( + group, + ).await? + }, + Some(_) = self.timeouts.join_next() => { + self.root_contexts.retain(|(instant, context)| { + if instant.elapsed() > self.event_window { + println!("{}", serde_json::to_string_pretty(context).unwrap()); + return false; + } + true + }); + }, + _ = shutdown_receiver.recv() => break, + } + } + + Ok(()) + } +} + +async fn shutdown( + shutdown_receiver: &mut broadcast::Receiver<()>, + shutdown_sender: &broadcast::Sender<()>, +) -> Result<()> { + tokio::select! { + maybe_value = signal::ctrl_c() => { + if let Err(e) = maybe_value { + info!(error = %e, "error receiving ctrl-c") + } + info!("shutting down event broker"); + if let Err(e) = shutdown_sender.send(()) { + info!(error = %e, "unable to send shutdown"); + } + }, + _ = shutdown_receiver.recv() => (), + } + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let config = config::Config::new()?; + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .try_init()?; + + let mut reader = Reader::new(&config.log_file)?; + let mut writer = Writer::new(config.event_window, &config.scope); + + let (event_tx, event_rx) = mpsc::channel::(10); + let mut event_rx = ReceiverStream::new(event_rx); + + let (shutdown_tx, mut shutdown_rx1) = broadcast::channel::<()>(2); + let mut shutdown_rx2 = shutdown_tx.subscribe(); + let mut shutdown_rx3 = shutdown_tx.subscribe(); + + try_join!( + shutdown(&mut shutdown_rx1, &shutdown_tx), + reader.read(&event_tx, &mut shutdown_rx2), + writer.write(&mut event_rx, &mut shutdown_rx3), + ) + .map(|_| ()) +}