
Kafka sink losing messages, sometimes locks up, during reliability testing #24193

@ryn9

Description

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

I am purposefully testing with an unreliable Kafka destination: I bounce the Kafka broker, take it offline for a while, bring it back online, and so on.

When the Kafka broker comes back online, I sometimes see the error count for the kafka sink jump by fairly large amounts (viewed via vector top), and when I then check the data downstream I can confirm that messages were lost.

Additionally, the logs indicate that events were dropped:

2025-11-07T21:36:46.766155Z ERROR sink{component_kind="sink" component_id=output component_type=kafka}:request{request_id=1268}: vector_common::internal_event::service: Service call failed. No retries or retries exhausted. error=Some(KafkaError (Message production error: PurgeQueue (Local: Purged in queue))) request_id=1268 error_type="request_failed" stage="sending"
2025-11-07T21:36:46.766202Z ERROR sink{component_kind="sink" component_id=output component_type=kafka}:request{request_id=1268}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=1 reason="Service call failed. No retries or retries exhausted."
2025-11-07T21:36:46.766224Z ERROR sink{component_kind="sink" component_id=output component_type=kafka}:request{request_id=1261}: vector_common::internal_event::service: Internal log [Service call failed. No retries or retries exhausted.] is being suppressed to avoid flooding.
2025-11-07T21:36:46.766239Z ERROR sink{component_kind="sink" component_id=output component_type=kafka}:request{request_id=1261}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.
2025-11-07T21:36:49.244315Z  WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Buffer counter underflowed. Clamping value to `0`. current=0 delta=6
2025-11-07T21:36:49.244358Z  WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Internal log [Buffer counter underflowed. Clamping value to `0`.] is being suppressed to avoid flooding.
2025-11-07T21:37:33.246223Z  WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Internal log [Buffer counter underflowed. Clamping value to `0`.] has been suppressed 1 times.
2025-11-07T21:37:33.246635Z  WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Buffer counter underflowed. Clamping value to `0`. current=6 delta=8
2025-11-07T21:37:33.246645Z  WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Internal log [Buffer counter underflowed. Clamping value to `0`.] is being suppressed to avoid flooding.
2025-11-07T21:38:03.244644Z  WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Internal log [Buffer counter underflowed. Clamping value to `0`.] has been suppressed 1 times.
2025-11-07T21:38:03.245116Z  WARN sink{component_kind="sink" component_id=output component_type=kafka}: vector_buffers::buffer_usage_data: Buffer counter underflowed. Clamping value to `0`. current=17056 delta=17072

What is worse, when this happens Vector sometimes does not recover.
I end up having to restart it to get it sending messages to Kafka again.

I am not sure whether this is something specific to my config or a bug.

I will note that the issue is intermittent, but I have been able to reproduce it a number of times.

Please note that I am aiming for a config that drops zero events, keeps messages in order (per Kafka partition), and minimizes duplicates (which can otherwise happen during a producer-to-broker recovery).
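
For reference, this is how I understand those goals mapping onto librdkafka producer properties. This is only my reading of the librdkafka documentation; the option values below are illustrative, not a claim about Vector's defaults. As far as I know, enabling idempotence automatically enforces acks=all, retries > 0, and a bounded number of in-flight requests:

    librdkafka_options:
      "enable.idempotence": "true"                    # should enforce acks=all, retries > 0, in-flight <= 5
      "acks": "all"                                   # spelled out for clarity; required for idempotence
      "max.in.flight.requests.per.connection": "5"    # with idempotence, ordering holds up to 5 in-flight batches
      "message.send.max.retries": "2147483647"        # retry transient failures rather than dropping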

Configuration

sinks:
  output:
    type: kafka
    inputs: 
      - input
    bootstrap_servers: "${KAFKA_BROKERS}"
    topic: "my.topic"
    key_field: _METADATA.kafka_key_field
    encoding:
      codec: json
      except_fields: 
        - _METADATA
    compression: lz4
    acknowledgements:
      enabled: true
    batch:
      timeout_secs: 5  # Flush at most every 5 seconds; appears to map down to librdkafka queue.buffering.max.ms
    buffer:
      type: disk
      max_size: 536870912  # 512MB
      when_full: block
    message_timeout_ms: 0
    socket_timeout_ms: 10000
    librdkafka_options:
      "enable.idempotence": "true"
      "socket.keepalive.enable": "true"
      "request.timeout.ms": "5000"

Version

0.51.0-debian (container)

Debug Output


Example Data

No response

Additional Context

No response

References

No response

Metadata

Labels

type: bug (A code related bug.)