diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 45fe2de8d841e3..e62b44f4a7a7a1 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3228,6 +3228,12 @@ default=10 * 1024 * 1024, flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, ) +# When enabled, oversized segments are split into chunks instead of being dropped. +register( + "spans.buffer.flush-oversized-segments", + default=False, + flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, +) # TTL for keys in Redis. This is a downside protection in case of bugs. register( "spans.buffer.redis-ttl", diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index a00d7517a62b9e..57aa1da0a72b60 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -710,6 +710,7 @@ def _load_segment_data( cursors[payload_key] = 0 payload_keys_map[key] = segment_payload_keys + flush_oversized_segments = options.get("spans.buffer.flush-oversized-segments") dropped_segments: set[SegmentKey] = set() def _add_spans(key: SegmentKey, raw_data: bytes) -> bool: @@ -724,7 +725,7 @@ def _add_spans(key: SegmentKey, raw_data: bytes) -> bool: decompress_latency_ms += (time.monotonic() - decompress_start) * 1000 sizes[key] = sizes.get(key, 0) + sum(len(span) for span in decompressed) - if sizes[key] > max_segment_bytes: + if sizes[key] > max_segment_bytes and not flush_oversized_segments: metrics.incr("spans.buffer.flush_segments.segment_size_exceeded") logger.warning("Skipping too large segment, byte size %s", sizes[key]) payloads.pop(key, None) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 78d219db430917..479c7c5d721aa7 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -6,6 +6,7 @@ from collections.abc import Callable, Mapping from concurrent.futures import Future from functools import partial +from typing import Any 
import orjson import sentry_sdk @@ -35,6 +36,42 @@ type ProduceToPipe = Callable[[int, KafkaPayload, int], None] +type SpanPayload = dict[str, Any] + + +def _chunk_segment(span_payloads: list[SpanPayload]) -> list[list[SpanPayload]]: + """ + Split a segment into chunks of span payloads that fit under max_segment_bytes. + If all spans of the segment fit in one chunk, returns a single-element list. + """ + max_segment_bytes = options.get("spans.buffer.max-segment-bytes") + + sizes = [len(orjson.dumps(s)) for s in span_payloads] + total_size = sum(sizes) + if total_size <= max_segment_bytes: + return [span_payloads] + + chunks: list[list[SpanPayload]] = [] + current_chunk: list[SpanPayload] = [] + current_size = 0 + + for payload, payload_size in zip(span_payloads, sizes): + if current_chunk and current_size + payload_size > max_segment_bytes: + chunks.append(current_chunk) + current_chunk = [] + current_size = 0 + current_chunk.append(payload) + current_size += payload_size + + if current_chunk: + chunks.append(current_chunk) + + if len(chunks) > 1: + metrics.incr("spans.buffer.flusher.oversized_segments_chunked") + + return chunks + + class MultiProducer: """ Manages multiple Kafka producers for load balancing across brokers/topics. 
@@ -303,6 +340,7 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None: first_iteration = True while not stopped.value: + flush_oversized_segments = options.get("spans.buffer.flush-oversized-segments") system_now = int(time.time()) now = system_now + current_drift.value flushed_segments = buffer.flush_segments(now=now) @@ -334,13 +372,20 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None: continue spans = [span.payload for span in flushed_segment.spans] - kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), []) - metrics.timing( - "spans.buffer.segment_size_bytes", - len(kafka_payload.value), - tags={"shard": shard_tag}, - ) - produce(flushed_segment.project_id, kafka_payload, len(spans)) + + if flush_oversized_segments: + chunks = _chunk_segment(spans) + else: + chunks = [spans] + + for chunk in chunks: + kafka_payload = KafkaPayload(None, orjson.dumps({"spans": chunk}), []) + metrics.timing( + "spans.buffer.segment_size_bytes", + len(kafka_payload.value), + tags={"shard": shard_tag}, + ) + produce(flushed_segment.project_id, kafka_payload, len(chunk)) with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): for project_id, future, dropped in producer_futures: diff --git a/tests/sentry/spans/consumers/process/test_flusher.py b/tests/sentry/spans/consumers/process/test_flusher.py index 3ee2f8428bebe0..587c2f700890d7 100644 --- a/tests/sentry/spans/consumers/process/test_flusher.py +++ b/tests/sentry/spans/consumers/process/test_flusher.py @@ -10,7 +10,7 @@ from sentry.conf.types.kafka_definition import Topic from sentry.spans.buffer import Span, SpansBuffer -from sentry.spans.consumers.process.flusher import MultiProducer, SpanFlusher +from sentry.spans.consumers.process.flusher import MultiProducer, SpanFlusher, _chunk_segment from sentry.testutils.helpers.options import override_options from tests.sentry.spans.test_buffer import DEFAULT_OPTIONS @@ -216,3 +216,62 @@ def 
hang_main( next_step=Noop(), produce_to_pipe=lambda project_id, payload, dropped: None, ) + + +def test_chunk_segment_under_limit() -> None: + spans = [{"span_id": "a"}, {"span_id": "b"}] + with override_options({"spans.buffer.max-segment-bytes": 10000}): + chunks = _chunk_segment(spans) + assert chunks == [spans] + + +def test_chunk_segment_splits_oversized() -> None: + # Mirror the segment payload shape used in buffer tests. + spans = [ + { + "span_id": "a" * 16, + "is_segment": True, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "b" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "c" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "d" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "e" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + ] + + with override_options({"spans.buffer.max-segment-bytes": 500}): + chunks = _chunk_segment(spans) + + assert len(chunks) == 2 + assert [len(chunk) for chunk in chunks] == [3, 2] + + all_spans = [span for chunk in chunks for span in chunk] + assert all_spans == spans + + for chunk in chunks[:-1]: + chunk_size = sum(len(orjson.dumps(s)) for s in chunk) + assert chunk_size <= 500 + + +def test_chunk_segment_single_large_span() -> None: + """A single span larger than max_bytes still gets its own chunk.""" + spans = [{"span_id": "a" * 16}] + with override_options({"spans.buffer.max-segment-bytes": 10}): + chunks = _chunk_segment(spans) + assert chunks == [spans] diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py index 84b06f7ac22ab8..f3bc7c2ce84deb 100644 --- a/tests/sentry/spans/test_buffer.py +++ b/tests/sentry/spans/test_buffer.py @@ -39,6 
+39,7 @@ "spans.buffer.debug-traces": [], "spans.buffer.evalsha-cumulative-logger-enabled": True, "spans.process-segments.schema-validation": 1.0, + "spans.buffer.flush-oversized-segments": False, } @@ -967,6 +968,88 @@ def test_dropped_spans_emit_outcomes( assert ingested_bytes_timing_calls[0].args[1] == expected_bytes +@mock.patch("sentry.spans.buffer.Project") +def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> None: + """When flush-oversized-segments is enabled, oversized segments are kept instead of dropped.""" + mock_project = mock.Mock() + mock_project.id = 1 + mock_project.organization_id = 100 + mock_project_model.objects.get_from_cache.return_value = mock_project + + batch1 = [ + Span( + payload=_payload("c" * 16), + trace_id="a" * 32, + span_id="c" * 16, + parent_span_id="b" * 16, + segment_id=None, + project_id=1, + ), + Span( + payload=_payload("b" * 16), + trace_id="a" * 32, + span_id="b" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + ] + batch2 = [ + Span( + payload=_payload("d" * 16), + trace_id="a" * 32, + span_id="d" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + Span( + payload=_payload("e" * 16), + trace_id="a" * 32, + span_id="e" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + Span( + payload=_payload("a" * 16), + trace_id="a" * 32, + span_id="a" * 16, + parent_span_id=None, + project_id=1, + segment_id=None, + is_segment_span=True, + ), + ] + + with override_options( + {"spans.buffer.max-segment-bytes": 100, "spans.buffer.flush-oversized-segments": True} + ): + buffer.process_spans(batch1, now=0) + buffer.process_spans(batch2, now=0) + rv = buffer.flush_segments(now=11) + + segment = rv[_segment_id(1, "a" * 32, "a" * 16)] + assert len(segment.spans) == 5 + _normalize_output(rv) + assert rv == { + _segment_id(1, "a" * 32, "a" * 16): FlushedSegment( + queue_key=mock.ANY, + score=mock.ANY, + ingested_count=mock.ANY, + 
payload_keys=mock.ANY, + project_id=1, + spans=[ + _output_segment(b"a" * 16, b"a" * 16, True), + _output_segment(b"b" * 16, b"a" * 16, False), + _output_segment(b"c" * 16, b"a" * 16, False), + _output_segment(b"d" * 16, b"a" * 16, False), + _output_segment(b"e" * 16, b"a" * 16, False), + ], + ) + } + + def test_kafka_slice_id(buffer: SpansBuffer) -> None: with override_options(DEFAULT_OPTIONS): buffer = SpansBuffer(assigned_shards=list(range(1)), slice_id=2)