This plugin supports the following parameters:

| Key | Description | Default |
| :--- | :--- | :--- |
| `aws_msk_iam` | Enable AWS MSK IAM authentication. Requires Fluent Bit 4.0.4 or later. | `false` |
| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster used for region extraction. Required when `aws_msk_iam` is enabled. | _none_ |
| `brokers` | Single or multiple list of Kafka brokers. For example, `192.168.1.3:9092`, `192.168.1.4:9092`. | _none_ |
| `client_id` | Client ID to use when connecting to Kafka. | _none_ |
| `dynamic_topic` | Adds unknown topics (found in `topic_key`) to `topics`. Only a default topic needs to be configured in `topics`. | `false` |
| `format` | Specify data format. Available formats: `avro` (requires Avro encoder build option), `gelf`, `json`, `msgpack`, `raw`. | `json` |
| `gelf_full_message_key` | Key to use as the long message for GELF format output. | _none_ |
| `gelf_host_key` | Key to use as the host for GELF format output. | _none_ |
| `gelf_level_key` | Key to use as the log level for GELF format output. | _none_ |
| `gelf_short_message_key` | Key to use as the short message for GELF format output. | _none_ |
| `gelf_timestamp_key` | Key to use as the timestamp for GELF format output. | _none_ |
| `group_id` | Consumer group ID. | _none_ |
| `message_key` | Optional key to store the message. | _none_ |
| `message_key_field` | If set, the value of `message_key_field` in the record will indicate the message key. If not set or not found in the record, `message_key` is used if set. | _none_ |
| `queue_full_retries` | Number of local retries to enqueue data when the `rdkafka` queue is full. The interval between retries is 1 second. Set to `0` for unlimited retries. | `10` |
| `raw_log_key` | When using the `raw` format, the value of `raw_log_key` in the record is sent to Kafka as the payload. | _none_ |
| `rdkafka.{property}` | `{property}` can be any [librdkafka property](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). | _none_ |
| `schema_id` | Avro schema ID. Requires the Avro encoder build option. | _none_ |
| `schema_str` | Avro schema string. Requires the Avro encoder build option. | _none_ |
| `timestamp_format` | Specify the timestamp format. Allowed values: `double`, `iso8601` (seconds precision), `iso8601_ns` (nanoseconds precision). | `double` |
| `timestamp_key` | Key to store the record timestamp. | `@timestamp` |
| `topic_key` | If multiple `topics` exist, the value of `topic_key` in the record indicates the topic to use. For example, if `topic_key` is `router` and the record is `{"key1": 123, "router": "route_2"}`, Fluent Bit uses the topic `route_2`. If the value isn't present in `topics`, the first topic in the list is used. | _none_ |
| `topics` | Single topic or comma-separated list of topics that Fluent Bit will use to send messages to Kafka. If multiple topics are set, the `topic_key` field in the record selects the topic. | `fluent-bit` |
| `workers` | The number of [workers](../../administration/multithreading.md#outputs) to perform flush operations for this output. | `0` |

Recommended `librdkafka` property settings include setting `rdkafka.log.connection.close` to `false` and `rdkafka.request.required.acks` to `1`.
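The recommended settings above can be sketched in a YAML pipeline configuration. The broker address and topic name here are placeholders; the `rdkafka.{property}` keys are passed through to `librdkafka` as described in the parameters table:

```yaml
pipeline:
  outputs:
    - name: kafka
      match: '*'
      brokers: 192.168.1.3:9092
      topics: test
      rdkafka.log.connection.close: false
      rdkafka.request.required.acks: 1
```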
```yaml
pipeline:
  outputs:
    - name: kafka
      match: '*'
      brokers: 192.168.1.3:9092
      topics: test
```
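For deployments using the classic configuration format, a sketch of the same basic example might look like the following, with parameter names as listed in the table above:

```text
[OUTPUT]
    Name     kafka
    Match    *
    Brokers  192.168.1.3:9092
    Topics   test
```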

```text
    DB                /dbdir/some.db
    Skip_Long_Lines   On
    Refresh_Interval  10
    Parser            some-parser

[FILTER]
    Name              kubernetes
```
```yaml
pipeline:
  outputs:
    - name: kafka
      match: '*'
      brokers: 192.168.1.3:9092
      topics: test
      format: raw
      raw_log_key: payloadkey
```
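To illustrate `raw_log_key` with the configuration above, consider a hypothetical record like the following. Only the value of `payloadkey` is sent to Kafka as the message payload; the other fields are dropped:

```json
{"payloadkey": "this string becomes the Kafka payload", "other_field": 123}
```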
If you are compiling Fluent Bit from source, ensure the following requirements are met:

### AWS MSK IAM configuration parameters

See `aws_msk_iam` and `aws_msk_iam_cluster_arn` in the [configuration parameters](#configuration-parameters) table.

### Configuration example
