Skip to content

in_amqp: Initial implementation for in_amqp plugin#11427

Open
matwey wants to merge 1 commit intofluent:masterfrom
matwey:in_amqp
Open

in_amqp: Initial implementation for in_amqp plugin#11427
matwey wants to merge 1 commit intofluent:masterfrom
matwey:in_amqp

Conversation

@matwey
Copy link

@matwey matwey commented Feb 4, 2026

Here is initial implementation for input AMQP plugin. I guess it is stable enough to obtain some review. The plugin was tested with RabbitMQ 4.2.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change:
    fluent-bit -i amqp -pqueue=flb.in_amqp -o stdout
  • [ N/A ] Debug log output from testing the change
  • [ N/A ] Attached Valgrind output that shows no leaks or memory corruption was found

Documentation


Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features
    • Added an AMQP input plugin: consumes queue messages, emits envelope metadata, supports optional parsing, SSL/TLS, pause/resume, and configurable reconnect/retry behavior; plugin is now available in builds.
  • Chores
    • Build now checks for the AMQP client library and will disable AMQP support with a status message if the required library is not found.
  • Chores
    • Added a build-time option to enable/disable the AMQP input plugin.

@coderabbitai
Copy link

coderabbitai bot commented Feb 4, 2026

📝 Walkthrough

Walkthrough

Adds a new AMQP input plugin: build gating for rabbitmq-c, plugin registration and CMake entries, public header types for connection/plugin state, and a full plugin implementation handling AMQP connections, consumption, parsing, encoding, retry/reconnect, and lifecycle callbacks.

Changes

Cohort / File(s) Summary
Top-level build gating
CMakeLists.txt
Inserted find_package(rabbitmq-c) checks and conditional blocks that disable FLB_IN_AMQP with a STATUS message when rabbitmq-c is not found (two insertion points).
Plugin registry
plugins/CMakeLists.txt
Added registration for the input plugin: REGISTER_IN_PLUGIN("in_amqp").
Plugin build config
plugins/in_amqp/CMakeLists.txt
New CMake for plugin: declares in_amqp.c and registers plugin via FLB_PLUGIN(in_amqp "${src}" rabbitmq::rabbitmq).
Plugin header
plugins/in_amqp/in_amqp.h
New header declaring struct flb_amqp_connection and struct flb_amqp with fields for connection/socket/channel, URI/queue/parser strings, reconnect settings, encoder, parser pointer, input instance and retry state.
Plugin implementation
plugins/in_amqp/in_amqp.c
New AMQP input implementation (~691 LOC): connection lifecycle, socket/channel management, consumer loop, envelope metadata extraction, optional parser integration, encoding/emission to Fluent Bit, retry/reconnect collectors, and lifecycle callbacks (init/collect/pause/resume/exit).
Plugin option flag
cmake/plugins_options.cmake
Added input option DEFINE_OPTION(FLB_IN_AMQP "Enable AMQP input plugin" ON) to enable/disable the AMQP plugin at configure time.

Sequence Diagrams

sequenceDiagram
    participant FLB as Fluent Bit
    participant Plugin as in_amqp Plugin
    participant Broker as AMQP Broker
    participant Parser as Parser
    participant Encoder as Encoder

    FLB->>Plugin: in_amqp_init()
    Plugin->>Plugin: load config (uri, queue, parser, reconnect)
    Plugin->>Broker: open socket / login / open channel / basic_consume
    Plugin->>Parser: init parser (if configured)
    Plugin->>Encoder: init encoder
    Plugin->>FLB: register collect callback
Loading
sequenceDiagram
    participant Collector as in_amqp collect loop
    participant Broker as AMQP Broker
    participant Parser as Parser
    participant Encoder as Encoder
    participant FLB as Fluent Bit

    Collector->>Broker: amqp_consume_message() / poll
    alt delivery received
        Broker->>Collector: envelope (body, headers, props, routing_key)
        Collector->>Parser: parse body (optional)
        Parser-->>Collector: parsed record / error
        Collector->>Collector: attach metadata (routing_key, headers, timestamp)
        Collector->>Encoder: encode event
        Encoder-->>Collector: msgpack buffer
        Collector->>FLB: emit record
    else error / connection loss
        Broker-->>Collector: error / timeout
        Collector->>Collector: schedule reconnect via retry collector
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I hopped into queues where routing keys gleam,
Opened channels, nibbled bytes from the stream,
I parse and I pack, then emit what I find,
If errors arise, I retry and rewind,
A rabbitly plugin, reconnecting with vim.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 13.33% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically describes the main change: the initial implementation of an AMQP input plugin (in_amqp) for Fluent Bit, which aligns with all the files added in this changeset.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ead056ea6c

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Fix all issues with AI agents
In `@plugins/in_amqp/in_amqp.c`:
- Around line 579-583: The cast in in_amqp_exit improperly dereferences the
unused parameter using "(void) *config;", which can cause undefined behavior if
config is NULL; change that to the standard unused-parameter idiom "(void)
config;" inside the in_amqp_exit function so you do not dereference config while
keeping the variable marked as intentionally unused.
- Around line 446-516: The ctx struct allocated in in_amqp_init is created with
flb_malloc so its members (notably ctx->encoder) are uninitialized and later
in_amqp_config_destroy calls flb_log_event_encoder_destroy which can free
invalid pointers; change the allocation in in_amqp_init to use flb_calloc (or
allocate with flb_malloc followed by memset to zero) so struct flb_amqp fields
(including ctx->encoder) are zero-initialized before any early teardown; keep
all other initialization logic the same.
- Around line 170-176: The flb_plg_trace call logs AMQP body using "%s" which
assumes NUL-termination and can read past the buffer; change the trace to use a
length-limited format (use "%.*s" with body->len) so it prints at most body->len
bytes from body->bytes. Update the flb_plg_trace invocation(s) in the parsing
block around flb_parser_do (referencing ctx->parser, flb_parser_do, body->bytes,
body->len, and ctx->ins) to pass (int)body->len and body->bytes accordingly and
keep the flb_plg_error call as-is.
- Around line 182-218: The code reads optional fields from
amqp_basic_properties_t (properties->timestamp, content_type, content_encoding,
correlation_id, reply_to, headers) without checking the properties->_flags
bitmask; update the logic around flb_time_set(&out_time, properties->timestamp,
0) and each call to in_amqp_append_metadata_bytes/in_amqp_append_metadata_entry
so you first test the appropriate flag (AMQP_BASIC_TIMESTAMP_FLAG,
AMQP_BASIC_CONTENT_TYPE_FLAG, AMQP_BASIC_CONTENT_ENCODING_FLAG,
AMQP_BASIC_CORRELATION_ID_FLAG, AMQP_BASIC_REPLY_TO_FLAG,
AMQP_BASIC_HEADERS_FLAG) before accessing the field; only call flb_time_set or
in_amqp_append_metadata_* when the corresponding flag is set, otherwise skip to
the fallback (flb_time_get) or omit that metadata entry to avoid reading
uninitialized memory (refer to symbols: properties, properties->_flags,
AMQP_BASIC_*_FLAG, out_time, flb_time_set, flb_time_get,
in_amqp_append_metadata_bytes, in_amqp_append_metadata_entry, headers,
ctx->encoder).
- Around line 350-386: The AMQP_RESPONSE_LIBRARY_EXCEPTION branch currently
handles only UNEXPECTED_STATE, CONNECTION_CLOSED and TIMEOUT; other
ret.library_error values from amqp_consume_message() should be handled to avoid
tight CPU loops—add an else clause after the existing checks that logs the
unexpected ret.library_error via flb_plg_warn (including the numeric error),
tears down the connection with in_amqp_connection_destroy(ctx) and either return
a non-zero error or call in_amqp_consumer_start(in, config, in_context) to
trigger a reconnect/backoff; reference the amqp_consume_message() return
handling, ret.library_error, AMQP_RESPONSE_LIBRARY_EXCEPTION, flb_plg_warn,
in_amqp_connection_destroy and in_amqp_consumer_start when applying the change.
- Around line 398-423: When reconnecting, the old socket collector
(ctx->coll_id) is left registered and must be removed before creating a new one;
modify the reconnect path (in the code that calls in_amqp_connection_init and
then sets ctx->coll_id) to check if ctx->coll_id >= 0 and call
flb_input_collector_destroy(ctx->coll_id, in) (or the appropriate
collector-remove API used elsewhere) to delete the stale collector, then clear
ctx->coll_id and proceed to create the new collector with
flb_input_set_collector_socket (refer to ctx->coll_id, in_amqp_collect,
amqp_socket_get_sockfd and in_amqp_connection_init).

