Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -3228,6 +3228,12 @@
default=10 * 1024 * 1024,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# When enabled, oversized segments are split into chunks instead of being dropped.
register(
"spans.buffer.flush-oversized-segments",
default=False,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# TTL for keys in Redis. This is a downside protection in case of bugs.
register(
"spans.buffer.redis-ttl",
Expand Down
3 changes: 2 additions & 1 deletion src/sentry/spans/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ def _load_segment_data(
cursors[payload_key] = 0
payload_keys_map[key] = segment_payload_keys

flush_oversized_segments = options.get("spans.buffer.flush-oversized-segments")
dropped_segments: set[SegmentKey] = set()

def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
Expand All @@ -724,7 +725,7 @@ def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
decompress_latency_ms += (time.monotonic() - decompress_start) * 1000

sizes[key] = sizes.get(key, 0) + sum(len(span) for span in decompressed)
if sizes[key] > max_segment_bytes:
if sizes[key] > max_segment_bytes and not flush_oversized_segments:
metrics.incr("spans.buffer.flush_segments.segment_size_exceeded")
logger.warning("Skipping too large segment, byte size %s", sizes[key])
payloads.pop(key, None)
Expand Down
59 changes: 52 additions & 7 deletions src/sentry/spans/consumers/process/flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from collections.abc import Callable, Mapping
from concurrent.futures import Future
from functools import partial
from typing import Any

import orjson
import sentry_sdk
Expand Down Expand Up @@ -35,6 +36,42 @@
type ProduceToPipe = Callable[[int, KafkaPayload, int], None]


type SpanPayload = dict[str, Any]


def _chunk_segment(span_payloads: list[SpanPayload]) -> list[list[SpanPayload]]:
    """
    Split a segment into chunks of span payloads that each fit under the
    ``spans.buffer.max-segment-bytes`` option.

    Chunking is greedy and order-preserving: a new chunk is started whenever
    adding the next span would push the current chunk over the limit. A
    single span that alone exceeds the limit is still emitted as its own
    (oversized) chunk rather than being dropped.

    If all spans fit under the limit, returns a single-element list
    containing the original payload list.
    """
    max_segment_bytes = options.get("spans.buffer.max-segment-bytes")

    # Per-span serialized sizes. NOTE(review): the final Kafka payload also
    # carries the {"spans": [...]} envelope, so a chunk can slightly exceed
    # the limit after wrapping — presumably acceptable slack; confirm.
    sizes = [len(orjson.dumps(payload)) for payload in span_payloads]
    if sum(sizes) <= max_segment_bytes:
        return [span_payloads]

    chunks: list[list[SpanPayload]] = []
    current_chunk: list[SpanPayload] = []
    current_size = 0

    for payload, payload_size in zip(span_payloads, sizes):
        # Only roll over when the current chunk is non-empty, so an
        # individually-oversized span still gets its own chunk.
        if current_chunk and current_size + payload_size > max_segment_bytes:
            chunks.append(current_chunk)
            current_chunk = []
            current_size = 0
        current_chunk.append(payload)
        current_size += payload_size

    if current_chunk:
        chunks.append(current_chunk)

    if len(chunks) > 1:
        metrics.incr("spans.buffer.flusher.oversized_segments_chunked")
        # Distribution of how many chunks oversized segments split into.
        metrics.timing("spans.buffer.flusher.oversized_segment_chunks", len(chunks))

    return chunks


class MultiProducer:
"""
Manages multiple Kafka producers for load balancing across brokers/topics.
Expand Down Expand Up @@ -303,6 +340,7 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None:

first_iteration = True
while not stopped.value:
flush_oversized_segments = options.get("spans.buffer.flush-oversized-segments")
system_now = int(time.time())
now = system_now + current_drift.value
flushed_segments = buffer.flush_segments(now=now)
Expand Down Expand Up @@ -334,13 +372,20 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None:
continue

spans = [span.payload for span in flushed_segment.spans]
kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
metrics.timing(
"spans.buffer.segment_size_bytes",
len(kafka_payload.value),
tags={"shard": shard_tag},
)
produce(flushed_segment.project_id, kafka_payload, len(spans))

if flush_oversized_segments:
chunks = _chunk_segment(spans)
else:
chunks = [spans]

for chunk in chunks:
kafka_payload = KafkaPayload(None, orjson.dumps({"spans": chunk}), [])
metrics.timing(
"spans.buffer.segment_size_bytes",
len(kafka_payload.value),
tags={"shard": shard_tag},
)
produce(flushed_segment.project_id, kafka_payload, len(chunk))
Comment on lines +375 to +388
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this logic belongs here.
The design of this system is such that buffer.py contains all the business logic for managing segments and flushing.
This is the consumer code, which takes care of running the business logic inside a Kafka consumer. If (however unlikely) we started flushing into ObjectStore tomorrow, we would touch this code, but the business logic would not change.

Deciding to chunk the segment is a business-logic concern, not a Kafka-consumer concern. This logic should go into the buffer.py file.
Is there any specific reason you added it here?


with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
for project_id, future, dropped in producer_futures:
Expand Down
61 changes: 60 additions & 1 deletion tests/sentry/spans/consumers/process/test_flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from sentry.conf.types.kafka_definition import Topic
from sentry.spans.buffer import Span, SpansBuffer
from sentry.spans.consumers.process.flusher import MultiProducer, SpanFlusher
from sentry.spans.consumers.process.flusher import MultiProducer, SpanFlusher, _chunk_segment
from sentry.testutils.helpers.options import override_options
from tests.sentry.spans.test_buffer import DEFAULT_OPTIONS

Expand Down Expand Up @@ -216,3 +216,62 @@ def hang_main(
next_step=Noop(),
produce_to_pipe=lambda project_id, payload, dropped: None,
)


def test_chunk_segment_under_limit() -> None:
    """A segment that already fits under the byte limit comes back as one chunk."""
    payloads = [{"span_id": "a"}, {"span_id": "b"}]
    with override_options({"spans.buffer.max-segment-bytes": 10000}):
        result = _chunk_segment(payloads)
        assert result == [payloads]


def test_chunk_segment_splits_oversized() -> None:
    """An oversized segment is split into order-preserving, size-bounded chunks."""
    segment_id = "a" * 16
    # Mirror the segment payload shape used in buffer tests; the first span
    # ("a") is the segment root, the rest are children.
    payloads = [
        {
            "span_id": letter * 16,
            "is_segment": letter == "a",
            "attributes": {"sentry.segment.id": {"type": "string", "value": segment_id}},
        }
        for letter in "abcde"
    ]

    with override_options({"spans.buffer.max-segment-bytes": 500}):
        chunks = _chunk_segment(payloads)

    assert len(chunks) == 2
    assert [len(chunk) for chunk in chunks] == [3, 2]

    # Chunking must preserve the original span order.
    flattened = [span for chunk in chunks for span in chunk]
    assert flattened == payloads

    # Every chunk except possibly the last respects the byte limit.
    for chunk in chunks[:-1]:
        assert sum(len(orjson.dumps(span)) for span in chunk) <= 500


def test_chunk_segment_single_large_span() -> None:
    """A single span larger than max_bytes still gets its own chunk."""
    oversized = [{"span_id": "a" * 16}]
    with override_options({"spans.buffer.max-segment-bytes": 10}):
        assert _chunk_segment(oversized) == [oversized]
83 changes: 83 additions & 0 deletions tests/sentry/spans/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"spans.buffer.debug-traces": [],
"spans.buffer.evalsha-cumulative-logger-enabled": True,
"spans.process-segments.schema-validation": 1.0,
"spans.buffer.flush-oversized-segments": False,
}


Expand Down Expand Up @@ -967,6 +968,88 @@ def test_dropped_spans_emit_outcomes(
assert ingested_bytes_timing_calls[0].args[1] == expected_bytes


@mock.patch("sentry.spans.buffer.Project")
def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> None:
    """When flush-oversized-segments is enabled, oversized segments are kept instead of dropped."""
    project = mock.Mock()
    project.id = 1
    project.organization_id = 100
    mock_project_model.objects.get_from_cache.return_value = project

    trace_id = "a" * 32

    def make_span(char: str, parent: str | None, is_root: bool = False) -> Span:
        # One-line helper: builds a span whose ids are the character repeated.
        return Span(
            payload=_payload(char * 16),
            trace_id=trace_id,
            span_id=char * 16,
            parent_span_id=None if parent is None else parent * 16,
            segment_id=None,
            project_id=1,
            is_segment_span=is_root,
        )

    # First batch arrives without the segment root; second batch completes
    # the tree with the root span "a".
    batch1 = [make_span("c", "b"), make_span("b", "a")]
    batch2 = [make_span("d", "a"), make_span("e", "a"), make_span("a", None, is_root=True)]

    with override_options(
        {"spans.buffer.max-segment-bytes": 100, "spans.buffer.flush-oversized-segments": True}
    ):
        buffer.process_spans(batch1, now=0)
        buffer.process_spans(batch2, now=0)
        flushed = buffer.flush_segments(now=11)

    key = _segment_id(1, trace_id, "a" * 16)
    assert len(flushed[key].spans) == 5
    _normalize_output(flushed)
    assert flushed == {
        key: FlushedSegment(
            queue_key=mock.ANY,
            score=mock.ANY,
            ingested_count=mock.ANY,
            payload_keys=mock.ANY,
            project_id=1,
            spans=[
                _output_segment(b"a" * 16, b"a" * 16, True),
                _output_segment(b"b" * 16, b"a" * 16, False),
                _output_segment(b"c" * 16, b"a" * 16, False),
                _output_segment(b"d" * 16, b"a" * 16, False),
                _output_segment(b"e" * 16, b"a" * 16, False),
            ],
        )
    }


def test_kafka_slice_id(buffer: SpansBuffer) -> None:
with override_options(DEFAULT_OPTIONS):
buffer = SpansBuffer(assigned_shards=list(range(1)), slice_id=2)
Expand Down
Loading