A note for the community
- Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
- If you are interested in working on this issue or have submitted a pull request, please leave a comment
Problem
I am purposefully testing with an unreliable Kafka destination: I bounce the Kafka broker, take it offline for a while, bring it back online, and so on.
When the Kafka broker comes back online, I sometimes see the error count for the kafka sink increment by fairly large numbers (viewed via vector top), and when viewing the data downstream I can see that messages were lost.
Additionally, the logs indicate that events were dropped:
2025-11-07T21:36:46.766155Z ERROR sink{component_kind="sink" component_id=output component_type=kafka}:request{request_id=1268}: vector_common::internal_event::service: Service call failed. No retries or retries exhausted. error=Some(KafkaError (Message production error: PurgeQueue (Local: Purged in queue))) request_id=1268 error_type="request_failed" stage="sending"
2025-11-07T21:36:46.766202Z ERROR sink{component_kind="sink" component_id=output component_type=kafka}:request{request_id=1268}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=1 reason="Service call failed. No retries or retries exhausted."
2025-11-07T21:36:46.766224Z ERROR sink{component_kind="sink" component_id=output component_type=kafka}:request{request_id=1261}: vector_common::internal_event::service: Internal log [Service call failed. No retries or retries exhausted.] is being suppressed to avoid flooding.
2025-11-07T21:36:46.766239Z ERROR sink{component_kind="sink" component_id=output component_type=kafka}:request{request_id=1261}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.
2025-11-07T21:36:49.244315Z WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Buffer counter underflowed. Clamping value to `0`. current=0 delta=6
2025-11-07T21:36:49.244358Z WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Internal log [Buffer counter underflowed. Clamping value to `0`.] is being suppressed to avoid flooding.
2025-11-07T21:37:33.246223Z WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Internal log [Buffer counter underflowed. Clamping value to `0`.] has been suppressed 1 times.
2025-11-07T21:37:33.246635Z WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Buffer counter underflowed. Clamping value to `0`. current=6 delta=8
2025-11-07T21:37:33.246645Z WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Internal log [Buffer counter underflowed. Clamping value to `0`.] is being suppressed to avoid flooding.
2025-11-07T21:38:03.244644Z WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Internal log [Buffer counter underflowed. Clamping value to `0`.] has been suppressed 1 times.
2025-11-07T21:38:03.245116Z WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Buffer counter underflowed. Clamping value to `0`. current=17056 delta=17072
What is worse, when this happens Vector does not seem to recover.
I end up having to restart it to get it sending messages to Kafka again.
I am not sure if this is something specific to my config or a bug.
I will note that the issue is intermittent, but I have been able to reproduce it a number of times.
Please note that I am aiming for a config that drops zero events, keeps messages in order (per Kafka partition), and minimizes duplicates (which can otherwise happen during a producer-to-broker recovery).
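The librdkafka options in the configuration below reflect that goal. As a sketch of my own assumptions (not verified guidance), the delivery-related settings I believe matter are roughly these; with enable.idempotence enabled these should already be the effective librdkafka behavior:

    librdkafka_options:
      # Sketch of assumed effective settings under idempotence; not verified.
      "enable.idempotence": "true"                    # implies acks=all and bounded in-flight requests
      "max.in.flight.requests.per.connection": "5"    # keep small so retries cannot reorder messages
      "message.send.max.retries": "2147483647"        # retry on transient failures rather than drop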
Configuration
sinks:
  output:
    type: kafka
    inputs:
      - input
    bootstrap_servers: "${KAFKA_BROKERS}"
    topic: "my.topic"
    key_field: _METADATA.kafka_key_field
    encoding:
      codec: json
      except_fields:
        - _METADATA
    compression: lz4
    acknowledgements:
      enabled: true
    batch:
      timeout_secs: 5 # Flush every 5 seconds max || seems to push down to librdkafka queue.buffering.max.ms
    buffer:
      type: disk
      max_size: 536870912 # 512MB
      when_full: block
    message_timeout_ms: 0
    socket_timeout_ms: 10000
    librdkafka_options:
      "enable.idempotence": "true"
      "socket.keepalive.enable": "true"
      "request.timeout.ms": "5000"
Version
0.51.0-debian (container)
Debug Output
Example Data
No response
Additional Context
No response
References
No response