@matwey matwey force-pushed the in_amqp branch 2 times, most recently from 3a5b411 to d3d939e Compare February 4, 2026 13:19
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@plugins/in_amqp/in_amqp.c`:
- Around line 421-433: When collector creation or start fails, the code must
clean up the partially-created collector and the open AMQP connection to avoid
leaving the plugin stuck: if flb_input_set_collector_socket succeeded but
flb_input_collector_start failed, call flb_input_collector_remove(ctx->coll_id)
(or the equivalent collector-remove API) and set ctx->coll_id = -1; in both
failure branches close the underlying socket (c->sock) and tear down the AMQP
connection using the library cleanup functions (e.g., amqp_socket_close(c->sock)
and the appropriate amqp connection destroy/close function), clear any
connection pointers in ctx, and then return the error.
- Around line 346-386: in in_amqp_collect(), when amqp_consume_message() returns
AMQP_RESPONSE_SERVER_EXCEPTION or AMQP_RESPONSE_NONE, call
in_amqp_log_reply_error(&ret, in) to record the error, then call
in_amqp_connection_destroy(ctx) and restart the consumer by returning
in_amqp_consumer_start(in, config, in_context); this mirrors the existing
library-exception handling flow and prevents a busy spin; reference the
amqp_consume_message return handling, in_amqp_log_reply_error(),
in_amqp_connection_destroy(), and in_amqp_consumer_start().
- Around line 592-595: The default AMQP URI "amqp://" is missing a HOST and will
cause amqp_parse_url to fail; update the default for the config map entry that
sets struct flb_amqp->uri (the FLB_CONFIG_MAP_STR for "uri") to a parseable
value (either NULL to allow amqp_default_connection_info or a concrete
"amqp://localhost") and ensure any code that reads flb_amqp->uri and calls
amqp_parse_url handles NULL appropriately by falling back to
amqp_default_connection_info; locate the config map entry (FLB_CONFIG_MAP_STR,
"uri", "amqp://") and change the default to NULL or "amqp://localhost" and
adjust initialization paths that call amqp_parse_url to accommodate the new
default.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🤖 Fix all issues with AI agents
In `@plugins/in_amqp/in_amqp.c`:
- Around line 139-141: Fix the typo in the comment inside the switch handling
AMQP field kinds: change the comment "fallthrought" to the correct "fallthrough"
where the case AMQP_FIELD_KIND_UTF8 falls into case AMQP_FIELD_KIND_BYTES so the
intent is clear; update the comment text near the AMQP_FIELD_KIND_UTF8 /
AMQP_FIELD_KIND_BYTES case labels to the correct spelling.
- Around line 413-451: in in_amqp_consumer_start: remove the direct call to
in_amqp_collect (the mutual recursion risk) and instead finish setup after
resetting ctx->retry and pausing the retry collector; rely on the socket/IO
collector you registered earlier to invoke in_amqp_collect naturally.
Specifically, stop calling in_amqp_collect(in, config, ctx) here, keep the
flb_input_collector_pause(ctx->retry_coll_id, in) and return 0 so the existing
socket collector will handle pending messages; ensure no other code path assumes
in_amqp_consumer_start will synchronously process the queue.
- Around line 276-327: The loop in in_amqp_collect can recurse unbounded by
calling in_amqp_consumer_start on AMQP_STATUS_CONNECTION_CLOSED; change the
behavior so that when ret.library_error == AMQP_STATUS_CONNECTION_CLOSED you
call in_amqp_connection_destroy(ctx) and break out of the for(;;) loop (or
return an error code) instead of calling in_amqp_consumer_start to avoid
recursion, letting the outer retry/timer logic handle reconnects; also add
catch-all branches: handle other ret.reply_type values
(AMQP_RESPONSE_SERVER_EXCEPTION, AMQP_RESPONSE_NONE) by logging and
breaking/returning appropriately, and add an else for ret.library_error to log
the unexpected error and break/return so the loop does not spin indefinitely.
- Around line 566-572: in in_amqp_pause ensure you only call
flb_input_collector_pause for the connection collector when a connection exists:
check ctx->conn.coll_id != -1 before invoking
flb_input_collector_pause(ctx->conn.coll_id, ctx->ins); still pause
ctx->retry_coll_id unconditionally as before. This mirrors the guard used in
in_amqp_resume and prevents attempting to pause a non-existent collector.
- Around line 585-593: Add a NULL guard in in_amqp_exit: check whether the
incoming data pointer is NULL before casting to struct flb_amqp *ctx and calling
in_amqp_config_destroy; if data is NULL simply return 0. This ensures
in_amqp_config_destroy (which dereferences ctx->encoder, ctx->retry_coll_id,
ctx->ins) is only called when ctx is non-NULL and avoids null pointer
dereference when the framework invokes cb_exit with a NULL context.

Comment on lines +276 to +327
for (;;) {
amqp_rpc_reply_t ret;
amqp_envelope_t envelope;

amqp_maybe_release_buffers(c->conn);
ret = amqp_consume_message(c->conn, &envelope, &tv, 0);

if (ret.reply_type == AMQP_RESPONSE_NORMAL) {
in_amqp_handle_envelope(ctx, &envelope);

amqp_destroy_envelope(&envelope);
} else if (ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
/*
* If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
* ret.library_error == AMQP_STATUS_UNEXPECTED_STATE, a frame
* other than AMQP_BASIC_DELIVER_METHOD was received, the caller
* should call amqp_simple_wait_frame() to read this frame and
* take appropriate action.
*/
if (ret.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
if (amqp_simple_wait_frame(c->conn, &frame) != AMQP_STATUS_OK) {
break;
}

if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server channel error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server connection error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
default:
flb_plg_warn(in, "An unexpected AMQP method id 0x%08X", frame.payload.method.id);
}
}
} else if (ret.library_error == AMQP_STATUS_CONNECTION_CLOSED) {
in_amqp_connection_destroy(ctx);

return in_amqp_consumer_start(in, config, in_context);
} else if (ret.library_error == AMQP_STATUS_TIMEOUT) {
return 0;
}
}
}

return 0;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Unbounded recursion between in_amqp_collect and in_amqp_consumer_start.

When AMQP_STATUS_CONNECTION_CLOSED is detected at line 317, the code calls in_amqp_consumer_start, which on success calls in_amqp_collect again (line 443). If the new connection also drops immediately, this chain recurses indefinitely, risking a stack overflow.

Additionally, unhandled reply_type values (AMQP_RESPONSE_SERVER_EXCEPTION, AMQP_RESPONSE_NONE) and unhandled library_error values (anything other than UNEXPECTED_STATE, CONNECTION_CLOSED, TIMEOUT) cause the for(;;) loop to spin without breaking or reconnecting.

Consider:

  1. Breaking out of in_amqp_collect on connection failure instead of recursing — let the retry timer handle reconnection.
  2. Adding a catch-all else for unhandled library_error values and an else for unhandled reply_type values.
Proposed fix (break instead of recurse; add catch-all handlers)
             } else if (ret.library_error == AMQP_STATUS_CONNECTION_CLOSED) {
                 in_amqp_connection_destroy(ctx);
-
-                return in_amqp_consumer_start(in, config, in_context);
+                /* Let the retry timer handle reconnection */
+                if (!flb_input_collector_running(ctx->retry_coll_id, in)) {
+                    flb_input_collector_resume(ctx->retry_coll_id, in);
+                }
+                return 0;
             } else if (ret.library_error == AMQP_STATUS_TIMEOUT) {
                 return 0;
+            } else {
+                flb_plg_error(in, "AMQP consume error: %s", amqp_error_string2(ret.library_error));
+                in_amqp_connection_destroy(ctx);
+                if (!flb_input_collector_running(ctx->retry_coll_id, in)) {
+                    flb_input_collector_resume(ctx->retry_coll_id, in);
+                }
+                return 0;
             }
+        } else {
+            in_amqp_log_reply_error(in, ret, "AMQP consume error");
+            in_amqp_connection_destroy(ctx);
+            if (!flb_input_collector_running(ctx->retry_coll_id, in)) {
+                flb_input_collector_resume(ctx->retry_coll_id, in);
+            }
+            return 0;
         }
🤖 Prompt for AI Agents
In `@plugins/in_amqp/in_amqp.c` around lines 276 - 327, The loop in
in_amqp_collect can recurse unbounded by calling in_amqp_consumer_start on
AMQP_STATUS_CONNECTION_CLOSED; change the behavior so that when
ret.library_error == AMQP_STATUS_CONNECTION_CLOSED you call
in_amqp_connection_destroy(ctx) and break out of the for(;;) loop (or return an
error code) instead of calling in_amqp_consumer_start to avoid recursion,
letting the outer retry/timer logic handle reconnects; also add catch-all
branches: handle other ret.reply_type values (AMQP_RESPONSE_SERVER_EXCEPTION,
AMQP_RESPONSE_NONE) by logging and breaking/returning appropriately, and add an
else for ret.library_error to log the unexpected error and break/return so the
loop does not spin indefinitely.

@coderabbitai
Copy link

coderabbitai bot commented Feb 9, 2026

Caution

Failed to replace (edit) comment. This is likely due to insufficient permissions or the comment being deleted.

Error details
{"name":"HttpError","status":500,"request":{"method":"PATCH","url":"https://api.github.com/repos/fluent/fluent-bit/issues/comments/3847295303","headers":{"accept":"application/vnd.github.v3+json","user-agent":"octokit.js/0.0.0-development octokit-core.js/7.0.6 Node.js/24","authorization":"token [REDACTED]","content-type":"application/json; charset=utf-8"},"body":{"body":"<!-- This is an auto-generated comment: summarize by coderabbit.ai -->\n<!-- This is an auto-generated comment: failure by coderabbit.ai -->\n\n> [!CAUTION]\n> ## Review failed\n> \n> Failed to post review comments\n\n<!-- end of auto-generated comment: failure by coderabbit.ai -->\n\n<!-- This is an auto-generated comment: review paused by coderabbit.ai -->\n\n> [!NOTE]\n> ## Reviews paused\n> \n> It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the `reviews.auto_review.auto_pause_after_reviewed_commits` setting.\n> \n> Use the following commands to manage reviews:\n> - `@coderabbitai resume` to resume automatic reviews.\n> - `@coderabbitai review` to trigger a single review.\n> \n> Use the checkboxes below for quick actions:\n> - [ ] <!-- {\"checkboxId\": \"7f6cc2e2-2e4e-497a-8c31-c9e4573e93d1\"} --> ▶️ Resume reviews\n> - [ ] <!-- {\"checkboxId\": \"e9bb8d72-00e8-4f67-9cb2-caf3b22574fe\"} --> 🔍 Trigger review\n\n<!-- end of auto-generated comment: review paused by coderabbit.ai -->\n\n<!-- walkthrough_start -->\n\n<details>\n<summary>📝 Walkthrough</summary>\n\n## Walkthrough\n\nAdds a new AMQP input plugin: build gating for rabbitmq-c, plugin registration and CMake entries, public header types for connection/plugin state, and a full plugin implementation handling AMQP connections, consumption, parsing, encoding, retry/reconnect, and lifecycle callbacks.\n\n## Changes\n\n|Cohort / File(s)|Summary|\n|---|---|\n|**Top-level build** <br> `CMakeLists.txt`|Added `find_package(rabbitmq-c)` and a guard that disables `FLB_IN_AMQP` with a STATUS message if rabbitmq-c is not found.|\n|**Plugin registry** <br> `plugins/CMakeLists.txt`|Registered the new input plugin via `REGISTER_IN_PLUGIN(\"in_amqp\")`.|\n|**Plugin build** <br> `plugins/in_amqp/CMakeLists.txt`|New CMake for `in_amqp`: declares `in_amqp.c` and registers the plugin with `FLB_PLUGIN(... rabbitmq::rabbitmq)`.|\n|**Plugin header** <br> `plugins/in_amqp/in_amqp.h`|Adds `struct flb_amqp_connection` and `struct flb_amqp` types containing connection, config, encoder, parser, and runtime fields.|\n|**Plugin implementation** <br> `plugins/in_amqp/in_amqp.c`|New AMQP input implementation: connection/socket/channel management, consumer loop, envelope metadata extraction, optional parser integration, encoding/emission, retry/reconnect scheduling, collectors, and lifecycle callbacks (init/collect/pause/resume/exit).|\n\n## Sequence Diagrams\n\n```mermaid\nsequenceDiagram\n    participant FLB as Fluent Bit\n    participant Plugin as in_amqp Plugin\n    participant AMQP as AMQP Broker\n    participant Parser as Parser\n    participant Encoder as Encoder\n\n    FLB->>Plugin: in_amqp_init()\n    Plugin->>Plugin: parse config (uri, queue, parser, retries)\n    Plugin->>Parser: init parser (if configured)\n    Plugin->>Encoder: init encoder\n    Plugin->>AMQP: open socket / login / open channel / basic_consume\n    Plugin->>FLB: register collect callback\n```\n\n```mermaid\nsequenceDiagram\n    participant Plugin as in_amqp Plugin\n    participant AMQP as AMQP Broker\n    participant Parser as Parser\n    participant Encoder as Encoder\n    participant FLB as Fluent Bit\n\n    Plugin->>AMQP: amqp_consume_message() (poll)\n    alt delivery received\n        AMQP->>Plugin: envelope (body, headers, routing_key, props)\n        Plugin->>Parser: parse body (optional)\n        Parser-->>Plugin: parsed record / raw body\n        Plugin->>Plugin: attach metadata (routing_key, headers, props)\n        Plugin->>Encoder: encode event\n        Encoder-->>Plugin: msgpack buffer\n        Plugin->>FLB: emit record\n    else connection/error\n        AMQP-->>Plugin: error / timeout\n        Plugin->>Plugin: schedule reconnect via retry collector\n    end\n```\n\n## Estimated code review effort\n\n🎯 4 (Complex) | ⏱️ ~45 minutes\n\n## Poem\n\n> 🐰 I hopped into queues where headers gleam,  \n> Opened channels, nibbled bytes from the stream,  \n> I parse and pack, then retry with a grin,  \n> Reconnect, emit — let the messages in,  \n> A floppy‑eared patch, now ready to spin.\n\n</details>\n\n<!-- walkthrough_end -->\n\n\n<!-- pre_merge_checks_walkthrough_start -->\n\n<details>\n<summary>🚥 Pre-merge checks | ✅ 2 | ❌ 1</summary>\n\n<details>\n<summary>❌ Failed checks (1 warning)</summary>\n\n|     Check name     | Status     | Explanation                                                                           | Resolution                                                                         |\n| :----------------: | :--------- | :------------------------------------------------------------------------------------ | :--------------------------------------------------------------------------------- |\n| Docstring Coverage | ⚠️ Warning | Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |\n\n</details>\n<details>\n<summary>✅ Passed checks (2 passed)</summary>\n\n|     Check name    | Status   | Explanation                                                                                                                                                                                             |\n| :---------------: | :------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |\n| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.                                                                                                                                             |\n|    Title check    | ✅ Passed | The title 'Initial implementation for in_amqp plugin' directly and clearly describes the main change: adding a new AMQP input plugin to Fluent Bit, which is the primary objective of the pull request. |\n\n</details>\n\n<sub>✏️ Tip: You can configure your own custom pre-merge checks in the settings.</sub>\n\n</details>\n\n<!-- pre_merge_checks_walkthrough_end -->\n\n<!-- finishing_touch_checkbox_start -->\n\n<details>\n<summary>✨ Finishing touches</summary>\n\n- [ ] <!-- {\"checkboxId\": \"7962f53c-55bc-4827-bfbf-6a18da830691\"} --> 📝 Generate docstrings\n<details>\n<summary>🧪 Generate unit tests (beta)</summary>\n\n- [ ] <!-- {\"checkboxId\": \"f47ac10b-58cc-4372-a567-0e02b2c3d479\", \"radioGroupId\": \"utg-output-choice-group-unknown_comment_id\"} -->   Create PR with unit tests\n- [ ] <!-- {\"checkboxId\": \"07f1e7d6-8a8e-4e23-9900-8731c2c87f58\", \"radioGroupId\": \"utg-output-choice-group-unknown_comment_id\"} -->   Post copyable unit tests in a comment\n\n</details>\n\n</details>\n\n<!-- finishing_touch_checkbox_end -->\n\n<!-- tips_start -->\n\n---\n\nThanks for using [CodeRabbit](https://coderabbit.ai?utm_source=oss&utm_medium=github&utm_campaign=fluent/fluent-bit&utm_content=11427)! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.\n\n<details>\n<summary>❤️ Share</summary>\n\n- [X](https://twitter.com/intent/tweet?text=I%20just%20used%20%40coderabbitai%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20the%20proprietary%20code.%20Check%20it%20out%3A&url=https%3A//coderabbit.ai)\n- [Mastodon](https://mastodon.social/share?text=I%20just%20used%20%40coderabbitai%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20the%20proprietary%20code.%20Check%20it%20out%3A%20https%3A%2F%2Fcoderabbit.ai)\n- [Reddit](https://www.reddit.com/submit?title=Great%20tool%20for%20code%20review%20-%20CodeRabbit&text=I%20just%20used%20CodeRabbit%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20proprietary%20code.%20Check%20it%20out%3A%20https%3A//coderabbit.ai)\n- [LinkedIn](https://www.linkedin.com/sharing/share-offsite/?url=https%3A%2F%2Fcoderabbit.ai&mini=true&title=Great%20tool%20for%20code%20review%20-%20CodeRabbit&summary=I%20just%20used%20CodeRabbit%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20proprietary%20code)\n\n</details>\n\n<sub>Comment `@coderabbitai help` to get the list of available commands and usage tips.</sub>\n\n<!-- tips_end -->\n\n<!-- internal state start -->\n\n\n<!-- DwQgtGAEAqAWCWBnSTIEMB26CuAXA9mAOYCmGJATmriQCaQDG+Ats2bgFyQAOFk+AIwBWJBrngA3EsgEBPRvlqU0AgfFwA6NPEgQAfACgjoCEYDEZyAAUASpETZWaCrKPR1AGxJcAkhnXwaB4ozNxebBi41PD4WABm+HzwGAD6aMwAjtw8HthEyZCQBgByjgKUXACMlQAsAEwA7IUGAKo2ADJcsLi43IgcAPQD+biw2AIaTMwDcbnsM3ORYGq4A9zYHh4D1fVNRS2IFZDM1ADuJPJFAMr42BQMJJACVBgMsFzJaZnZgEmEMM6kXBPF5vLgnRA0PjXKK4bD9fjcMjNGwkCTwEjnCjwuiIeCIigAGkg/joAiCiAA1vAidxqBR4AwKWAISRuLAyIgiQ8PEESD1kpU6gAOABszXaKhIHnhtHwDEQYAoJAy2HgSvoRQAwkrqHR0JxIHUAAx1EVgE3mmrQQUcACsIo4RpqAC0jAARaQMenccSxDgGKAAQVotGQmBQ/nEQRCYRIERhMSw+Di6CwgYAsgBFKwR9ZAsJ5AoACk+6SyAEpIAk+AAxRZAgBC6g0AesuXyWFOaGQNBZ9FO6lgkBsKhWWcgNQ0dRbUAAogAPdKxhQYOLwIh3aKxHgUfBopS0LgAA1m2HYy3Uuh0ZeyYG4KpIZ4AvLMJqXvrp8PZcLK8EeZ5AmqxLg9ICHgiSMOyjIeEgQLqHG/QGIUc6LqEXgrmuG5UL6GBcLwe7wAeLbIZAHpgUQkAePgFG3L0eBVruzCQL24gYEQXDFAMgbEbokAAGpBEQ9IYPQtF5vYsD4AObHEl+XhoBS8KcdxSG8W6cqOOwW5YEqKpqnQXDYK8UEUnQAHtAyHLeAoSj2OMzDqDQ9BGUofCBrSbyPNORo8JQO74CIYiQLpqpKvGiAtgYFiQAA8sIojiFIyAOE4LhuJ41l+AE0bwGhcZaThVYQe+WQ5IWWBFKUzDlBQVS1I0zRtJ0kDdL0/RDCMYwTFMCxnpEvXnisawbFsOwNfshy1ccZwXM0Nx3A8wKYKCEZfKVfzQACfJLcZYLdpCc0wnCXD4IiFUGCiaIYpQ2KhnilBEiStBktKVI0nSDJMiybIclyUq8vyGCCqK4qStKXCyvKirKqFepajqTn6lwxqmuadSWtadR2g6TqugYHqIF6eI4f6QYhmGWDJNlwS5bG8bafwKbhhm2a5vRBYdpAJapDelbVpAdZ9Y2zatlY7YFCxeoDqMw6juo46TtOrYLku6FMKu66boV+H7gZkAnvWF5AmA14fneD7Pq+GglbeX4Qr+uD/q2QGRKB4F8J50GwSgNDMIhJEq3lGGa9hiZ4buuu0DxUBkXklHUfweDiXEjHMdIrHsZAynR/xgnCaJSf0YgknSRRGBySQClKVxOfqQwmmRAzIX6YekBGZ7plR62FkPBghxcEwtkOAIDk9HqLl+e5aCeYaGg+fi/mBUCzdhewEVGPoxjgFAZCiUz4HEGQyiI1M8bh/w8ViJI0hPPIg/KKozbaLoYCGCYUBwKgqDhmgB+kOQ2E9Sn3YFwKgpw7KpXkHIGyD8VhaB0JvLepgDCanTApEgFkIQRVwPOTgBgABEhCoqWEDD4Q+ADdT0BSicFwjNIKYFIIgDekAWY5l4JQGGSB4LICILqXwfdKCI1wKcL8rDoY8kRuw5uuJeyQVEIpCMgE0GmUwbgbBuCc7BgPFWZItAUgeQpGgUgRYqCP1wJkMADByyqSgD4FMNZ2gNhSD4YoKRWEoGQDFYoqZ6CmJWBYhgHjZJAgSC5IktAkAqHQg4pxLi3FZhzGiNAAtHEpBilYaAPgvFFhic41x7iYo1hrJWaWQ5klXGgIGaALQrjHGkIgIxJBWzQBESgJQkQGTRjQCGAIsQwzcERM4RRhxaSAJgcFEg+Q+lEgEPgGW+JqwORkqMR4DS2CQF4RneO+RAn83cQ4AZiRNCi3GDBBgAwSDzm4EcvUgYrA+AmScZIyB26wAYXQAA3LJYkGJICXOuRQRGiBZDVXwNKfgSRIiUDiNPaQkVzAkI8JCbSPYvwrMgEoBgPJQ59Lof8m5ok+DrAEGcv5HTxDSGYcUWITSDAWXIMgN47zW4AGohQDHNAQohAZkEc2eQMVB6DVHqLwYQ/BxCWFkP/sfPU1DnDyGTPQtilLWxaL1OQcBNsyqcyVPkCEOKsDQOeYI5IFEbCzgAOI+AqbOGweSUhWHaC0K1xQiz4JtvgysBQ+V9wFcojBsERUaBgAgZAsob7lyBMwRQ8A4jyEwPIS5sFTXbIZBCv5FBdx8DeSJGCbEvmxA8Imq5mBQzMXZKtH1yBdWwVoaU8tjwNVkpArIeFErAxIuPomVFDaMWiGxSivFVyCXpuJaS9gAQVW2IwBIOUDNukHmPOaq1Nq7VxMdc6lxbqPXliPIotc6EjxVr9UKwNGgcGOyMPSm+TLlWssqByo0RhZwQlypQ8ZSorrgJIHEasBp0x0HgI4Ll4qeVgCMMem2J6VFnovf6MV7apVH0AVQxwNCFUplvYwowWiwzHCpicYIgrTLBywgzfm6KtU+tJqRftzgb7JMQLce4jwYIQn1oge4e71ZRCpjJI8NtJhHlbCiPVkIewVp9ZAet6KRxmPHFMa55BIiQCSfrXJG6XXczWtkfBAASAA3pxhgABffBwU5bmIyBwDgfj1CZF3QilhnaDU9vRZigdOFkCKvxYCvUEEx1ponRSphrZqV/OHX52gw0SVpo884Qd0aIlrjMk5tB/g4jpwFvAdCgYMBBFkAAL0oFe5IN6c2kFZTUR9pWGVKsq1wFl1XOUIbAxB8WvqoOCYYPBoh0VSHkJlahyBdCsMqp8HlcK6AqwjRYQktm+YOtFVrPWSATY4JYEgzzb4AxuvBtw5RWNohZBYseCcfLpB4xcliOQK+24c20DzRRbmWLsARJklcK47QBjQE1Ik5wgRIiIHLESNgiAGmkDJVIKiiJ/IPHB6aokp0cLRlGZNCMNAhLaSJKWv5UgVNxiQLibcan0WC3YGty8ZB74UGDX4V7Shq2CDhECSgWaJkAozkSJU6tbuFVx0qFt9g4yYHEPKHHIk+0RIYO+pgmwEqJGQPzcuFBCMIi7duAXfJaFUR2cGoCSpGBoFpGoGCIX/TOxuwlRMPilXkGCDBTLDATteC4OnKJSAhzuN59b2IRJdfJCR2dO3UoiQQmcECdWKUU0pxYNN9WmE7h6gtiQIk9bTuYGwNkbcbPFcAVnDOqUp1Hj4Xh7iNiJ0sBKBglIFwQeUebHkGjx46KwcQ8ePW5JCeQ56mbxQL5YEcv0FRBTnniQy3JAZym8QYOoihG57RU1KRTKyGu1CyIKRcCyERGvmgG/qcxrYtdzN/0cIpEItz1kRbN/4Al/Qdx7Juk3S+YT/MzhDj0HTIgIgVhp4UieDGjfKcOyFgMkn3kEt3lhHQEjishQAOIcH8qPBZuArMrQK2q2ABlELQNQMkoTgjn6C1FKAvLSKMD2tQFEG8FDkXrDmwFgTgVzLMmCpXH3ESMkJjjdESLMPgNQJyJAC0NADWOynIL2ESDPu7qEMDsxGihWiPgTq8IoJQPnvIe9hRLjngcTrhCuPqtgGIESAfkoHfjwKcp7njmvF8g9uhPoSmrnliLfNsh2KoZLkqIcIDBROHjQABL/liH5GwZMgaidD6ImKju/n5HCI0stgAWgXDvUqal8sAUiGQFEtAW3IcBJiXiEXwAQP5LQDoY8F/j/n/onHRMclACiD7mIAMILjrtRAyAPLEInqYuhFUfIDBKPBTPQL4RQBIOSF8s0QoPLmIBBITOyDkU9l8rSHCCQJUdIJpMLudmLj2OiPQFkVJo7sdqdgBEBAMQQLYbjmsc7qdntGwdoH3NNk8ptmCk9v0V4IMXwLjoxqyAljQBMkLnLjcTsV8gcpzsgBMeXkQNMdHjJNuFJr8VMc4ZpF8lTFGDBMVu0YwPJKcVniuDQLgnocoQ9DwBkYYW8QrliJsfUSHNpBDD+mVsgG0D4ESCnu9N4YSLbmPhgHzsLq4YgOMSEWSTYA8hBEoDChsGol8t0TBNgbIivMnmeGeLblCYEDCTfOAbGqRknl3FAFYMYYEncj4G7pFmotNk2skHmPoktvFgaqtDePqeVNJoOIdk7i7o8DLpsGSIyHCV3gSRRCcNwABFlNCfAIVvOpLhnhgFnlwEEFRDLrIrjpKUEF6TesBJcrgGiTToYUqBIrKiBDobCM4eMdCokH7PCcwUiTnpmorumkmrGRGFPjJGwNGrQrjuUYVDQM4LKKcBgOZNRA4XUX3LBBTjYQMF2BQBgLtquHJNRErhBDWYmGvilIEX7piViIjmSoPLOVrqOTnvjmopFP1i5oOlke5nRq5kOpzv5kSiqc2pOqFlAJgZJPQAunrEeNoUFK+OfhgHqVJjbKaR2HugUAeo8Eeh1ogP2Tpn+TeEJkYCRB6LxuDKpCRMUOkNZC+T6hBbxNANvtZLeSEh4AIA+U+R1vBVAO0HOiTNqvyl1ttlkJMNhZAFcOuPlmmY8EWIFqqfcstswOWP6IUKxbxFHpHm8hQAAFTEjQWQBPiQD4I3jipsUkQcX0I8V9qEzeiFSCX4LuK6nswdaiViVQAMDoVQkCXGnfAPnqDwXiXoXsIpAUBGTaXFAtDtDtAGXsXoU4lBSCUvn2W4A2XqXoWnjFwpBgQpiCUWVWWuWkYpCunaWQFBXG4BUaX6K/wIGOXEXcBRWTERXoXgnrKxU6YmUzFsBJUpDFnaUvnFlpaYBHbsY1g5aPB5YFbFYUAlBeKzhOYegMBcKxBGCFUZZZafl9qQhLLaS1blbMqNYigADMDQNWrWEA4GBgW2/5gm7wIGiGg2KGEC6Go2FWKqB2ySTadFhBT+kKkIMKi0FGFa7i5OKm62C2BFuEzCHoa4dWKFVYaF6VS5nYFpKWHgoYgZulT1KQ7hJAm+K4GAXyJpTGjIfIf13FwNFIgNn1Oa9uf1t6kJKmbx5+iptGN1N8d195N45pMsr171916FiAoYf1dw8AXy95hNiAf1KeKQ+WbAZND1FNf1feNN0FnxKZQUJpX1yQCQ/1D5CQCNy8ogVuYgGVLaKQrR6gLJGOEyNZotLgD5kIApbNpld5D1uuOVK5OV6JfA+hlAytqZ+NUVNJkA3Ffe+tqtmlj5eAD54erwjw3FzyAtLx8tSNhETtzR5tqF6FnNwthUvOkU9OuQjONGgdb2jwG49ZXAuScSrCKQAAEq2B6GdO0o1dIKApZgEgMIBbAJwYbENPeUpZoDnfdULEbAsOherbIbgFrTThoLAEYOFnEO3F5umuUG8miFySSZGN2ugIbleV3O2huS3VuRWoaYOj5pqQeUYbFoEsFuiKeTAEhegCGNeRjQ9T7Qyb7hgO+fEGVfrFNTeABd8HXcJiRKVVKHjUeBvXzomN9TCL9ZHjdh8keESFfbpRDaDUCODXKJDS/frJzTDVKHDTms/a/Wwdccjc/c0kvf3ceGvd7d8DvTooegfTtjNafbxOfW9fCAbATUTUCCTaA/rOTfg5ANTbTSQEQ7g99aQ8zRQ1Q3ddfVvXzV+Lziw1Q+A/SXznLbIOLblJLRwyplwwlDwwrZQErX/TeezV7eLdRBrewDXQoTrdrQw9I4beAabRkaoyrTI4XTbVEHbSbY7ZI5w9rrw67bQII4LS2towbZjZ9b7TbrzlAxKulsVUCKVblvlkWlVQ3bVfVaIE1VgDYBsCqsUP4xKi0NwEKaxiPsEFEEQL1YyqtayraINaNdyogu/GSnvDgAQAtbLiwGfMgUtfKnYTTpZvAi/G/NvAoKwOoMjZTZ+uiOcHouHoCtU0gjvN0kaPaJXCKAwLUEKEaJUA0LaLaGgGgDCiKKoA0MaAwAwCKLaLQDUCKA0HEAAJzVCdPZODVoC2gCA1DVAbOZaVCTNxDGiVBbO0BChxCDUjPDUqALMCBGi0AihGgkB1BoCDU7O1O0CDW0AbODUbMkAigkBCh0BjOmi2gy41CqBoA1ANAbNgt1A/oNBCgbMfPgs1C2i/MQDoAHPVBCi2g1DYGVwnODU1A1DfMMATNotoBCi0sDMMA1AkCDV1CDOIudMGDZOnwNOERNOogtN0Ba30D6BAA=== -->\n\n<!-- internal state end -->"},"request":{"retryCount":3,"retries":3,"retryAfter":16}},"response":{"url":"https://api.github.com/repos/fluent/fluent-bit/issues/comments/3847295303","status":500,"headers":{"access-control-allow-origin":"*","access-control-expose-headers":"ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, X-GitHub-SSO, X-GitHub-Request-Id, Deprecation, Sunset","content-length":"0","content-security-policy":"default-src 'none'","content-type":"application/json; charset=utf-8","date":"Mon, 09 Feb 2026 16:12:32 GMT","referrer-policy":"origin-when-cross-origin, strict-origin-when-cross-origin","server":"github.com","strict-transport-security":"max-age=31536000; includeSubdomains; preload","vary":"Accept-Encoding, Accept, X-Requested-With","x-accepted-github-permissions":"issues=write; pull_requests=write","x-content-type-options":"nosniff","x-frame-options":"deny","x-github-api-version-selected":"2022-11-28","x-github-media-type":"github.v3; format=json","x-github-request-id":"404F:68365:10B97A:47D026:698A076F","x-ratelimit-limit":"15000","x-ratelimit-remaining":"14927","x-ratelimit-reset":"1770656672","x-ratelimit-resource":"core","x-ratelimit-used":"73","x-xss-protection":"0"},"data":""}}

