From 0a44635aa37388dc089ce81b26c858093d94be4b Mon Sep 17 00:00:00 2001
From: David Herberth
Date: Fri, 6 Mar 2026 15:16:56 +0100
Subject: [PATCH] ref(relay): Remove old error processing pipeline

---
 CHANGELOG.md | 1 +
 relay-dynamic-config/src/feature.rs | 3 -
 .../src/processing/errors/errors/nswitch.rs | 3 -
 relay-server/src/processing/utils/event.rs | 14 +-
 relay-server/src/services/processor.rs | 171 +--------
 .../src/services/processor/attachment.rs | 45 ---
 .../services/processor/dynamic_sampling.rs | 317 ----------------
 relay-server/src/services/processor/event.rs | 216 +----------
 .../src/services/processor/nnswitch.rs | 344 ------------------
 .../src/services/processor/playstation.rs | 151 +-------
 relay-server/src/services/processor/report.rs | 6 +-
 relay-server/src/services/processor/unreal.rs | 54 ---
 relay-server/src/utils/dynamic_sampling.rs | 101 +----
 relay-server/src/utils/unreal.rs | 68 +---
 tests/integration/fixtures/mini_sentry.py | 1 -
 tests/integration/test_playstation.py | 1 -
 16 files changed, 31 insertions(+), 1465 deletions(-)
 delete mode 100644 relay-server/src/services/processor/attachment.rs
 delete mode 100644 relay-server/src/services/processor/dynamic_sampling.rs
 delete mode 100644 relay-server/src/services/processor/nnswitch.rs
 delete mode 100644 relay-server/src/services/processor/unreal.rs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1790d1b5d94..a8e04ba9f6e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
 
 **Features**:
 
+- Transition error processing pipeline to a more modern implementation. ([#5702](https://github.com/getsentry/relay/pull/5702))
 - Set `sentry.segment.id` and `sentry.segment.name` attributes on OTLP segment spans. ([#5748](https://github.com/getsentry/relay/pull/5748))
 - Envelope buffer: Add option to disable flush-to-disk on shutdown. ([#5751](https://github.com/getsentry/relay/pull/5751))
 - Allow configuring Objectstore client auth parameters. ([#5720](https://github.com/getsentry/relay/pull/5720))
diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs
index 65e499501b5..78fd341cabf 100644
--- a/relay-dynamic-config/src/feature.rs
+++ b/relay-dynamic-config/src/feature.rs
@@ -116,9 +116,6 @@ pub enum Feature {
     /// Enable the upload endpoint for attachments.
     #[serde(rename = "projects:relay-upload-endpoint")]
     UploadEndpoint,
-    /// Enable the new Error processing pipeline in Relay.
-    #[serde(rename = "organizations:relay-new-error-processing")]
-    NewErrorProcessing,
     /// Upload non-prosperodmp playstation attachments via the upload-endpoint.
     #[serde(rename = "projects:relay-playstation-uploads")]
     PlaystationUploads,
diff --git a/relay-server/src/processing/errors/errors/nswitch.rs b/relay-server/src/processing/errors/errors/nswitch.rs
index d48008e5874..688f9668fe3 100644
--- a/relay-server/src/processing/errors/errors/nswitch.rs
+++ b/relay-server/src/processing/errors/errors/nswitch.rs
@@ -120,9 +120,6 @@ impl Counted for Nswitch {
 /// An error returned when parsing the dying message attachment.
 #[derive(Debug, thiserror::Error)]
 pub enum SwitchProcessingError {
-    #[error("invalid json")]
-    #[cfg_attr(not(feature = "processing"), expect(unused))]
-    InvalidJson(#[source] serde_json::Error),
     #[error("envelope parsing failed")]
     EnvelopeParsing(#[from] EnvelopeError),
     #[error("unexpected EOF, expected {expected:?}")]
diff --git a/relay-server/src/processing/utils/event.rs b/relay-server/src/processing/utils/event.rs
index eaa9764dd34..795899a1671 100644
--- a/relay-server/src/processing/utils/event.rs
+++ b/relay-server/src/processing/utils/event.rs
@@ -33,7 +33,7 @@ use relay_statsd::metric;
 
 use crate::constants::DEFAULT_EVENT_RETENTION;
 use crate::envelope::AttachmentType;
-use crate::envelope::{Envelope, EnvelopeHeaders, Item};
+use crate::envelope::{EnvelopeHeaders, Item};
 use crate::processing::Context;
 use crate::services::processor::{MINIMUM_CLOCK_DRIFT, ProcessingError};
 use crate::services::projects::project::ProjectInfo;
@@ -391,18 +391,6 @@ pub fn filter(
 #[derive(Debug, Copy, Clone)]
 pub struct EventFullyNormalized(pub bool);
 
-impl EventFullyNormalized {
-    /// Returns `true` if the event is fully normalized, `false` otherwise.
-    pub fn new(envelope: &Envelope) -> Self {
-        let event_fully_normalized = envelope.meta().request_trust().is_trusted()
-            && envelope
-                .items()
-                .any(|item| item.creates_event() && item.fully_normalized());
-
-        Self(event_fully_normalized)
-    }
-}
-
 /// New type representing whether metrics were extracted from transactions/spans.
 #[derive(Debug, Copy, Clone)]
 pub struct EventMetricsExtracted(pub bool);
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index d09d4273dd9..31719d60377 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -23,9 +23,7 @@ use relay_config::{Config, HttpEncoding, RelayMode};
 use relay_dynamic_config::{Feature, GlobalConfig};
 use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
 use relay_event_schema::processor::ProcessingAction;
-use relay_event_schema::protocol::{
-    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
-};
+use relay_event_schema::protocol::{ClientReport, Event, EventId, NetworkReportError, SpanV2};
 use relay_filter::FilterStatKey;
 use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
 use relay_protocol::Annotated;
@@ -37,9 +35,7 @@ use reqwest::header;
 use smallvec::{SmallVec, smallvec};
 use zstd::stream::Encoder as ZstdEncoder;
 
-use crate::envelope::{
-    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
-};
+use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
 use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
 use crate::integrations::Integration;
 use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
@@ -60,10 +56,7 @@ use crate::processing::trace_attachments::TraceAttachmentsProcessor;
 use crate::processing::trace_metrics::TraceMetricsProcessor;
 use crate::processing::transactions::TransactionProcessor;
 use crate::processing::user_reports::UserReportsProcessor;
-use crate::processing::utils::event::{
-    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
-    event_type,
-};
+use crate::processing::utils::event::event_category;
 use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
 use crate::service::ServiceError;
 use crate::services::global_config::GlobalConfigHandle;
@@ -95,8 +88,6 @@ use {
     symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
 };
 
-mod attachment;
-mod dynamic_sampling;
 pub mod event;
 mod metrics;
 mod nel;
@@ -107,11 +98,6 @@ mod span;
 #[cfg(all(sentry, feature = "processing"))]
 pub mod playstation;
 mod standalone;
-#[cfg(feature = "processing")]
-mod unreal;
-
-#[cfg(feature = "processing")]
-mod nnswitch;
 
 /// Creates the block only if used with `processing` feature.
 ///
@@ -177,16 +163,9 @@ macro_rules! processing_group {
     };
 }
 
-/// A marker trait.
-///
-/// Should be used only with groups which are responsible for processing envelopes with events.
-pub trait EventProcessing {}
-
 processing_group!(TransactionGroup, Transaction);
-impl EventProcessing for TransactionGroup {}
 
 processing_group!(ErrorGroup, Error);
-impl EventProcessing for ErrorGroup {}
 
 processing_group!(SessionGroup, Session);
 processing_group!(
@@ -1196,6 +1175,7 @@ struct InnerProcessor {
     addrs: Addrs,
     #[cfg(feature = "processing")]
    rate_limiter: Option>,
+    #[cfg(feature = "processing")]
     geoip_lookup: GeoIpLookup,
     #[cfg(feature = "processing")]
     cardinality_limiter: Option<CardinalityLimiter>,
@@ -1295,6 +1275,8 @@ impl EnvelopeProcessorService {
                 })
                 .map(CardinalityLimiter::new),
             metric_outcomes,
+            #[cfg(feature = "processing")]
+            geoip_lookup: geoip_lookup.clone(),
             processing: Processing {
                 errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                 logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
@@ -1309,13 +1291,12 @@ impl EnvelopeProcessorService {
                 ),
                 profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                 trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
-                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
+                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup),
                 client_reports: ClientReportsProcessor::new(outcome_aggregator),
                 attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                 user_reports: UserReportsProcessor::new(Arc::clone(&quota_limiter)),
                 profiles: ProfilesProcessor::new(quota_limiter),
             },
-            geoip_lookup,
             config,
         };
 
@@ -1366,126 +1347,6 @@ impl EnvelopeProcessorService {
             } else {
                 Ok(cached_result.event)
             })
         }
 
-    /// Processes the general errors, and the items which require or create the events.
-    async fn process_errors(
-        &self,
-        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
-        project_id: ProjectId,
-        mut ctx: processing::Context<'_>,
-    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
-        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
-        let mut metrics = Metrics::default();
-        let mut extracted_metrics = ProcessingExtractedMetrics::new();
-
-        // Events can also contain user reports.
-        report::process_user_reports(managed_envelope);
-
-        if_processing!(self.inner.config, {
-            unreal::expand(managed_envelope, &self.inner.config)?;
-            #[cfg(sentry)]
-            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
-            nnswitch::expand(managed_envelope)?;
-        });
-
-        let mut event = event::extract(
-            managed_envelope,
-            &mut metrics,
-            event_fully_normalized,
-            &self.inner.config,
-        )?;
-
-        if_processing!(self.inner.config, {
-            if let Some(inner_event_fully_normalized) =
-                unreal::process(managed_envelope, &mut event)?
-            {
-                event_fully_normalized = inner_event_fully_normalized;
-            }
-            #[cfg(sentry)]
-            if let Some(inner_event_fully_normalized) =
-                playstation::process(managed_envelope, &mut event, ctx.project_info)?
- { - event_fully_normalized = inner_event_fully_normalized; - } - if let Some(inner_event_fully_normalized) = - attachment::create_placeholders(managed_envelope, &mut event, &mut metrics) - { - event_fully_normalized = inner_event_fully_normalized; - } - }); - - ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc( - managed_envelope, - &mut event, - ctx.project_info, - ctx.sampling_project_info, - ); - - let attachments = managed_envelope - .envelope() - .items() - .filter(|item| item.attachment_type() == Some(AttachmentType::Attachment)); - processing::utils::event::finalize( - managed_envelope.envelope().headers(), - &mut event, - attachments, - &mut metrics, - ctx.config, - )?; - event_fully_normalized = processing::utils::event::normalize( - managed_envelope.envelope().headers(), - &mut event, - event_fully_normalized, - project_id, - ctx, - &self.inner.geoip_lookup, - )?; - let filter_run = - processing::utils::event::filter(managed_envelope.envelope().headers(), &event, ctx) - .map_err(ProcessingError::EventFiltered)?; - - if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) { - dynamic_sampling::tag_error_with_sampling_decision( - managed_envelope, - &mut event, - ctx.sampling_project_info, - &self.inner.config, - ) - .await; - } - - event = self - .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx) - .await?; - - if event.value().is_some() { - processing::utils::event::scrub(&mut event, ctx.project_info)?; - event::serialize( - managed_envelope, - &mut event, - event_fully_normalized, - EventMetricsExtracted(false), - SpansExtracted(false), - )?; - event::emit_feedback_metrics(managed_envelope.envelope()); - } - - let attachments = managed_envelope - .envelope_mut() - .items_mut() - .filter(|i| i.ty() == &ItemType::Attachment); - processing::utils::attachments::scrub(attachments, ctx.project_info, None); - - if self.inner.config.processing_enabled() && !event_fully_normalized.0 { - relay_log::error!( - tags.project = %project_id, - tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()), - "ingested event without normalizing" - ); - } - - Ok(Some(extracted_metrics)) - } - /// Processes standalone items that require an event ID, but do not have an event on the same envelope. 
async fn process_standalone( &self, @@ -1677,16 +1538,8 @@ impl EnvelopeProcessorService { match group { ProcessingGroup::Error => { - if ctx.project_info.has_feature(Feature::NewErrorProcessing) { - self.process_with_processor( - &self.inner.processing.errors, - managed_envelope, - ctx, - ) + self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx) .await - } else { - run!(process_errors, project_id, ctx) - } } ProcessingGroup::Transaction => { self.process_with_processor( @@ -3455,10 +3308,10 @@ mod tests { }, }; - let Ok(Some(Submit::Envelope(mut new_envelope))) = processor.process(message).await else { + let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else { panic!(); }; - let new_envelope = new_envelope.envelope_mut(); + let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f); let event_item = new_envelope.items().last().unwrap(); let annotated_event: Annotated = @@ -3538,11 +3391,11 @@ mod tests { }; let processor = create_test_processor(Config::from_json_value(config).unwrap()).await; - let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else { + let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else { panic!(); }; + let envelope = output.serialize_envelope(ctx).unwrap(); let event = envelope - .envelope() .get_item_by(|item| item.ty() == &ItemType::Event) .unwrap(); diff --git a/relay-server/src/services/processor/attachment.rs b/relay-server/src/services/processor/attachment.rs deleted file mode 100644 index b4fc317c095..00000000000 --- a/relay-server/src/services/processor/attachment.rs +++ /dev/null @@ -1,45 +0,0 @@ -//! Attachments processor code. -#[cfg(feature = "processing")] -use { - crate::{ - envelope::AttachmentType, - managed::TypedEnvelope, - services::processor::{ErrorGroup, EventFullyNormalized}, - utils, - }, - relay_event_schema::protocol::{Event, Metrics}, - relay_protocol::Annotated, -}; - -/// Adds processing placeholders for special attachments. -/// -/// If special attachments are present in the envelope, this adds placeholder payloads to the -/// event. This indicates to the pipeline that the event needs special processing. -/// -/// If the event payload was empty before, it is created. 
-#[cfg(feature = "processing")] -pub fn create_placeholders( - managed_envelope: &mut TypedEnvelope, - event: &mut Annotated, - metrics: &mut Metrics, -) -> Option { - let envelope = managed_envelope.envelope(); - let minidump_attachment = - envelope.get_item_by(|item| item.attachment_type() == Some(AttachmentType::Minidump)); - let apple_crash_report_attachment = envelope - .get_item_by(|item| item.attachment_type() == Some(AttachmentType::AppleCrashReport)); - - if let Some(item) = minidump_attachment { - let event = event.get_or_insert_with(Event::default); - metrics.bytes_ingested_event_minidump = Annotated::new(item.len() as u64); - utils::process_minidump(event, &item.payload()); - return Some(EventFullyNormalized(false)); - } else if let Some(item) = apple_crash_report_attachment { - let event = event.get_or_insert_with(Event::default); - metrics.bytes_ingested_event_applecrashreport = Annotated::new(item.len() as u64); - utils::process_apple_crash_report(event, &item.payload()); - return Some(EventFullyNormalized(false)); - } - - None -} diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs deleted file mode 100644 index 1c39e74e465..00000000000 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ /dev/null @@ -1,317 +0,0 @@ -//! Dynamic sampling processor related code. - -use relay_config::Config; -use relay_dynamic_config::ErrorBoundary; -use relay_event_schema::protocol::{Contexts, Event, TraceContext}; -use relay_protocol::{Annotated, Empty}; - -use crate::managed::TypedEnvelope; -use crate::services::processor::EventProcessing; -use crate::services::projects::project::ProjectInfo; -use crate::utils::{self}; - -/// Ensures there is a valid dynamic sampling context and corresponding project state. -/// -/// The dynamic sampling context (DSC) specifies the project_key of the project that initiated -/// the trace. That project state should have been loaded previously by the project cache and is -/// available on the `ProcessEnvelopeState`. Under these conditions, this cannot happen: -/// -/// - There is no DSC in the envelope headers. This occurs with older or third-party SDKs. -/// - The project key does not exist. This can happen if the project key was disabled, the -/// project removed, or in rare cases when a project from another Sentry instance is referred -/// to. -/// - The project key refers to a project from another organization. In this case the project -/// cache does not resolve the state and instead leaves it blank. -/// - The project state could not be fetched. This is a runtime error, but in this case Relay -/// should fall back to the next-best sampling rule set. -/// -/// In all of the above cases, this function will compute a new DSC using information from the -/// event payload, similar to how SDKs do this. The `sampling_project_state` is also switched to -/// the main project state. -/// -/// If there is no transaction event in the envelope, this function will do nothing. -/// -/// The function will return the sampling project information of the root project for the event. If -/// no sampling project information is specified, the project information of the event’s project -/// will be returned. 
-pub fn validate_and_set_dsc<'a, T>( - managed_envelope: &mut TypedEnvelope, - event: &mut Annotated, - project_info: &'a ProjectInfo, - sampling_project_info: Option<&'a ProjectInfo>, -) -> Option<&'a ProjectInfo> { - let original_dsc = managed_envelope.envelope().dsc(); - if original_dsc.is_some() && sampling_project_info.is_some() { - return sampling_project_info; - } - - // The DSC can only be computed if there's a transaction event. Note that `dsc_from_event` - // below already checks for the event type. - if let Some(event) = event.value() - && let Some(key_config) = project_info.get_public_key_config() - && let Some(mut dsc) = - crate::processing::utils::dsc::dsc_from_event(key_config.public_key, event) - { - // All other information in the DSC must be discarded, but the sample rate was - // actually applied by the client and is therefore correct. - let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate); - dsc.sample_rate = dsc.sample_rate.or(original_sample_rate); - - managed_envelope.envelope_mut().set_dsc(dsc); - return Some(project_info); - } - - // If we cannot compute a new DSC but the old one is incorrect, we need to remove it. - managed_envelope.envelope_mut().remove_dsc(); - None -} - -/// Runs dynamic sampling on an incoming error and tags it in case of successful sampling -/// decision. -/// -/// This execution of dynamic sampling is technically a "simulation" since we will use the result -/// only for tagging errors and not for actually sampling incoming events. -pub async fn tag_error_with_sampling_decision( - managed_envelope: &mut TypedEnvelope, - event: &mut Annotated, - sampling_project_info: Option<&ProjectInfo>, - config: &Config, -) { - let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else { - return; - }; - - let root_state = sampling_project_info.as_ref(); - let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) { - Some(ErrorBoundary::Ok(config)) => config, - _ => return, - }; - - if sampling_config.unsupported() { - if config.processing_enabled() { - relay_log::error!("found unsupported rules even as processing relay"); - } - - return; - } - - let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else { - return; - }; - - // We want to get the trace context, in which we will inject the `sampled` field. - let context = event - .contexts - .get_or_insert_with(Contexts::new) - .get_or_default::(); - - // We want to update `sampled` only if it was not set, since if we don't check this - // we will end up overriding the value set by downstream Relays and this will lead - // to more complex debugging in case of problems. 
- if context.sampled.is_empty() { - relay_log::trace!("tagged error with `sampled = {}` flag", sampled); - context.sampled = Annotated::new(sampled); - } -} - -#[cfg(test)] -mod tests { - use std::collections::BTreeMap; - - use relay_base_schema::project::ProjectKey; - use relay_event_schema::protocol::EventId; - use relay_protocol::RuleCondition; - use relay_sampling::config::{RuleId, RuleType, SamplingRule, SamplingValue}; - use relay_sampling::{DynamicSamplingContext, SamplingConfig}; - use relay_system::Addr; - - use crate::envelope::{ContentType, Envelope, Item, ItemType}; - use crate::extractors::RequestMeta; - use crate::managed::ManagedEnvelope; - use crate::processing; - use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit}; - use crate::services::projects::project::ProjectInfo; - use crate::testutils::create_test_processor; - - use super::*; - - /// Always sets the processing item type to event. - async fn process_envelope_with_root_project_state( - envelope: Box, - sampling_project_info: Option<&ProjectInfo>, - ) -> Envelope { - let processor = create_test_processor(Default::default()).await; - let outcome_aggregator = Addr::dummy(); - - let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default()); - assert_eq!(envelopes.len(), 1); - let (group, envelope) = envelopes.pop().unwrap(); - - let message = ProcessEnvelopeGrouped { - group, - envelope: ManagedEnvelope::new(envelope, outcome_aggregator), - ctx: processing::Context { - sampling_project_info, - ..processing::Context::for_test() - }, - }; - - let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else { - panic!(); - }; - - envelope.envelope().clone() - } - - fn extract_first_event_from_envelope(envelope: Envelope) -> Event { - let item = envelope.items().next().unwrap(); - let annotated_event: Annotated = - Annotated::from_json_bytes(&item.payload()).unwrap(); - annotated_event.into_value().unwrap() - } - - fn mocked_error_item() -> Item { - let mut item = Item::new(ItemType::Event); - item.set_payload( - ContentType::Json, - r#"{ - "event_id": "52df9022835246eeb317dbd739ccd059", - "exception": { - "values": [ - { - "type": "mytype", - "value": "myvalue", - "module": "mymodule", - "thread_id": 42, - "other": "value" - } - ] - } - }"#, - ); - item - } - - #[tokio::test] - async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() { - let event_id = EventId::new(); - let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" - .parse() - .unwrap(); - let request_meta = RequestMeta::new(dsn); - - // We test tagging when root project state and dsc are none. 
- let mut envelope = Envelope::from_request(Some(event_id), request_meta); - envelope.add_item(mocked_error_item()); - let new_envelope = process_envelope_with_root_project_state(envelope, None).await; - let event = extract_first_event_from_envelope(new_envelope); - - assert!(event.contexts.value().is_none()); - } - - fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo { - let sampling_config = SamplingConfig { - rules: vec![SamplingRule { - condition: RuleCondition::all(), - sampling_value: SamplingValue::SampleRate { value: sample_rate }, - ty: RuleType::Trace, - id: RuleId(1), - time_range: Default::default(), - decaying_fn: Default::default(), - }], - ..SamplingConfig::new() - }; - - let mut sampling_project_state = ProjectInfo::default(); - sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config)); - sampling_project_state - } - - fn mock_dsc() -> DynamicSamplingContext { - DynamicSamplingContext { - trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(), - public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(), - release: Some("1.1.1".to_owned()), - user: Default::default(), - replay_id: None, - environment: None, - transaction: Some("transaction1".into()), - sample_rate: Some(0.5), - sampled: Some(true), - other: BTreeMap::new(), - } - } - - #[tokio::test] - async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() { - let event_id = EventId::new(); - let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" - .parse() - .unwrap(); - let request_meta = RequestMeta::new(dsn); - let mut envelope = Envelope::from_request(Some(event_id), request_meta); - envelope.set_dsc(mock_dsc()); - envelope.add_item(mocked_error_item()); - - // We test with sample rate equal to 100%. - let sampling_project_state = project_state_with_single_rule(1.0); - let new_envelope = process_envelope_with_root_project_state( - envelope.clone(), - Some(&sampling_project_state), - ) - .await; - let event = extract_first_event_from_envelope(new_envelope); - let trace_context = event.context::().unwrap(); - assert!(trace_context.sampled.value().unwrap()); - - // We test with sample rate equal to 0%. - let sampling_project_state = project_state_with_single_rule(0.0); - let new_envelope = - process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await; - let event = extract_first_event_from_envelope(new_envelope); - let trace_context = event.context::().unwrap(); - assert!(!trace_context.sampled.value().unwrap()); - } - - #[tokio::test] - async fn test_error_is_not_tagged_if_already_tagged() { - let event_id = EventId::new(); - let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" - .parse() - .unwrap(); - let request_meta = RequestMeta::new(dsn); - - // We test tagging with an incoming event that has already been tagged by downstream Relay. 
- let mut envelope = Envelope::from_request(Some(event_id), request_meta); - let mut item = Item::new(ItemType::Event); - item.set_payload( - ContentType::Json, - r#"{ - "event_id": "52df9022835246eeb317dbd739ccd059", - "exception": { - "values": [ - { - "type": "mytype", - "value": "myvalue", - "module": "mymodule", - "thread_id": 42, - "other": "value" - } - ] - }, - "contexts": { - "trace": { - "sampled": true - } - } - }"#, - ); - envelope.add_item(item); - let sampling_project_state = project_state_with_single_rule(0.0); - let new_envelope = - process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await; - let event = extract_first_event_from_envelope(new_envelope); - let trace_context = event.context::().unwrap(); - assert!(trace_context.sampled.value().unwrap()); - } -} diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 445db0517f4..5e632c19f45 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -1,224 +1,18 @@ //! Event processor related code. -use std::error::Error; - use relay_base_schema::events::EventType; use relay_config::Config; use relay_event_schema::protocol::{ - Breadcrumb, Csp, Event, ExpectCt, ExpectStaple, Hpkp, LenientString, Metrics, - SecurityReportType, Values, + Breadcrumb, Csp, Event, ExpectCt, ExpectStaple, Hpkp, LenientString, SecurityReportType, Values, }; -use relay_protocol::{Annotated, Array, Empty, Object}; -use relay_statsd::metric; +use relay_protocol::{Annotated, Array, Object}; use serde_json::Value as SerdeValue; -use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType}; +use crate::envelope::Item; use crate::extractors::RequestMeta; -use crate::managed::TypedEnvelope; -use crate::services::processor::{ - EventFullyNormalized, EventMetricsExtracted, EventProcessing, ExtractedEvent, ProcessingError, - SpansExtracted, event_type, -}; -use crate::statsd::{RelayCounters, RelayTimers}; +use crate::services::processor::{ExtractedEvent, ProcessingError}; use crate::utils::{self, ChunkedFormDataAggregator, FormDataIter}; -/// Extracts the primary event payload from an envelope. -/// -/// The event is obtained from only one source in the following precedence: -/// 1. An explicit event item. This is also the case for JSON uploads. -/// 2. A security report item. -/// 3. Attachments `__sentry-event` and `__sentry-breadcrumb1/2`. -/// 4. A multipart form data body. -/// 5. If none match, `Annotated::empty()`. -pub fn extract( - managed_envelope: &mut TypedEnvelope, - metrics: &mut Metrics, - event_fully_normalized: EventFullyNormalized, - config: &Config, -) -> Result, ProcessingError> { - let envelope = managed_envelope.envelope_mut(); - - // Remove all items first, and then process them. After this function returns, only - // attachments can remain in the envelope. The event will be added again at the end of - // `process_event`. 
- let event_item = envelope.take_item_by(|item| item.ty() == &ItemType::Event); - let security_item = envelope.take_item_by(|item| item.ty() == &ItemType::Security); - let raw_security_item = envelope.take_item_by(|item| item.ty() == &ItemType::RawSecurity); - let user_report_v2_item = envelope.take_item_by(|item| item.ty() == &ItemType::UserReportV2); - let form_item = envelope.take_item_by(|item| item.ty() == &ItemType::FormData); - let attachment_item = - envelope.take_item_by(|item| item.attachment_type() == Some(AttachmentType::EventPayload)); - let breadcrumbs1 = - envelope.take_item_by(|item| item.attachment_type() == Some(AttachmentType::Breadcrumbs)); - let breadcrumbs2 = - envelope.take_item_by(|item| item.attachment_type() == Some(AttachmentType::Breadcrumbs)); - - // Event items can never occur twice in an envelope. - if let Some(duplicate) = - envelope.get_item_by(|item| is_duplicate(item, config.processing_enabled())) - { - return Err(ProcessingError::DuplicateItem(duplicate.ty().clone())); - } - - let skip_normalization = config.processing_enabled() && event_fully_normalized.0; - - let (event, event_len) = if let Some(item) = event_item.or(security_item) { - relay_log::trace!("processing json event"); - metric!(timer(RelayTimers::EventProcessingDeserialize), { - let (mut annotated_event, len) = event_from_json_payload(item, None)?; - // Event items can never include transactions, so retain the event type and let - // inference deal with this during normalization. - if let Some(event) = annotated_event.value_mut() - && !skip_normalization - { - event.ty.set_value(None); - } - (annotated_event, len) - }) - } else if let Some(item) = user_report_v2_item { - relay_log::trace!("processing user_report_v2"); - event_from_json_payload(item, Some(EventType::UserReportV2))? - } else if let Some(item) = raw_security_item { - relay_log::trace!("processing security report"); - event_from_security_report(item, envelope.meta()).map_err(|error| { - if !matches!(error, ProcessingError::UnsupportedSecurityType) { - relay_log::debug!( - error = &error as &dyn Error, - "failed to extract security report" - ); - } - error - })? - } else if attachment_item.is_some() || breadcrumbs1.is_some() || breadcrumbs2.is_some() { - relay_log::trace!("extracting attached event data"); - event_from_attachments(config, attachment_item, breadcrumbs1, breadcrumbs2)? - } else if let Some(item) = form_item { - relay_log::trace!("extracting form data"); - let len = item.len(); - - let mut value = SerdeValue::Object(Default::default()); - merge_formdata(&mut value, &item); - let event = Annotated::deserialize_with_meta(value).unwrap_or_default(); - - (event, len) - } else { - relay_log::trace!("no event in envelope"); - (Annotated::empty(), 0) - }; - - metrics.bytes_ingested_event = Annotated::new(event_len as u64); - - Ok(event) -} - -pub fn serialize( - managed_envelope: &mut TypedEnvelope, - event: &mut Annotated, - event_fully_normalized: EventFullyNormalized, - event_metrics_extracted: EventMetricsExtracted, - spans_extracted: SpansExtracted, -) -> Result<(), ProcessingError> { - if event.is_empty() { - relay_log::debug!("Cannot serialize empty event"); - return Ok(()); - } - - let data = metric!(timer(RelayTimers::EventProcessingSerialization), { - event.to_json().map_err(ProcessingError::SerializeFailed)? 
- }); - - let event_type = event_type(event).unwrap_or_default(); - let mut event_item = Item::new(ItemType::from_event_type(event_type)); - event_item.set_payload(ContentType::Json, data); - - // TODO: The state should simply maintain & update an `ItemHeaders` object. - // If transaction metrics were extracted, set the corresponding item header - event_item.set_metrics_extracted(event_metrics_extracted.0); - event_item.set_spans_extracted(spans_extracted.0); - event_item.set_fully_normalized(event_fully_normalized.0); - event_item.set_span_count(event.value().and_then(|e| e.spans.value()).map(Vec::len)); - - managed_envelope.envelope_mut().add_item(event_item); - - Ok(()) -} - -/// Computes and emits metrics for monitoring user feedback (UserReportV2) ingest -pub fn emit_feedback_metrics(envelope: &Envelope) { - let mut has_feedback = false; - let mut num_attachments = 0; - for item in envelope.items() { - match item.ty() { - ItemType::UserReportV2 => has_feedback = true, - ItemType::Attachment => num_attachments += 1, - _ => (), - } - } - if has_feedback { - metric!(counter(RelayCounters::FeedbackAttachments) += num_attachments); - } -} - -/// Checks for duplicate items in an envelope. -/// -/// An item is considered duplicate if it was not removed by sanitation in `process_event` and -/// `extract_event`. This partially depends on the `processing_enabled` flag. -fn is_duplicate(item: &Item, processing_enabled: bool) -> bool { - match item.ty() { - // These should always be removed by `extract_event`: - ItemType::Event => true, - ItemType::Transaction => true, - ItemType::Security => true, - ItemType::FormData => true, - ItemType::RawSecurity => true, - ItemType::UserReportV2 => true, - - // These should be removed conditionally: - ItemType::UnrealReport => processing_enabled, - - // These may be forwarded to upstream / store: - ItemType::Attachment => false, - ItemType::Nel => false, - ItemType::UserReport => false, - - // Aggregate data is never considered as part of deduplication - ItemType::Session => false, - ItemType::Sessions => false, - ItemType::Statsd => false, - ItemType::MetricBuckets => false, - ItemType::ClientReport => false, - ItemType::Profile => false, - ItemType::ReplayEvent => false, - ItemType::ReplayRecording => false, - ItemType::ReplayVideo => false, - ItemType::CheckIn => false, - ItemType::Log => false, - ItemType::TraceMetric => false, - ItemType::Span => false, - ItemType::ProfileChunk => false, - ItemType::Integration => false, - - // Without knowing more, `Unknown` items are allowed to be repeated - ItemType::Unknown(_) => false, - } -} - -fn event_from_json_payload( - item: Item, - event_type: Option, -) -> Result { - let mut event = Annotated::::from_json_bytes(&item.payload()) - .map_err(ProcessingError::InvalidJson)?; - - if let Some(event_value) = event.value_mut() - && event_type.is_some() - { - event_value.ty.set_value(event_type); - } - - Ok((event, item.len())) -} - pub fn event_from_security_report( item: Item, meta: &RequestMeta, @@ -434,6 +228,8 @@ mod tests { use chrono::{DateTime, TimeZone, Utc}; + use crate::envelope::{ContentType, ItemType}; + use super::*; fn create_breadcrumbs_item(breadcrumbs: &[(Option>, &str)]) -> Item { diff --git a/relay-server/src/services/processor/nnswitch.rs b/relay-server/src/services/processor/nnswitch.rs deleted file mode 100644 index 80e88f12606..00000000000 --- a/relay-server/src/services/processor/nnswitch.rs +++ /dev/null @@ -1,344 +0,0 @@ -//! Nintendo Switch crash reports processor related code. -//! -//! 
These functions are included only in the processing mode. - -use bytes::{Buf, Bytes}; -use std::sync::OnceLock; -use zstd::bulk::Decompressor as ZstdDecompressor; - -use crate::Envelope; -use crate::constants::NNSWITCH_SENTRY_MAGIC; -use crate::envelope::{AttachmentType, ContentType, Item, ItemType}; -use crate::managed::TypedEnvelope; -use crate::services::processor::{ErrorGroup, ProcessingError}; -use crate::utils; - -pub use crate::processing::errors::SwitchProcessingError; - -/// Limit the size of the decompressed data to prevent an invalid frame blowing up memory usage. -const MAX_DECOMPRESSED_SIZE: usize = 100 * 1024; - -type Result = std::result::Result; - -/// Expands Nintendo Switch crash-reports. -/// -/// If the envelope does NOT contain a `dying_message.dat` attachment, it doesn't do anything. -/// If the attachment item is found and it matches the expected format, it parses the attachment -/// and updates the envelope with the content. The processed attachment is then removed from the -/// envelope. -/// -/// The envelope may be dropped if it exceeds size limits after decompression. Particularly, -/// this includes cases where a single attachment file exceeds the maximum file size. This is in -/// line with the behavior of the envelope endpoint. -/// -/// After this, [`crate::services::processor::EnvelopeProcessorService`] should be able to process -/// the envelope the same way it processes any other envelopes. -/// -/// Note: in case of an error, we don't fail but leave the envelope as is. -pub fn expand( - managed_envelope: &mut TypedEnvelope, -) -> std::result::Result<(), ProcessingError> { - let envelope = managed_envelope.envelope_mut(); - - if let Some(item) = envelope.take_item_by(|item| { - item.attachment_type() == Some(AttachmentType::NintendoSwitchDyingMessage) - }) && let Err(e) = expand_dying_message(item.payload(), envelope) - { - // If we fail to process the dying message, we need to add back the original attachment. - envelope.add_item(item); - return Err(ProcessingError::InvalidNintendoDyingMessage(e)); - } - - Ok(()) -} - -/// Parses DyingMessage contents and updates the envelope. -/// See dying_message.md for the documentation. -fn expand_dying_message(mut payload: Bytes, envelope: &mut Envelope) -> Result<()> { - payload.advance(NNSWITCH_SENTRY_MAGIC.len()); - let version = payload - .try_get_u8() - .map_err(|_| SwitchProcessingError::UnexpectedEof { - expected: "version", - })?; - match version { - 0 => expand_dying_message_v0(payload, envelope), - _ => Err(SwitchProcessingError::InvalidValue( - "version", - version as usize, - )), - } -} - -/// DyingMessage protocol v0 parser. 
-fn expand_dying_message_v0(mut payload: Bytes, envelope: &mut Envelope) -> Result<()> { - let encoding_byte = payload - .try_get_u8() - .map_err(|_| SwitchProcessingError::UnexpectedEof { - expected: "encoding", - })?; - let format = (encoding_byte >> 6) & 0b0000_0011; - let compression = (encoding_byte >> 4) & 0b0000_0011; - let compression_arg = encoding_byte & 0b0000_1111; - - let compressed_length = - payload - .try_get_u16() - .map_err(|_| SwitchProcessingError::UnexpectedEof { - expected: "compressed data length", - })?; - let data = decompress_data( - payload, - compressed_length as usize, - compression, - compression_arg, - )?; - - match format { - 0 => expand_dying_message_from_envelope_items(data, envelope), - _ => Err(SwitchProcessingError::InvalidValue( - "payload format", - format as usize, - )), - } -} - -/// Merges envelope items with the ones contained in the DyingMessage -fn expand_dying_message_from_envelope_items(data: Bytes, envelope: &mut Envelope) -> Result<()> { - let items = - Envelope::parse_items_bytes(data).map_err(SwitchProcessingError::EnvelopeParsing)?; - for item in items { - // If it's an event type, merge it with the main event one already in the envelope. - if item.ty() == &ItemType::Event - && let Some(event) = envelope.get_item_by_mut(|it| it.ty() == &ItemType::Event) - { - update_event(item, event).map_err(SwitchProcessingError::InvalidJson)?; - // Don't add this item as a new envelope item now that it's merged. - continue; - } - envelope.add_item(item); - } - Ok(()) -} - -fn update_event(item: Item, event: &mut Item) -> std::result::Result<(), serde_json::Error> { - let original_json = serde_json::from_slice::(&event.payload())?; - let mut new_json = serde_json::from_slice(&item.payload())?; - utils::merge_values(&mut new_json, original_json); - let new_payload = serde_json::to_vec(&new_json)?; - event.set_payload(ContentType::Json, new_payload); - Ok(()) -} - -fn decompress_data( - payload: Bytes, - compressed_length: usize, - compression: u8, - compression_arg: u8, -) -> Result { - if payload.len() < compressed_length { - return Err(SwitchProcessingError::InvalidValue( - "compressed data length", - compressed_length, - )); - } - - let data = payload.slice(0..compressed_length); - match compression { - // No compression - 0 => Ok(data), - // Zstandard - 1 => decompress_data_zstd(data, compression_arg) - .map(Bytes::from) - .map_err(SwitchProcessingError::Zstandard), - _ => Err(SwitchProcessingError::InvalidValue( - "compression format", - compression as usize, - )), - } -} - -fn get_zstd_dictionary(id: usize) -> Option<&'static zstd::dict::DecoderDictionary<'static>> { - // Inlined dictionary binary data. - static ZSTD_DICTIONARIES: &[&[u8]] = &[ - // index 0 = empty dictionary (a.k.a "none") - b"", - ]; - - // We initialize dictionaries (from their binary representation) only once and reuse them when decompressing. 
- static ZSTD_DEC_DICTIONARIES: OnceLock< - [zstd::dict::DecoderDictionary; ZSTD_DICTIONARIES.len()], - > = OnceLock::new(); - let dictionaries = ZSTD_DEC_DICTIONARIES.get_or_init(|| { - let mut dictionaries: [zstd::dict::DecoderDictionary; ZSTD_DICTIONARIES.len()] = - [zstd::dict::DecoderDictionary::new(ZSTD_DICTIONARIES[0])]; - for i in 0..ZSTD_DICTIONARIES.len() { - dictionaries[i] = zstd::dict::DecoderDictionary::new(ZSTD_DICTIONARIES[i]); - } - dictionaries - }); - - dictionaries.get(id) -} - -fn decompress_data_zstd(data: Bytes, dictionary_id: u8) -> std::io::Result> { - let dictionary = get_zstd_dictionary(dictionary_id as usize) - .ok_or(std::io::Error::other("Unknown compression dictionary"))?; - - let mut decompressor = ZstdDecompressor::with_prepared_dictionary(dictionary)?; - decompressor.decompress(data.as_ref(), MAX_DECOMPRESSED_SIZE) -} - -#[cfg(test)] -mod tests { - use relay_system::Addr; - use std::io::Write; - - use super::*; - use crate::constants::NNSWITCH_DYING_MESSAGE_FILENAME; - use crate::managed::ManagedEnvelope; - use crate::services::processor::ProcessingGroup; - use zstd::bulk::Compressor as ZstdCompressor; - - fn create_envelope(dying_message: Bytes) -> TypedEnvelope { - // Note: the attachment length specified in the "outer" envelope attachment is very important. - // Otherwise parsing would fail because the inner one can contain line-breaks. - let envelope = - r#"{"event_id":"9ec79c33ec9942ab8353589fcb2e04dc","dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"} -{"type":"event"} -{"message":"hello world","level":"error","map":{"a":"val"}} -{"type":"attachment","filename":"dying_message.dat","length":} -"#.replace("", &dying_message.len().to_string()); - - let envelope = ManagedEnvelope::new( - Envelope::parse_bytes([Bytes::from(envelope), dying_message].concat().into()).unwrap(), - Addr::dummy(), - ); - (envelope, ProcessingGroup::Error).try_into().unwrap() - } - - #[test] - fn test_expand_uncompressed_envelope_items() { - // The attachment content is as follows: - // - 4 bytes magic = sntr - // - 1 byte version = 0 - // - 1 byte encoding = 0b0000_0000 - i.e. envelope items, uncompressed - // - 2 bytes data length = 98 bytes - 0x0062 in big endian representation - // - 98 bytes of content - let mut envelope = create_envelope(Bytes::from( - "sntr\0\0\0\x62\ - {\"type\":\"event\"}\n\ - {\"foo\":\"bar\",\"level\":\"info\",\"map\":{\"b\":\"c\"}}\n\ - {\"type\":\"attachment\",\"length\":2}\n\ - Hi\n", - )); - - let items: Vec<_> = envelope.envelope().items().collect(); - assert_eq!(items.len(), 2); - assert_eq!(items[0].ty(), &ItemType::Event); - assert_eq!( - items[0].payload(), - "{\"message\":\"hello world\",\"level\":\"error\",\"map\":{\"a\":\"val\"}}" - ); - assert_eq!(items[1].ty(), &ItemType::Attachment); - assert_eq!(items[1].filename(), Some(NNSWITCH_DYING_MESSAGE_FILENAME)); - assert_eq!(items[1].payload().len(), 106); - - expand(&mut envelope).unwrap(); - - let items: Vec<_> = envelope.envelope().items().collect(); - assert_eq!(items.len(), 2); - assert_eq!(items[0].ty(), &ItemType::Event); - assert_eq!( - items[0].payload(), - "{\"foo\":\"bar\",\"level\":\"info\",\"map\":{\"a\":\"val\",\"b\":\"c\"},\"message\":\"hello world\"}" - ); - assert_eq!(items[1].ty(), &ItemType::Attachment); - assert_eq!(items[1].filename(), None); - assert_eq!(items[1].payload(), "Hi".as_bytes()); - } - - #[test] - fn test_expand_compressed_envelope_items() { - // encoding 0x10 - i.e. 
envelope items, Zstandard compressed, no dictionary - let dying_message = create_compressed_dying_message(0x10); - let mut envelope = create_envelope(dying_message.into()); - - let items: Vec<_> = envelope.envelope().items().collect(); - assert_eq!(items.len(), 2); - assert_eq!(items[0].ty(), &ItemType::Event); - assert_eq!( - items[0].payload(), - "{\"message\":\"hello world\",\"level\":\"error\",\"map\":{\"a\":\"val\"}}" - ); - assert_eq!(items[1].ty(), &ItemType::Attachment); - assert_eq!(items[1].filename(), Some(NNSWITCH_DYING_MESSAGE_FILENAME)); - assert_eq!(items[1].payload().len(), 98); - - expand(&mut envelope).unwrap(); - - let items: Vec<_> = envelope.envelope().items().collect(); - assert_eq!(items.len(), 2); - assert_eq!(items[0].ty(), &ItemType::Event); - assert_eq!( - items[0].payload(), - "{\"foo\":\"bar\",\"level\":\"info\",\"map\":{\"a\":\"val\",\"b\":\"c\"},\"message\":\"hello world\"}" - ); - assert_eq!(items[1].ty(), &ItemType::Attachment); - assert_eq!(items[1].filename(), None); - assert_eq!(items[1].payload(), "Hi".as_bytes()); - } - - fn create_compressed_dying_message(encoding: u8) -> Vec { - // The attachment content is as follows: - // - 4 bytes magic = sntr - // - 1 byte version = 0 - // - 1 byte encoding - // - 2 bytes data length = N bytes - in big endian representation - // - N bytes of compressed content (Zstandard) - let mut compressor = ZstdCompressor::new(3).unwrap(); - let compressed_data = compressor - .compress( - b"\ - {\"type\":\"event\"}\n\ - {\"foo\":\"bar\",\"level\":\"info\",\"map\":{\"b\":\"c\"}}\n\ - {\"type\":\"attachment\",\"length\":2}\n\ - Hi\n\ - ", - ) - .unwrap(); - let mut dying_message: Vec = Vec::new(); - dying_message.write_all(b"sntr\0").unwrap(); - dying_message.write_all(&[encoding]).unwrap(); - dying_message - .write_all(&(compressed_data.len() as u16).to_be_bytes()) - .unwrap(); - dying_message.write_all(&compressed_data).unwrap(); - dying_message - } - - #[test] - fn test_expand_fails_with_unknown_dictioary() { - // encoding 0x10 - i.e. envelope items, Zstandard compressed, dictionary ID 1 - let dying_message = create_compressed_dying_message(0b0001_0001); - let mut envelope = create_envelope(dying_message.into()); - - assert!(expand(&mut envelope).is_err()); - } - - #[test] - fn test_expand_fails_on_invalid_data() { - let mut envelope = create_envelope(Bytes::from( - "sntr\0\0\0\x62\ - {\"type\":\"event\"}\n\ - ", - )); - assert!(expand(&mut envelope).is_err()); - } - - #[test] - fn test_expand_works_with_empty_data() { - let mut envelope = create_envelope(Bytes::from("sntr\0\0\0\0")); - assert!(expand(&mut envelope).is_ok()); - } -} diff --git a/relay-server/src/services/processor/playstation.rs b/relay-server/src/services/processor/playstation.rs index 3f52245ba83..99735e01089 100644 --- a/relay-server/src/services/processor/playstation.rs +++ b/relay-server/src/services/processor/playstation.rs @@ -2,138 +2,15 @@ //! //! These functions are included only in the processing mode. 
-use relay_config::Config; -use relay_dynamic_config::Feature; use relay_event_schema::protocol::{ AppContext, ClientSdkInfo, Context, Contexts, DeviceContext, LenientString, OsContext, RuntimeContext, Tags, }; use relay_event_schema::protocol::{Event, TagEntry}; -use relay_prosperoconv::{self, ProsperoDump}; +use relay_prosperoconv::ProsperoDump; use relay_protocol::{Annotated, Empty, Object}; -use crate::constants::SENTRY_CRASH_PAYLOAD_KEY; -use crate::envelope::{AttachmentType, ContentType, Item, ItemType}; -use crate::managed::TypedEnvelope; -use crate::services::processor::metric; -use crate::services::processor::{ErrorGroup, EventFullyNormalized, ProcessingError}; -use crate::services::projects::project::ProjectInfo; -use crate::statsd::RelayCounters; -use crate::utils; - -pub fn expand( - managed_envelope: &mut TypedEnvelope, - config: &Config, - project_info: &ProjectInfo, -) -> Result<(), ProcessingError> { - if !project_info.has_feature(Feature::PlaystationIngestion) { - return Ok(()); - } - let envelope = managed_envelope.envelope_mut(); - - // Get instead of take as we want to keep the dump as an attachment - if let Some(item) = envelope.get_item_by(|item| { - item.ty() == &ItemType::Attachment - && item.attachment_type() == Some(AttachmentType::Prosperodump) - }) { - let data = relay_prosperoconv::extract_data(&item.payload()).map_err(|err| { - ProcessingError::InvalidPlaystationDump(format!("Failed to extract data: {err}")) - })?; - let prospero_dump = ProsperoDump::parse(&data).map_err(|err| { - ProcessingError::InvalidPlaystationDump(format!("Failed to parse dump: {err}")) - })?; - let minidump_buffer = relay_prosperoconv::write_dump(&prospero_dump).map_err(|err| { - ProcessingError::InvalidPlaystationDump(format!("Failed to create minidump: {err}")) - })?; - - if let Some(json) = prospero_dump.userdata.get(SENTRY_CRASH_PAYLOAD_KEY) { - let event = envelope.take_item_by(|item| item.ty() == &ItemType::Event); - let event_item = merge_or_create_event_item(json, event); - envelope.add_item(event_item); - } - - add_attachments(envelope, prospero_dump, minidump_buffer); - - if let Err(offender) = utils::check_envelope_size_limits(config, envelope) { - return Err(ProcessingError::PayloadTooLarge(offender)); - } - } - Ok(()) -} - -pub fn process( - managed_envelope: &mut TypedEnvelope, - event: &mut Annotated, - project_info: &ProjectInfo, -) -> Result, ProcessingError> { - if !project_info.has_feature(Feature::PlaystationIngestion) { - return Ok(None); - } - let envelope = &mut managed_envelope.envelope_mut(); - - if let Some(item) = envelope.get_item_by(|item| { - item.ty() == &ItemType::Attachment - && item.attachment_type() == Some(AttachmentType::Prosperodump) - }) { - metric!(counter(RelayCounters::PlaystationProcessing) += 1); - let event = event.get_or_insert_with(Event::default); - - // Currently we parse the dump here again, in order to set the contexts on the event - // this can not be done in the expand function since we don't have an event at that point. - // This is inline with how unreal reports are handled. - let data = relay_prosperoconv::extract_data(&item.payload()).map_err(|err| { - ProcessingError::InvalidPlaystationDump(format!("Failed to extract data: {err}")) - })?; - let prospero_dump = ProsperoDump::parse(&data).map_err(|err| { - ProcessingError::InvalidPlaystationDump(format!("Failed to parse dump: {err}")) - })?; - - // If "__sentry" is not a key in the userdata do the legacy extraction. 
- // This should be removed once all customers migrated to the new format. - if !&prospero_dump - .userdata - .contains_key(SENTRY_CRASH_PAYLOAD_KEY) - { - legacy_userdata_extraction(event, &prospero_dump); - } - merge_playstation_context(event, &prospero_dump); - - return Ok(Some(EventFullyNormalized(false))); - } - Ok(None) -} - -fn add_attachments( - envelope: &mut crate::Envelope, - prospero_dump: ProsperoDump<'_>, - minidump_buffer: Vec, -) { - let mut item = Item::new(ItemType::Attachment); - item.set_filename("generated_minidump.dmp"); - item.set_payload(ContentType::Minidump, minidump_buffer); - item.set_attachment_type(AttachmentType::Minidump); - envelope.add_item(item); - - for file in prospero_dump.files { - let mut item = Item::new(ItemType::Attachment); - item.set_filename(file.name); - item.set_attachment_type(AttachmentType::Attachment); - item.set_payload(infer_content_type(file.name), file.contents.to_owned()); - envelope.add_item(item); - } - - let mut console_log = prospero_dump.system_log.into_owned(); - for log_line in prospero_dump.log_lines { - console_log.push_str(log_line); - } - if !console_log.is_empty() { - let mut item = Item::new(ItemType::Attachment); - item.set_filename("console.log"); - item.set_payload(ContentType::Text, console_log.into_bytes()); - item.set_attachment_type(AttachmentType::Attachment); - envelope.add_item(item); - } -} +use crate::envelope::ContentType; pub fn legacy_userdata_extraction(event: &mut Event, prospero: &ProsperoDump) { let contexts = event.contexts.get_or_insert_with(Contexts::default); @@ -283,30 +160,6 @@ pub fn infer_content_type(filename: &str) -> ContentType { } } -fn create_item_with_json_payload(payload: &str) -> Item { - let mut item = Item::new(ItemType::Event); - item.set_payload(ContentType::Json, payload.to_owned()); - item -} - -fn merge_or_create_event_item(json: &str, event: Option) -> Item { - if let Some(item) = event { - let event = - serde_json::from_slice::(&item.payload()).unwrap_or_default(); - let mut base_event = serde_json::from_str::(json).unwrap_or_default(); - - utils::merge_values(&mut base_event, event); - - if let Ok(merged_json) = serde_json::to_string(&base_event) { - create_item_with_json_payload(&merged_json) - } else { - item - } - } else { - create_item_with_json_payload(json) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs index aa3cdd8a28f..6af44503cfc 100644 --- a/relay-server/src/services/processor/report.rs +++ b/relay-server/src/services/processor/report.rs @@ -69,7 +69,7 @@ mod tests { use crate::envelope::{Envelope, Item}; use crate::extractors::RequestMeta; use crate::managed::ManagedEnvelope; - use crate::processing; + use crate::processing::{self, Forward}; use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit}; use crate::testutils::create_test_processor; @@ -111,10 +111,10 @@ mod tests { ctx: processing::Context::for_test(), }; - let Ok(Some(Submit::Envelope(new_envelope))) = processor.process(message).await else { + let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else { panic!(); }; - let new_envelope = new_envelope.envelope(); + let new_envelope = output.serialize_envelope(ctx).unwrap(); assert_eq!(new_envelope.len(), 1); assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event); diff --git a/relay-server/src/services/processor/unreal.rs b/relay-server/src/services/processor/unreal.rs deleted file 
mode 100644 index 43da9df382e..00000000000 --- a/relay-server/src/services/processor/unreal.rs +++ /dev/null @@ -1,54 +0,0 @@ -//! Unreal 4 processor related code. -//! -//! These functions are included only in the processing mode. - -use crate::envelope::ItemType; -use crate::managed::TypedEnvelope; -use crate::services::processor::{ErrorGroup, EventFullyNormalized, ProcessingError}; -use crate::utils; - -use relay_config::Config; -use relay_event_schema::protocol::Event; -use relay_protocol::Annotated; - -/// Expands Unreal 4 items inside an envelope. -/// -/// If the envelope does NOT contain an `UnrealReport` item, it doesn't do anything. If the -/// envelope contains an `UnrealReport` item, it removes it from the envelope and inserts new -/// items for each of its contents. -/// -/// The envelope may be dropped if it exceeds size limits after decompression. Particularly, -/// this includes cases where a single attachment file exceeds the maximum file size. This is in -/// line with the behavior of the envelope endpoint. -/// -/// After this, [`crate::services::processor::EnvelopeProcessorService`] should be able to process the envelope the same -/// way it processes any other envelopes. -pub fn expand( - managed_envelope: &mut TypedEnvelope, - config: &Config, -) -> Result<(), ProcessingError> { - let envelope = &mut managed_envelope.envelope_mut(); - - if let Some(item) = envelope.take_item_by(|item| item.ty() == &ItemType::UnrealReport) { - utils::expand_unreal_envelope(item, envelope, config)?; - } - - Ok(()) -} - -/// Extracts event information from an unreal context. -/// -/// If the event does not contain an unreal context, this function does not perform any action. -/// If there was no event payload prior to this function, it is created. -pub fn process( - managed_envelope: &mut TypedEnvelope, - event: &mut Annotated, -) -> Result, ProcessingError> { - if utils::process_unreal_envelope(event, managed_envelope.envelope_mut()) - .map_err(ProcessingError::InvalidUnrealReport)? - { - return Ok(Some(EventFullyNormalized(false))); - } - - Ok(None) -} diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs index 6557c1c4620..ed83669c72b 100644 --- a/relay-server/src/utils/dynamic_sampling.rs +++ b/relay-server/src/utils/dynamic_sampling.rs @@ -1,9 +1,6 @@ //! Functionality for calculating if a trace should be processed or dropped. use std::ops::ControlFlow; -use chrono::Utc; -use relay_sampling::config::{RuleType, SamplingConfig}; -use relay_sampling::dsc::DynamicSamplingContext; use relay_sampling::evaluation::{SamplingDecision, SamplingEvaluator, SamplingMatch}; use crate::services::outcome::Outcome; @@ -71,29 +68,6 @@ impl From>> for SamplingResult } } -/// Runs dynamic sampling if the dsc and root project state are not None and returns whether the -/// transactions received with such dsc and project state would be kept or dropped by dynamic -/// sampling. -pub async fn is_trace_fully_sampled( - root_project_config: &SamplingConfig, - dsc: &DynamicSamplingContext, -) -> Option { - // If the sampled field is not set, we prefer to not tag the error since we have no clue on - // whether the head of the trace was kept or dropped on the client side. - // In addition, if the head of the trace was dropped on the client we will immediately mark - // the trace as not fully sampled. - if !dsc.sampled? 
-        return Some(false);
-    }
-
-    let evaluator = SamplingEvaluator::new(Utc::now());
-
-    let rules = root_project_config.filter_rules(RuleType::Trace);
-
-    let evaluation = evaluator.match_rules(*dsc.trace_id, dsc, rules).await;
-    Some(SamplingResult::from(evaluation).decision().is_keep())
-}
-
 #[cfg(test)]
 mod tests {
     use relay_base_schema::events::EventType;
@@ -114,6 +88,10 @@ mod tests {
         }
     }
 
+    use chrono::Utc;
+    use relay_sampling::config::RuleType;
+    use relay_sampling::dsc::DynamicSamplingContext;
+
     use super::*;
 
     fn mocked_simple_dynamic_sampling_context(
@@ -214,75 +192,4 @@ mod tests {
         assert!(result.is_match());
         assert!(result.decision().is_keep());
     }
-
-    #[tokio::test]
-    async fn test_is_trace_fully_sampled_return_true_with_unsupported_rules() {
-        let config = SamplingConfig {
-            rules: vec![
-                mocked_sampling_rule(1, RuleType::Unsupported, 1.0),
-                mocked_sampling_rule(1, RuleType::Trace, 0.0),
-            ],
-            ..SamplingConfig::new()
-        };
-
-        let dsc = mocked_simple_dynamic_sampling_context(None, None, None, None, Some(true));
-
-        // If processing is enabled, we simply log an error and otherwise proceed as usual.
-        assert_eq!(is_trace_fully_sampled(&config, &dsc).await, Some(false));
-    }
-
-    #[tokio::test]
-    /// Tests that a trace is marked as fully sampled correctly when dsc and project state are set.
-    async fn test_is_trace_fully_sampled_with_valid_dsc_and_sampling_config() {
-        // We test with `sampled = true` and 100% rule.
-
-        let config = SamplingConfig {
-            rules: vec![mocked_sampling_rule(1, RuleType::Trace, 1.0)],
-            ..SamplingConfig::new()
-        };
-
-        let dsc =
-            mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, Some(true));
-
-        let result = is_trace_fully_sampled(&config, &dsc).await.unwrap();
-        assert!(result);
-
-        // We test with `sampled = true` and 0% rule.
-        let config = SamplingConfig {
-            rules: vec![mocked_sampling_rule(1, RuleType::Trace, 0.0)],
-            ..SamplingConfig::new()
-        };
-
-        let dsc =
-            mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, Some(true));
-
-        let result = is_trace_fully_sampled(&config, &dsc).await.unwrap();
-        assert!(!result);
-
-        // We test with `sampled = false` and 100% rule.
-        let config = SamplingConfig {
-            rules: vec![mocked_sampling_rule(1, RuleType::Trace, 1.0)],
-            ..SamplingConfig::new()
-        };
-
-        let dsc =
-            mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, Some(false));
-
-        let result = is_trace_fully_sampled(&config, &dsc).await.unwrap();
-        assert!(!result);
-    }
-
-    #[tokio::test]
-    /// Tests that a trace is not marked as fully sampled or not if inputs are invalid.
-    async fn test_is_trace_fully_sampled_with_invalid_inputs() {
-        // We test with missing `sampled`.
-        let config = SamplingConfig {
-            rules: vec![mocked_sampling_rule(1, RuleType::Trace, 1.0)],
-            ..SamplingConfig::new()
-        };
-        let dsc = mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, None);
-
-        let result = is_trace_fully_sampled(&config, &dsc).await;
-        assert!(result.is_none());
-    }
 }
diff --git a/relay-server/src/utils/unreal.rs b/relay-server/src/utils/unreal.rs
index 0270ce1b3cd..957af359170 100644
--- a/relay-server/src/utils/unreal.rs
+++ b/relay-server/src/utils/unreal.rs
@@ -13,9 +13,8 @@ use symbolic_unreal::{
 
 use crate::constants::{
     ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME_EVENT, SENTRY_CRASH_PAYLOAD_KEY,
-    UNREAL_USER_HEADER,
 };
-use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType, Items};
+use crate::envelope::{AttachmentType, ContentType, Item, ItemType, Items};
 use crate::services::processor::ProcessingError;
 
 /// Maximum number of unreal logs to parse for breadcrumbs.
@@ -40,40 +39,6 @@ fn get_event_item(data: &[u8]) -> Result<Option<Item>, Unreal4Error> {
     Ok(Some(item))
 }
 
-/// Expands Unreal 4 items inside an envelope.
-///
-/// If the envelope does NOT contain an `UnrealReport` item, it doesn't do anything. If the envelope
-/// contains an `UnrealReport` item, it removes it from the envelope and inserts new items for each
-/// of its contents.
-///
-/// After this, the `EnvelopeProcessor` should be able to process the envelope the same way it
-/// processes any other envelopes.
-pub fn expand_unreal_envelope(
-    unreal_item: Item,
-    envelope: &mut Envelope,
-    config: &Config,
-) -> Result<(), ProcessingError> {
-    let has_event = envelope
-        .get_item_by(|item| item.ty() == &ItemType::Event)
-        .is_some();
-
-    let expansion = expand_unreal(unreal_item, config)?;
-
-    if !has_event && let Some(event) = expansion.event {
-        envelope.add_item(event);
-    }
-
-    for attachment in expansion.attachments {
-        envelope.add_item(attachment);
-    }
-
-    if let Err(offender) = super::check_envelope_size_limits(config, envelope) {
-        return Err(ProcessingError::PayloadTooLarge(offender));
-    }
-
-    Ok(())
-}
-
 /// Expands an Unreal 4 item and returns the expanded items.
 pub fn expand_unreal(
     unreal_item: Item,
@@ -367,38 +332,9 @@ fn merge_unreal_context(event: &mut Event, context: Unreal4Context) {
     }
 }
 
-/// Processes an unreal envelope.
-///
-/// This function returns either the processing error, or a boolean indicating
-/// whether the envelope contained an unreal item.
-pub fn process_unreal_envelope(
-    event: &mut Annotated<Event>,
-    envelope: &mut Envelope,
-) -> Result<bool, ProcessingError> {
-    let user_header = envelope
-        .get_header(UNREAL_USER_HEADER)
-        .and_then(Value::as_str);
-
-    // the `unwrap_or_default` here can produce an invalid user report if the envelope id
-    // is indeed missing. This should not happen under normal circumstances since the EventId is
-    // created statically.
-    let event_id = envelope.event_id().unwrap_or_default();
-    debug_assert!(!event_id.is_nil());
-
-    match process_unreal(event_id, event, envelope.items(), user_header)? {
-        Some(r) => {
-            for item in r.user_reports {
-                envelope.add_item(item);
-            }
-            Ok(true)
-        }
-        None => Ok(false),
-    }
-}
-
 /// Processes an unreal crash report.
 ///
-/// The `user_header` should be extracted from the [`UNREAL_USER_HEADER`] envelope header.
+/// The `user_header` should be extracted from the [`crate::constants::UNREAL_USER_HEADER`] envelope header.
 pub fn process_unreal<'a>(
     event_id: EventId,
     event: &mut Annotated<Event>,
diff --git a/tests/integration/fixtures/mini_sentry.py b/tests/integration/fixtures/mini_sentry.py
index 9f3ced5f37e..9d6e7bb4cb2 100644
--- a/tests/integration/fixtures/mini_sentry.py
+++ b/tests/integration/fixtures/mini_sentry.py
@@ -181,7 +181,6 @@ def full_project_config(self, project_id, dsn_public_key=None, extra=None):
             },
             "blacklistedIps": ["127.43.33.22"],
             "trustedRelays": [],
-            "features": ["organizations:relay-new-error-processing"],
         },
     }
 
diff --git a/tests/integration/test_playstation.py b/tests/integration/test_playstation.py
index 32ac137294f..93d0c642d37 100644
--- a/tests/integration/test_playstation.py
+++ b/tests/integration/test_playstation.py
@@ -26,7 +26,6 @@ def playstation_project_config():
         "features": [
             "organizations:relay-playstation-ingestion",
            "projects:relay-upload-endpoint",
-            "organizations:relay-new-error-processing",
        ],
     }
 }
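
With this change the error pipeline no longer sits behind a feature flag, so the integration-test
project configs only need to list the remaining flags. A minimal sketch of a fixture in the spirit
of `playstation_project_config` from `test_playstation.py` (the surrounding keys and exact fixture
shape are assumed, not taken verbatim from the repository):

    def playstation_project_config():
        # "organizations:relay-new-error-processing" is no longer required:
        # errors always go through the new pipeline, so only these flags remain.
        return {
            "config": {
                "features": [
                    "organizations:relay-playstation-ingestion",
                    "projects:relay-upload-endpoint",
                ],
            }
        }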