1 similar comment
@coderabbitai
Copy link

coderabbitai bot commented Feb 9, 2026

Caution

Failed to replace (edit) comment. This is likely due to insufficient permissions or the comment being deleted.

Error details
{"name":"HttpError","status":500,"request":{"method":"PATCH","url":"https://api.github.com/repos/fluent/fluent-bit/issues/comments/3847295303","headers":{"accept":"application/vnd.github.v3+json","user-agent":"octokit.js/0.0.0-development octokit-core.js/7.0.6 Node.js/24","authorization":"token [REDACTED]","content-type":"application/json; charset=utf-8"},"body":{"body":"<!-- This is an auto-generated comment: summarize by coderabbit.ai -->\n<!-- This is an auto-generated comment: failure by coderabbit.ai -->\n\n> [!CAUTION]\n> ## Review failed\n> \n> Failed to post review comments\n\n<!-- end of auto-generated comment: failure by coderabbit.ai -->\n\n<!-- This is an auto-generated comment: review paused by coderabbit.ai -->\n\n> [!NOTE]\n> ## Reviews paused\n> \n> It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the `reviews.auto_review.auto_pause_after_reviewed_commits` setting.\n> \n> Use the following commands to manage reviews:\n> - `@coderabbitai resume` to resume automatic reviews.\n> - `@coderabbitai review` to trigger a single review.\n> \n> Use the checkboxes below for quick actions:\n> - [ ] <!-- {\"checkboxId\": \"7f6cc2e2-2e4e-497a-8c31-c9e4573e93d1\"} --> ▶️ Resume reviews\n> - [ ] <!-- {\"checkboxId\": \"e9bb8d72-00e8-4f67-9cb2-caf3b22574fe\"} --> 🔍 Trigger review\n\n<!-- end of auto-generated comment: review paused by coderabbit.ai -->\n\n<!-- walkthrough_start -->\n\n<details>\n<summary>📝 Walkthrough</summary>\n\n## Walkthrough\n\nAdds a new AMQP input plugin: build gating for rabbitmq-c, plugin registration and CMake entries, public header types for connection/plugin state, and a full plugin implementation handling AMQP connections, consumption, parsing, encoding, retry/reconnect, and lifecycle callbacks.\n\n## Changes\n\n|Cohort / File(s)|Summary|\n|---|---|\n|**Top-level build** <br> `CMakeLists.txt`|Added `find_package(rabbitmq-c)` and a guard that disables `FLB_IN_AMQP` with a STATUS message if rabbitmq-c is not found.|\n|**Plugin registry** <br> `plugins/CMakeLists.txt`|Registered the new input plugin via `REGISTER_IN_PLUGIN(\"in_amqp\")`.|\n|**Plugin build** <br> `plugins/in_amqp/CMakeLists.txt`|New CMake for `in_amqp`: declares `in_amqp.c` and registers the plugin with `FLB_PLUGIN(... rabbitmq::rabbitmq)`.|\n|**Plugin header** <br> `plugins/in_amqp/in_amqp.h`|Adds `struct flb_amqp_connection` and `struct flb_amqp` types containing connection, config, encoder, parser, and runtime fields.|\n|**Plugin implementation** <br> `plugins/in_amqp/in_amqp.c`|New AMQP input implementation: connection/socket/channel management, consumer loop, envelope metadata extraction, optional parser integration, encoding/emission, retry/reconnect scheduling, collectors, and lifecycle callbacks (init/collect/pause/resume/exit).|\n\n## Sequence Diagrams\n\n```mermaid\nsequenceDiagram\n    participant FLB as Fluent Bit\n    participant Plugin as in_amqp Plugin\n    participant AMQP as AMQP Broker\n    participant Parser as Parser\n    participant Encoder as Encoder\n\n    FLB->>Plugin: in_amqp_init()\n    Plugin->>Plugin: parse config (uri, queue, parser, retries)\n    Plugin->>Parser: init parser (if configured)\n    Plugin->>Encoder: init encoder\n    Plugin->>AMQP: open socket / login / open channel / basic_consume\n    Plugin->>FLB: register collect callback\n```\n\n```mermaid\nsequenceDiagram\n    participant Plugin as in_amqp Plugin\n    participant AMQP as AMQP Broker\n    participant Parser as Parser\n    participant Encoder as Encoder\n    participant FLB as Fluent Bit\n\n    Plugin->>AMQP: amqp_consume_message() (poll)\n    alt delivery received\n        AMQP->>Plugin: envelope (body, headers, routing_key, props)\n        Plugin->>Parser: parse body (optional)\n        Parser-->>Plugin: parsed record / raw body\n        Plugin->>Plugin: attach metadata (routing_key, headers, props)\n        Plugin->>Encoder: encode event\n        Encoder-->>Plugin: msgpack buffer\n        Plugin->>FLB: emit record\n    else connection/error\n        AMQP-->>Plugin: error / timeout\n        Plugin->>Plugin: schedule reconnect via retry collector\n    end\n```\n\n## Estimated code review effort\n\n🎯 4 (Complex) | ⏱️ ~45 minutes\n\n## Poem\n\n> 🐰 I hopped into queues where headers gleam,  \n> Opened channels, nibbled bytes from the stream,  \n> I parse and pack, then retry with a grin,  \n> Reconnect, emit — let the messages in,  \n> A floppy‑eared patch, now ready to spin.\n\n</details>\n\n<!-- walkthrough_end -->\n\n\n<!-- pre_merge_checks_walkthrough_start -->\n\n<details>\n<summary>🚥 Pre-merge checks | ✅ 2 | ❌ 1</summary>\n\n<details>\n<summary>❌ Failed checks (1 warning)</summary>\n\n|     Check name     | Status     | Explanation                                                                           | Resolution                                                                         |\n| :----------------: | :--------- | :------------------------------------------------------------------------------------ | :--------------------------------------------------------------------------------- |\n| Docstring Coverage | ⚠️ Warning | Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |\n\n</details>\n<details>\n<summary>✅ Passed checks (2 passed)</summary>\n\n|     Check name    | Status   | Explanation                                                                                                                                                                                             |\n| :---------------: | :------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |\n| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.                                                                                                                                             |\n|    Title check    | ✅ Passed | The title 'Initial implementation for in_amqp plugin' directly and clearly describes the main change: adding a new AMQP input plugin to Fluent Bit, which is the primary objective of the pull request. |\n\n</details>\n\n<sub>✏️ Tip: You can configure your own custom pre-merge checks in the settings.</sub>\n\n</details>\n\n<!-- pre_merge_checks_walkthrough_end -->\n\n<!-- finishing_touch_checkbox_start -->\n\n<details>\n<summary>✨ Finishing touches</summary>\n\n- [ ] <!-- {\"checkboxId\": \"7962f53c-55bc-4827-bfbf-6a18da830691\"} --> 📝 Generate docstrings\n<details>\n<summary>🧪 Generate unit tests (beta)</summary>\n\n- [ ] <!-- {\"checkboxId\": \"f47ac10b-58cc-4372-a567-0e02b2c3d479\", \"radioGroupId\": \"utg-output-choice-group-unknown_comment_id\"} -->   Create PR with unit tests\n- [ ] <!-- {\"checkboxId\": \"07f1e7d6-8a8e-4e23-9900-8731c2c87f58\", \"radioGroupId\": \"utg-output-choice-group-unknown_comment_id\"} -->   Post copyable unit tests in a comment\n\n</details>\n\n</details>\n\n<!-- finishing_touch_checkbox_end -->\n\n<!-- tips_start -->\n\n---\n\nThanks for using [CodeRabbit](https://coderabbit.ai?utm_source=oss&utm_medium=github&utm_campaign=fluent/fluent-bit&utm_content=11427)! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.\n\n<details>\n<summary>❤️ Share</summary>\n\n- [X](https://twitter.com/intent/tweet?text=I%20just%20used%20%40coderabbitai%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20the%20proprietary%20code.%20Check%20it%20out%3A&url=https%3A//coderabbit.ai)\n- [Mastodon](https://mastodon.social/share?text=I%20just%20used%20%40coderabbitai%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20the%20proprietary%20code.%20Check%20it%20out%3A%20https%3A%2F%2Fcoderabbit.ai)\n- [Reddit](https://www.reddit.com/submit?title=Great%20tool%20for%20code%20review%20-%20CodeRabbit&text=I%20just%20used%20CodeRabbit%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20proprietary%20code.%20Check%20it%20out%3A%20https%3A//coderabbit.ai)\n- [LinkedIn](https://www.linkedin.com/sharing/share-offsite/?url=https%3A%2F%2Fcoderabbit.ai&mini=true&title=Great%20tool%20for%20code%20review%20-%20CodeRabbit&summary=I%20just%20used%20CodeRabbit%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20proprietary%20code)\n\n</details>\n\n<sub>Comment `@coderabbitai help` to get the list of available commands and usage tips.</sub>\n\n<!-- tips_end -->\n\n<!-- internal state start -->\n\n\n<!-- DwQgtGAEAqAWCWBnSTIEMB26CuAXA9mAOYCmGJATmriQCaQDG+Ats2bgFyQAOFk+AIwBWJBrngA3EsgEBPRvlqU0AgfFwA6NPEgQAfACgjoCEYDEZyAAUASpETZWaCrKPR1AGxJcAkhnXwaB4ozNxebBi41PD4WABm+HzwGAD6aMwAjtw8HthEyZCQBgByjgKUXACMlQAsAEwA7IUGAKo2ADJcsLi43IgcAPQD+biw2AIaTMwDcbnsM3ORYGq4A9zYHh4D1fVNRS2IFZDM1ADuJPJFAMr42BQMJJACVBgMsFzJaZnZgEmEMM6kXBPF5vLgnRA0PjXKK4bD9fjcMjNGwkCTwEjnCjwuiIeCIigAGkg/joAiCiAA1vAidxqBR4AwKWAISRuLAyIgiQ8PEESD1kpU6gAOABszXaKhIHnhtHwDEQYAoJAy2HgSvoRQAwkrqHR0JxIHUAAx1EVgE3mmrQQUcACsIo4RpqAC0jAARaQMenccSxDgGKAAQVotGQmBQ/nEQRCYRIERhMSw+Di6CwgYAsgBFKwR9ZAsJ5AoACk+6SyAEpIAk+AAxRZAgBC6g0AesuXyWFOaGQNBZ9FO6lgkBsKhWWcgNQ0dRbUAAogAPdKxhQYOLwIh3aKxHgUfBopS0LgAA1m2HYy3Uuh0ZeyYG4KpIZ4AvLMJqXvrp8PZcLK8EeZ5AmqxLg9ICHgiSMOyjIeEgQLqHG/QGIUc6LqEXgrmuG5UL6GBcLwe7wAeLbIZAHpgUQkAePgFG3L0eBVruzCQL24gYEQXDFAMgbEbokAAGpBEQ9IYPQtF5vYsD4AObHEl+XhoBS8KcdxSG8W6cqOOwW5YEqKpqnQXDYK8UEUnQAHtAyHLeAoSj2OMzDqDQ9BGUofCBrSbyPNORo8JQO74CIYiQLpqpKvGiAtgYFiQAA8sIojiFIyAOE4LhuJ41l+AE0bwGhcZaThVYQe+WQ5IWWBFKUzDlBQVS1I0zRtJ0kDdL0/RDCMYwTFMCxnpEvXnisawbFsOwNfshy1ccZwXM0Nx3A8wKYKCEZfKVfzQACfJLcZYLdpCc0wnCXD4IiFUGCiaIYpQ2KhnilBEiStBktKVI0nSDJMiybIclyUq8vyGCCqK4qStKXCyvKirKqFepajqTn6lwxqmuadSWtadR2g6TqugYHqIF6eI4f6QYhmGWDJNlwS5bG8bafwKbhhm2a5vRBYdpAJapDelbVpAdZ9Y2zatlY7YFCxeoDqMw6juo46TtOrYLku6FMKu66boV+H7gZkAnvWF5AmA14fneD7Pq+GglbeX4Qr+uD/q2QGRKB4F8J50GwSgNDMIhJEq3lGGa9hiZ4buuu0DxUBkXklHUfweDiXEjHMdIrHsZAynR/xgnCaJSf0YgknSRRGBySQClKVxOfqQwmmRAzIX6YekBGZ7plR62FkPBghxcEwtkOAIDk9HqLl+e5aCeYaGg+fi/mBUCzdhewEVGPoxjgFAZCiUz4HEGQyiI1M8bh/w8ViJI0hPPIg/KKozbaLoYCGCYUBwKgqDhmgB+kOQ2E9Sn3YFwKgpw7KpXkHIGyD8VhaB0JvLepgDCanTApEgFkIQRVwPOTgBgABEhCoqWEDD4Q+ADdT0BSicFwjNIKYFIIgDekAWY5l4JQGGSB4LICILqXwfdKCI1wKcL8rDoY8kRuw5uuJeyQVEIpCMgE0GmUwbgbBuCc7BgPFWZItAUgeQpGgUgRYqCP1wJkMADByyqSgD4FMNZ2gNhSD4YoKRWEoGQDFYoqZ6CmJWBYhgHjZJAgSC5IktAkAqHQg4pxLi3FZhzGiNAAtHEpBilYaAPgvFFhic41x7iYo1hrJWaWQ5klXGgIGaALQrjHGkIgIxJBWzQBESgJQkQGTRjQCGAIsQwzcERM4RRhxaSAJgcFEg+Q+lEgEPgGW+JqwORkqMR4DS2CQF4RneO+RAn83cQ4AZiRNCi3GDBBgAwSDzm4EcvUgYrA+AmScZIyB26wAYXQAA3LJYkGJICXOuRQRGiBZDVXwNKfgSRIiUDiNPaQkVzAkI8JCbSPYvwrMgEoBgPJQ59Lof8m5ok+DrAEGcv5HTxDSGYcUWITSDAWXIMgN47zW4AGohQDHNAQohAZkEc2eQMVB6DVHqLwYQ/BxCWFkP/sfPU1DnDyGTPQtilLWxaL1OQcBNsyqcyVPkCEOKsDQOeYI5IFEbCzgAOI+AqbOGweSUhWHaC0K1xQiz4JtvgysBQ+V9wFcojBsERUaBgAgZAsob7lyBMwRQ8A4jyEwPIS5sFTXbIZBCv5FBdx8DeSJGCbEvmxA8Imq5mBQzMXZKtH1yBdWwVoaU8tjwNVkpArIeFErAxIuPomVFDaMWiGxSivFVyCXpuJaS9gAQVW2IwBIOUDNukHmPOaq1Nq7VxMdc6lxbqPXliPIotc6EjxVr9UKwNGgcGOyMPSm+TLlWssqByo0RhZwQlypQ8ZSorrgJIHEasBp0x0HgI4Ll4qeVgCMMem2J6VFnovf6MV7apVH0AVQxwNCFUplvYwowWiwzHCpicYIgrTLBywgzfm6KtU+tJqRftzgb7JMQLce4jwYIQn1oge4e71ZRCpjJI8NtJhHlbCiPVkIewVp9ZAet6KRxmPHFMa55BIiQCSfrXJG6XXczWtkfBAASAA3pxhgABffBwU5bmIyBwDgfj1CZF3QilhnaDU9vRZigdOFkCKvxYCvUEEx1ponRSphrZqV/OHX52gw0SVpo884Qd0aIlrjMk5tB/g4jpwFvAdCgYMBBFkAAL0oFe5IN6c2kFZTUR9pWGVKsq1wFl1XOUIbAxB8WvqoOCYYPBoh0VSHkJlahyBdCsMqp8HlcK6AqwjRYQktm+YOtFVrPWSATY4JYEgzzb4AxuvBtw5RWNohZBYseCcfLpB4xcliOQK+24c20DzRRbmWLsARJklcK47QBjQE1Ik5wgRIiIHLESNgiAGmkDJVIKiiJ/IPHB6aokp0cLRlGZNCMNAhLaSJKWv5UgVNxiQLibcan0WC3YGty8ZB74UGDX4V7Shq2CDhECSgWaJkAozkSJU6tbuFVx0qFt9g4yYHEPKHHIk+0RIYO+pgmwEqJGQPzcuFBCMIi7duAXfJaFUR2cGoCSpGBoFpGoGCIX/TOxuwlRMPilXkGCDBTLDATteC4OnKJSAhzuN59b2IRJdfJCR2dO3UoiQQmcECdWKUU0pxYNN9WmE7h6gtiQIk9bTuYGwNkbcbPFcAVnDOqUp1Hj4Xh7iNiJ0sBKBglIFwQeUebHkGjx46KwcQ8ePW5JCeQ56mbxQL5YEcv0FRBTnniQy3JAZym8QYOoihG57RU1KRTKyGu1CyIKRcCyERGvmgG/qcxrYtdzN/0cIpEItz1kRbN/4Al/Qdx7Juk3S+YT/MzhDj0HTIgIgVhp4UieDGjfKcOyFgMkn3kEt3lhHQEjishQAOIcH8qPBZuArMrQK2q2ABlELQNQMkoTgjn6C1FKAvLSKMD2tQFEG8FDkXrDmwFgTgVzLMmCpXH3ESMkJjjdESLMPgNQJyJAC0NADWOynIL2ESDPu7qEMDsxGihWiPgTq8IoJQPnvIe9hRLjngcTrhCuPqtgGIESAfkoHfjwKcp7njmvF8g9uhPoSmrnliLfNsh2KoZLkqIcIDBROHjQABL/liH5GwZMgaidD6ImKju/n5HCI0stgAWgXDvUqal8sAUiGQFEtAW3IcBJiXiEXwAQP5LQDoY8F/j/n/onHRMclACiD7mIAMILjrtRAyAPLEInqYuhFUfIDBKPBTPQL4RQBIOSF8s0QoPLmIBBITOyDkU9l8rSHCCQJUdIJpMLudmLj2OiPQFkVJo7sdqdgBEBAMQQLYbjmsc7qdntGwdoH3NNk8ptmCk9v0V4IMXwLjoxqyAljQBMkLnLjcTsV8gcpzsgBMeXkQNMdHjJNuFJr8VMc4ZpF8lTFGDBMVu0YwPJKcVniuDQLgnocoQ9DwBkYYW8QrliJsfUSHNpBDD+mVsgG0D4ESCnu9N4YSLbmPhgHzsLq4YgOMSEWSTYA8hBEoDChsGol8t0TBNgbIivMnmeGeLblCYEDCTfOAbGqRknl3FAFYMYYEncj4G7pFmotNk2skHmPoktvFgaqtDePqeVNJoOIdk7i7o8DLpsGSIyHCV3gSRRCcNwABFlNCfAIVvOpLhnhgFnlwEEFRDLrIrjpKUEF6TesBJcrgGiTToYUqBIrKiBDobCM4eMdCokH7PCcwUiTnpmorumkmrGRGFPjJGwNGrQrjuUYVDQM4LKKcBgOZNRA4XUX3LBBTjYQMF2BQBgLtquHJNRErhBDWYmGvilIEX7piViIjmSoPLOVrqOTnvjmopFP1i5oOlke5nRq5kOpzv5kSiqc2pOqFlAJgZJPQAunrEeNoUFK+OfhgHqVJjbKaR2HugUAeo8Eeh1ogP2Tpn+TeEJkYCRB6LxuDKpCRMUOkNZC+T6hBbxNANvtZLeSEh4AIA+U+R1vBVAO0HOiTNqvyl1ttlkJMNhZAFcOuPlmmY8EWIFqqfcstswOWP6IUKxbxFHpHm8hQAAFTEjQWQBPiQD4I3jipsUkQcX0I8V9qEzeiFSCX4LuK6nswdaiViVQAMDoVQkCXGnfAPnqDwXiXoXsIpAUBGTaXFAtDtDtAGXsXoU4lBSCUvn2W4A2XqXoWnjFwpBgQpiCUWVWWuWkYpCunaWQFBXG4BUaX6K/wIGOXEXcBRWTERXoXgnrKxU6YmUzFsBJUpDFnaUvnFlpaYBHbsY1g5aPB5YFbFYUAlBeKzhOYegMBcKxBGCFUZZZafl9qQhLLaS1blbMqNYigADMDQNWrWEA4GBgW2/5gm7wIGiGg2KGEC6Go2FWKqB2ySTadFhBT+kKkIMKi0FGFa7i5OKm62C2BFuEzCHoa4dWKFVYaF6VS5nYFpKWHgoYgZulT1KQ7hJAm+K4GAXyJpTGjIfIf13FwNFIgNn1Oa9uf1t6kJKmbx5+iptGN1N8d195N45pMsr171916FiAoYf1dw8AXy95hNiAf1KeKQ+WbAZND1FNf1feNN0FnxKZQUJpX1yQCQ/1D5CQCNy8ogVuYgGVLaKQrR6gLJGOEyNZotLgD5kIApbNpld5D1uuOVK5OV6JfA+hlAytqZ+NUVNJkA3Ffe+tqtmlj5eAD54erwjw3FzyAtLx8tSNhETtzR5tqF6FnNwthUvOkU9OuQjONGgdb2jwG49ZXAuScSrCKQAAEq2B6GdO0o1dIKApZgEgMIBbAJwYbENPeUpZoDnfdULEbAsOherbIbgFrTThoLAEYOFnEO3F5umuUG8miFySSZGN2ugIbleV3O2huS3VuRWoaYOj5pqQeUYbFoEsFuiKeTAEhegCGNeRjQ9T7Qyb7hgO+fEGVfrFNTeABd8HXcJiRKVVKHjUeBvXzomN9TCL9ZHjdh8keESFfbpRDaDUCODXKJDS/frJzTDVKHDTms/a/Wwdccjc/c0kvf3ceGvd7d8DvTooegfTtjNafbxOfW9fCAbATUTUCCTaA/rOTfg5ANTbTSQEQ7g99aQ8zRQ1Q3ddfVvXzV+Lziw1Q+A/SXznLbIOLblJLRwyplwwlDwwrZQErX/TeezV7eLdRBrewDXQoTrdrQw9I4beAabRkaoyrTI4XTbVEHbSbY7ZI5w9rrw67bQII4LS2towbZjZ9b7TbrzlAxKulsVUCKVblvlkWlVQ3bVfVaIE1VgDYBsCqsUP4xKi0NwEKaxiPsEFEEQL1YyqtayraINaNdyogu/GSnvDgAQAtbLiwGfMgUtfKnYTTpZvAi/G/NvAoKwOoMjZTZ+uiOcHouHoCtU0gjvN0kaPaJXCKAwLUEKEaJUA0LaLaGgGgDCiKKoA0MaAwAwCKLaLQDUCKA0HEAAJzVCdPZODVoC2gCA1DVAbOZaVCTNxDGiVBbO0BChxCDUjPDUqALMCBGi0AihGgkB1BoCDU7O1O0CDW0AbODUbMkAigkBCh0BjOmi2gy41CqBoA1ANAbNgt1A/oNBCgbMfPgs1C2i/MQDoAHPVBCi2g1DYGVwnODU1A1DfMMATNotoBCi0sDMMA1AkCDV1CDOIudMGDZOnwNOERNOogtN0Ba30D6BAA=== -->\n\n<!-- internal state end -->"},"request":{"retryCount":3,"retries":3,"retryAfter":16}},"response":{"url":"https://api.github.com/repos/fluent/fluent-bit/issues/comments/3847295303","status":500,"headers":{"access-control-allow-origin":"*","access-control-expose-headers":"ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, X-GitHub-SSO, X-GitHub-Request-Id, Deprecation, Sunset","content-length":"0","content-security-policy":"default-src 'none'","content-type":"application/json; charset=utf-8","date":"Mon, 09 Feb 2026 16:12:32 GMT","referrer-policy":"origin-when-cross-origin, strict-origin-when-cross-origin","server":"github.com","strict-transport-security":"max-age=31536000; includeSubdomains; preload","vary":"Accept-Encoding, Accept, X-Requested-With","x-accepted-github-permissions":"issues=write; pull_requests=write","x-content-type-options":"nosniff","x-frame-options":"deny","x-github-api-version-selected":"2022-11-28","x-github-media-type":"github.v3; format=json","x-github-request-id":"404F:68365:10B97A:47D026:698A076F","x-ratelimit-limit":"15000","x-ratelimit-remaining":"14927","x-ratelimit-reset":"1770656672","x-ratelimit-resource":"core","x-ratelimit-used":"73","x-xss-protection":"0"},"data":""}}

@coderabbitai
Copy link

coderabbitai bot commented Feb 9, 2026

✅ Actions performed

Reviews resumed.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@plugins/in_amqp/in_amqp.c`:
- Around line 451-459: in in_amqp_config_destroy, guard the call to
flb_input_collector_delete by checking ctx->retry_coll_id is valid (not -1)
before calling it; update the function handling for struct flb_amqp (used by
in_amqp_init and cleanup paths) to only call
flb_input_collector_delete(ctx->retry_coll_id, ctx->ins) when ctx->retry_coll_id
has been initialized to a non -1 value, leaving other cleanup
(flb_log_event_encoder_destroy, in_amqp_connection_destroy, flb_free) unchanged.
- Around line 327-339: in_amqp_connection_destroy currently calls
flb_input_collector_delete unconditionally which can be invoked with c->coll_id
== -1; update the function to check c->coll_id != -1 before calling
flb_input_collector_delete and only reset c->coll_id to -1 after successful
deletion; locate the logic in in_amqp_connection_destroy that references struct
flb_amqp_connection *c = &ctx->conn, the c->conn check, and the
flb_input_collector_delete(c->coll_id, ctx->ins) call to add the guard.

@matwey
Copy link
Author

matwey commented Feb 16, 2026

@coderabbitai resume

@coderabbitai
Copy link

coderabbitai bot commented Feb 16, 2026

✅ Actions performed

Reviews resumed.

@matwey
Copy link
Author

matwey commented Feb 16, 2026

@codex review

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@plugins/in_amqp/in_amqp.c`:
- Line 301: Fix the three English typos in the in_amqp.c messages: change the
comment/string "All messages has been processed" to "All messages have been
processed", change "An error occured during waiting frame" to "An error occurred
during waiting frame", and change "An error occured during consuming message" to
"An error occurred during consuming message" (update the literal
strings/comments where they appear in in_amqp.c so both logs and comments
reflect the corrected spelling).
- Around line 288-342: The loop in in_amqp_collect can spin unbounded if
in_amqp_consumer_start succeeds but the broker immediately drops the new
connection; to fix, after detecting a successful in_amqp_consumer_start (the
call to in_amqp_consumer_start(ctx, config) following
in_amqp_connection_destroy), break out of the for(;;) so the socket collector
can re-schedule collection instead of retrying inline, or alternatively
implement a local reconnect limit/backoff counter in in_amqp_collect that
increments on consecutive immediate failures and returns an error or sleeps when
the limit is reached; update references to c->conn and ctx handling so you don't
continue consuming on a dropped connection.
🧹 Nitpick comments (2)
plugins/in_amqp/in_amqp.c (2)

213-227: Missing rollback_map on metadata header encoding failure.

If an entry fails during the loop (lines 220-222) or begin_map succeeds but commit_map is never reached, the map is left uncommitted. While flb_log_event_encoder_reset at line 263 will clean up the whole record, adding a rollback on error is the correct pattern used elsewhere (e.g., flb_opentelemetry_logs.c, kubernetes.c) and prevents potential encoder state inconsistencies.

Proposed fix
     if (ret ==  FLB_EVENT_ENCODER_SUCCESS && (properties->_flags & AMQP_BASIC_HEADERS_FLAG)) {
         ret = flb_log_event_encoder_append_metadata_cstring(&ctx->encoder, "headers");

         if (ret == FLB_EVENT_ENCODER_SUCCESS) {
             ret = flb_log_event_encoder_metadata_begin_map((&ctx->encoder));
         }

         for (i = 0; i < headers->num_entries && ret == FLB_EVENT_ENCODER_SUCCESS; ++i) {
             ret = in_amqp_append_metadata_entry(&ctx->encoder, &headers->entries[i]);
         }

         if (ret == FLB_EVENT_ENCODER_SUCCESS) {
             ret = flb_log_event_encoder_metadata_commit_map((&ctx->encoder));
+        } else {
+            flb_log_event_encoder_metadata_rollback_map((&ctx->encoder));
         }
     }

608-617: Pause/resume only handles one collector — verify intent under edge cases.

in_amqp_pause pauses either the socket collector (if connected) or the retry collector (if not). If the framework calls pause during a transient state where the connection just dropped but the retry collector hasn't been resumed yet, neither collector gets paused. This window is narrow but exists between in_amqp_connection_destroy (which deletes the socket collector) and the subsequent flb_input_collector_resume(ctx->retry_coll_id, ...).

Consider pausing both collectors unconditionally (guarded by validity checks) for robustness:

Proposed fix
 static void in_amqp_pause(void *data, struct flb_config *config)
 {
     struct flb_amqp *ctx = data;

-    if (ctx->conn.conn) {
-        flb_input_collector_pause(ctx->conn.coll_id, ctx->ins);
-    } else {
+    if (ctx->conn.coll_id >= 0) {
+        flb_input_collector_pause(ctx->conn.coll_id, ctx->ins);
+    }
+    if (ctx->retry_coll_id >= 0) {
         flb_input_collector_pause(ctx->retry_coll_id, ctx->ins);
     }
 }

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b2377614c1

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

@matwey
Copy link
Author

matwey commented Feb 16, 2026

@coderabbitai resume
@codex review

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@plugins/in_amqp/in_amqp.c`:
- Around line 167-176: The parser error path can leak memory because
flb_parser_do may allocate out_buf before returning an error; in the block that
checks ctx->parser and calls flb_parser_do (the one that currently logs then
returns ret), ensure any partially allocated out_buf is freed before returning:
initialize out_buf to NULL before the call, and on ret < 0 call
flb_free(out_buf) (or equivalent) before returning the error. Update the error
path in the function that contains ctx->parser/flb_parser_do so out_buf is
cleaned up and out_size/out_time handling remains consistent.

@coderabbitai
Copy link

coderabbitai bot commented Feb 16, 2026

@matwey: I will resume the automatic reviews for this PR.

✅ Actions performed

Reviews resumed.

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 83ae35cb81

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +333 to +335
in_amqp_log_reply_error(in, reply, "An error occurred during consuming message");

in_amqp_connection_destroy(ctx);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Skip reconnect for non-close AMQP unexpected-state frames

After handling AMQP_STATUS_UNEXPECTED_STATE, this path always logs an error and destroys/recreates the connection. That means non-delivery frames that are not connection/channel-close (e.g., normal control/heartbeat traffic) still trigger a full reconnect cycle, which can cause repeated disconnect/reconnect churn and dropped throughput on otherwise healthy brokers. The branch should continue consuming for benign frames and only reconnect on definitive close/error conditions.

Useful? React with 👍 / 👎.


queue_bytes.len = flb_sds_len(ctx->queue_name);
queue_bytes.bytes = ctx->queue_name;
if (amqp_basic_consume(c->conn, c->chan, queue_bytes, amqp_empty_bytes, 0, 1, 1, amqp_empty_table) == NULL) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Disable forced exclusive mode in amqp_basic_consume

The consume call hardcodes exclusive=1, which asks RabbitMQ for an exclusive consumer every time. In queues that already have another consumer (including another Fluent Bit instance), the broker will reject the consume request and this plugin will loop through reconnect attempts instead of consuming. This should be non-exclusive by default (or configurable) to avoid breaking common shared-queue deployments.

Useful? React with 👍 / 👎.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes no sense. There can not be two consumers for this queue, because it will lead to data loss.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (3)
plugins/in_amqp/in_amqp.c (3)

538-543: Consider using flb_calloc for consistency with other input plugins.

The flb_malloc + memset approach is functionally correct, but flb_calloc(1, sizeof(struct flb_amqp)) is the prevailing idiom in Fluent Bit input plugins and achieves the same zero-initialization in one call. The explicit -1 assignments for retry_coll_id and conn.coll_id on lines 547 and 552 are still needed regardless.


608-628: Pause/resume should handle both collectors to avoid missed state transitions.

Currently, in_amqp_pause only pauses one collector based on whether a connection exists. If the plugin is paused while connected, only the socket collector is paused. If the connection drops while paused (e.g., TCP keepalive detects a dead connection — the socket collector is paused so in_amqp_collect won't run), and then in_amqp_resume is called, it would check ctx->conn.conn (still non-NULL since nothing cleaned it up), resume only the socket collector (now pointing at a dead FD), and the retry collector would never be resumed.

This is an edge case, but unconditionally pausing/resuming both collectors (guarding against invalid coll_id) would be more robust:

Proposed fix
 static void in_amqp_pause(void *data, struct flb_config *config)
 {
     struct flb_amqp *ctx = data;
 
-    if (ctx->conn.conn) {
+    if (ctx->conn.coll_id >= 0) {
         flb_input_collector_pause(ctx->conn.coll_id, ctx->ins);
-    } else {
-        flb_input_collector_pause(ctx->retry_coll_id, ctx->ins);
     }
+    flb_input_collector_pause(ctx->retry_coll_id, ctx->ins);
 }
 
 static void in_amqp_resume(void *data, struct flb_config *config)
 {
     struct flb_amqp *ctx = data;
 
-    if (ctx->conn.conn) {
+    if (ctx->conn.coll_id >= 0) {
         flb_input_collector_resume(ctx->conn.coll_id, ctx->ins);
-    } else {
+    }
+    if (!ctx->conn.conn) {
         flb_input_collector_resume(ctx->retry_coll_id, ctx->ins);
     }
 }

509-511: amqp_parse_url mutates the URI string in-place, creating a fragile pattern.

amqp_parse_url inserts NUL bytes into its input buffer and stores pointers into it within conn_info. Since ctx->uri is a framework-managed flb_sds_t from the config map, this mutates the config string in-place. While this works correctly at runtime (the SDS length header preserves proper deallocation), it means ctx->uri is no longer a valid C-string after parsing. Although no logging of ctx->uri currently occurs after the parse, this creates a fragile pattern where future debug/config logging could show truncated values. Consider making a mutable copy to avoid this brittleness:

Proposed refactor
     if (ctx->uri) {
-        ret = amqp_parse_url(ctx->uri, &ctx->conn_info);
+        ctx->uri_buf = flb_sds_create(ctx->uri);
+        if (!ctx->uri_buf) {
+            return -1;
+        }
+        ret = amqp_parse_url(ctx->uri_buf, &ctx->conn_info);
         if (ret != AMQP_STATUS_OK) {

(with corresponding flb_sds_destroy(ctx->uri_buf) in in_amqp_config_destroy)

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
plugins/in_amqp/in_amqp.c (2)

154-268: Solid envelope handling with proper _flags guards and encoder error chaining.

A few minor observations:

  • Line 213: extra whitespace in ret == FLB_EVENT_ENCODER_SUCCESS (double space before FLB_EVENT_ENCODER_SUCCESS).
  • If a header entry encoding fails mid-iteration (line 220-222), the map opened at line 217 is never explicitly rolled back via flb_log_event_encoder_metadata_rollback_map. The overall record won't be committed (guarded at line 250), and encoder_reset at line 263 will clean up, so this doesn't cause a data corruption bug. However, an explicit rollback on error would be more robust.

Neither is blocking — the encoder reset path handles cleanup correctly.

Minor: fix double space and add explicit map rollback
-    if (ret ==  FLB_EVENT_ENCODER_SUCCESS && (properties->_flags & AMQP_BASIC_HEADERS_FLAG)) {
+    if (ret == FLB_EVENT_ENCODER_SUCCESS && (properties->_flags & AMQP_BASIC_HEADERS_FLAG)) {

For robustness, consider adding an else branch after the commit_map:

         if (ret == FLB_EVENT_ENCODER_SUCCESS) {
             ret = flb_log_event_encoder_metadata_commit_map((&ctx->encoder));
+        } else {
+            flb_log_event_encoder_metadata_rollback_map((&ctx->encoder));
         }

657-665: Consider validating reconnect.retry_interval to prevent a rapid-fire timer.

A value of 0 for reconnect.retry_interval would create a timer with zero-second interval, potentially causing high CPU usage during reconnect cycles. A minimum (e.g., 1 second) would be a reasonable safeguard.

Suggested validation in in_amqp_configure
+    if (ctx->reconnect_retry_interval < 1) {
+        flb_plg_warn(in, "reconnect.retry_interval must be >= 1, defaulting to 1");
+        ctx->reconnect_retry_interval = 1;
+    }
+
     if (!ctx->queue_name) {

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
@matwey
Copy link
Author

matwey commented Feb 25, 2026

@coderabbitai resume
@codex review

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/in_amqp/in_amqp.c`:
- Around line 586-603: The code sets the input context with
flb_input_set_context(in, ctx) but on the two late-init failure paths (after
flb_input_set_collector_time returning <0 and after in_amqp_consumer_start
returning <0) it calls in_amqp_config_destroy(ctx) without clearing in->context,
leaving a dangling pointer; update both failure branches to call
flb_input_set_context(in, NULL) (or otherwise clear the input context) before
calling in_amqp_config_destroy(ctx) and returning -1 so the freed ctx is not
referenced by the input instance.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 187f4e9 and 8d485c3.

📒 Files selected for processing (6)
  • CMakeLists.txt
  • cmake/plugins_options.cmake
  • plugins/CMakeLists.txt
  • plugins/in_amqp/CMakeLists.txt
  • plugins/in_amqp/in_amqp.c
  • plugins/in_amqp/in_amqp.h
🚧 Files skipped from review as they are similar to previous changes (4)
  • plugins/CMakeLists.txt
  • plugins/in_amqp/in_amqp.h
  • CMakeLists.txt
  • plugins/in_amqp/CMakeLists.txt

Comment on lines +586 to +603
flb_input_set_context(in, ctx);

ctx->retry_coll_id = flb_input_set_collector_time(in, &in_amqp_reconnect, ctx->reconnect_retry_interval, 0, config);
if (ctx->retry_coll_id < 0) {
flb_plg_error(in, "Cannot create reconnection collector");
in_amqp_config_destroy(ctx);

return -1;
}

flb_input_collector_pause(ctx->retry_coll_id, in);

ret = in_amqp_consumer_start(ctx, config);
if (ret < 0) {
flb_plg_error(in, "Cannot start AMQP consumer");
in_amqp_config_destroy(ctx);

return -1;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Clear the input context before freeing ctx on late init failures.

After flb_input_set_context(in, ctx) at Line 586, failure paths at Lines 589-593 and 599-603 free ctx but leave in->context pointing to freed memory. Clear the context before destroy in those branches.

🔧 Suggested fix
     flb_input_set_context(in, ctx);

     ctx->retry_coll_id = flb_input_set_collector_time(in, &in_amqp_reconnect, ctx->reconnect_retry_interval, 0, config);
     if (ctx->retry_coll_id < 0) {
         flb_plg_error(in, "Cannot create reconnection collector");
+        flb_input_set_context(in, NULL);
         in_amqp_config_destroy(ctx);

         return -1;
     }
@@
     ret = in_amqp_consumer_start(ctx, config);
     if (ret < 0) {
         flb_plg_error(in, "Cannot start AMQP consumer");
+        flb_input_set_context(in, NULL);
         in_amqp_config_destroy(ctx);

         return -1;
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/in_amqp/in_amqp.c` around lines 586 - 603, The code sets the input
context with flb_input_set_context(in, ctx) but on the two late-init failure
paths (after flb_input_set_collector_time returning <0 and after
in_amqp_consumer_start returning <0) it calls in_amqp_config_destroy(ctx)
without clearing in->context, leaving a dangling pointer; update both failure
branches to call flb_input_set_context(in, NULL) (or otherwise clear the input
context) before calling in_amqp_config_destroy(ctx) and returning -1 so the
freed ctx is not referenced by the input instance.

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 8d485c33e5

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +315 to +317
} else if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_CHANNEL_CLOSE_METHOD: {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Ignore heartbeat frames on unexpected-state consume replies

When amqp_consume_message() returns AMQP_STATUS_UNEXPECTED_STATE, this code only treats method frames as recoverable and falls through to connection teardown for any non-method frame. In RabbitMQ deployments with negotiated heartbeats (commonly non-zero), heartbeat frames can hit this path and trigger needless disconnect/reconnect cycles, causing intermittent ingestion gaps and churn instead of continuing consumption.

Useful? React with 👍 / 👎.


queue_bytes.len = flb_sds_len(ctx->queue_name);
queue_bytes.bytes = ctx->queue_name;
if (amqp_basic_consume(c->conn, c->chan, queue_bytes, amqp_empty_bytes, 0, 1, 1, amqp_empty_table) == NULL) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid forcing exclusive consumers in basic.consume

The amqp_basic_consume call hard-codes exclusive=1, which makes the broker reject this consumer whenever the queue already has another active consumer. That turns normal shared-queue or failover topologies into repeated startup/reconnect failures; this should default to non-exclusive (or be configurable) so the plugin can attach to queues used by multiple consumers.

Useful? React with 👍 / 👎.

@coderabbitai
Copy link

coderabbitai bot commented Feb 25, 2026

@matwey: I will resume the automatic reviews for this PR.

✅ Actions performed

Reviews resumed.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
plugins/in_amqp/in_amqp.c (1)

471-479: ⚠️ Potential issue | 🟠 Major

Clear input context during teardown to avoid dangling pointer after late init failures.

After flb_input_set_context(in, ctx) on Line 586, failures on Line 589 and Line 599 call in_amqp_config_destroy(ctx). ctx is freed, but in->context is not cleared, leaving stale state and possible double cleanup paths.

🔧 Proposed fix
 static int in_amqp_config_destroy(struct flb_amqp *ctx)
 {
+    if (ctx->ins) {
+        flb_input_set_context(ctx->ins, NULL);
+    }
+
     flb_log_event_encoder_destroy(&ctx->encoder);
     if (ctx->retry_coll_id >= 0) {
         flb_input_collector_delete(ctx->retry_coll_id, ctx->ins);
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/in_amqp/in_amqp.c` around lines 471 - 479, in_amqp_config_destroy
currently frees ctx but does not clear the input instance's context pointer,
leaving in->context dangling after failures that call this teardown; update the
teardown so that after flb_input_set_context(in, ctx) failures and inside
in_amqp_config_destroy you explicitly clear the input context (set in->context =
NULL) before or immediately after freeing ctx, or change in_amqp_config_destroy
signature to accept struct flb_input_instance *in and null out in->context
inside the function (affecting functions: in_amqp_config_destroy, the code paths
that call flb_input_set_context and call in_amqp_config_destroy).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/in_amqp/in_amqp.c`:
- Around line 650-675: The new AMQP input options declared in config_map
(fields: uri, queue, parser, reconnect.retry_limits, reconnect.retry_interval
tied to struct flb_amqp) lack user-facing documentation; add a concise docs
section describing each option's purpose and type, state the defaults ("amqp://"
for uri, no default for queue and parser, reconnect.retry_limits=5,
reconnect.retry_interval=60), show a minimal working example config snippet
using these keys, and update the plugin's README/docs index and changelog so the
PR no longer requires a docs follow-up.

---

Duplicate comments:
In `@plugins/in_amqp/in_amqp.c`:
- Around line 471-479: in_amqp_config_destroy currently frees ctx but does not
clear the input instance's context pointer, leaving in->context dangling after
failures that call this teardown; update the teardown so that after
flb_input_set_context(in, ctx) failures and inside in_amqp_config_destroy you
explicitly clear the input context (set in->context = NULL) before or
immediately after freeing ctx, or change in_amqp_config_destroy signature to
accept struct flb_input_instance *in and null out in->context inside the
function (affecting functions: in_amqp_config_destroy, the code paths that call
flb_input_set_context and call in_amqp_config_destroy).

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 187f4e9 and 8d485c3.

📒 Files selected for processing (6)
  • CMakeLists.txt
  • cmake/plugins_options.cmake
  • plugins/CMakeLists.txt
  • plugins/in_amqp/CMakeLists.txt
  • plugins/in_amqp/in_amqp.c
  • plugins/in_amqp/in_amqp.h
🚧 Files skipped from review as they are similar to previous changes (2)
  • CMakeLists.txt
  • plugins/in_amqp/CMakeLists.txt

Comment on lines +650 to +675
static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "uri", "amqp://",
0, FLB_TRUE, offsetof(struct flb_amqp, uri),
"Specify an AMQP URI to connect the broker"
},
{
FLB_CONFIG_MAP_STR, "queue", NULL,
0, FLB_TRUE, offsetof(struct flb_amqp, queue_name),
"Specify an AMQP queue name to consume from"
},
{
FLB_CONFIG_MAP_STR, "parser", NULL,
0, FLB_TRUE, offsetof(struct flb_amqp, parser_name),
"Set a parser"
},
{
FLB_CONFIG_MAP_INT, "reconnect.retry_limits", "5",
0, FLB_TRUE, offsetof(struct flb_amqp, reconnect_retry_limits),
"Maximum number to retry to connect the broker"
},
{
FLB_CONFIG_MAP_INT, "reconnect.retry_interval", "60",
0, FLB_TRUE, offsetof(struct flb_amqp, reconnect_retry_interval),
"Retry interval to connect the broker"
},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add user-facing documentation for the new AMQP input options before merge.

Lines 652-675 introduce externally configurable AMQP settings (uri, queue, parser, reconnect.*). The PR is marked docs-required and this surface should be documented with option semantics/defaults and a minimal example.

I can draft a concise docs section for these options if you want me to open a follow-up issue template.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/in_amqp/in_amqp.c` around lines 650 - 675, The new AMQP input options
declared in config_map (fields: uri, queue, parser, reconnect.retry_limits,
reconnect.retry_interval tied to struct flb_amqp) lack user-facing
documentation; add a concise docs section describing each option's purpose and
type, state the defaults ("amqp://" for uri, no default for queue and parser,
reconnect.retry_limits=5, reconnect.retry_interval=60), show a minimal working
example config snippet using these keys, and update the plugin's README/docs
index and changelog so the PR no longer requires a docs follow-up.


# Inputs (sources, data collectors)
# =================================
DEFINE_OPTION(FLB_IN_AMQP "Enable AMQP input plugin" ON)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this compile for all supported targets, including Windows and macOS? If not you need to provide the cmake overrides for those targets appropriately - there are macOS and Windows ones plus you can do the usual CMake config approach to enable/disable as required.

We provide a linked build script in the PR template to help test Linux targets.

There are no changes to any of the builds so no new dependencies are required?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tested this only on Linux since I don't have macOS and Windows. From what I see, this PR should work on all platforms.

There is single new dependency for this plugin:
https://github.com/matwey/fluent-bit/blob/8d485c33e5eeab4b42489f56caeafb58bce3b166/CMakeLists.txt#L1337

rabbitmq-c is cross-platform library. Existing vcpkg port proves that it can be built for all platforms.

Copy link
Contributor

@patrick-stephens patrick-stephens Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to include the dependency for all builds as part of this change then , it might be easier to disable as well for macos/windows for now - there are config files in cmake/ for this.

Linux builds are done using the containers under the packaging/ directory so we can add the dependencies there if required, or include a vendored version of it if we are building from source under lib/.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

docs-required ok-package-test Run PR packaging tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants