
Conversation

@mithunbharadwaj (Contributor) commented Jan 8, 2026

Summary by CodeRabbit

  • Bug Fixes
    • Improved Kafka connection reliability with enhanced retry logic that rotates across brokers on each reconnection attempt.
    • Added validation to skip initialization when required Kafka configuration is missing.
    • Enhanced error handling and logging for broker unavailability and connection errors.
    • Ensured stale consumers are replaced so fresh consumer instances are used during retries.


@mithunbharadwaj self-assigned this Jan 8, 2026
@coderabbitai bot commented Jan 8, 2026

📝 Walkthrough

Refactors Kafka handler to store KafkaTopic, KafkaGroupId, and KafkaBootstrapServers; adds _build_consumer(bootstrap_servers); updates initialize() to validate config, rotate bootstrap servers on retries, stop/rebuild/start a fresh consumer each attempt, and improve Kafka connection error handling.

Changes

Cohort / File(s): Kafka configuration & consumer lifecycle (asabiris/handlers/kafkahandler.py)
Summary: Added persistent fields: KafkaTopic, KafkaGroupId, KafkaBootstrapServers. Added _build_consumer(self, bootstrap_servers) to encapsulate AIOKafkaConsumer construction. initialize() now prevalidates required config, rotates the bootstrap server list on each retry, stops any existing consumer before rebuilding, starts a fresh consumer per attempt, and logs enhanced connection/retry details (a sketch of this flow follows below).
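
To make the described flow easier to follow, here is a minimal, self-contained sketch of the initialization pattern; it is based on the summary above and is not the PR's actual code. The field and method names (KafkaTopic, KafkaGroupId, KafkaBootstrapServers, _build_consumer) come from the walkthrough, while the KafkaHandlerSketch class, the retry parameters (max_retries, delay, max_delay), and the log messages are illustrative assumptions.

import asyncio
import logging

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaConnectionError, KafkaError

L = logging.getLogger(__name__)


class KafkaHandlerSketch:

    def __init__(self, topic, group_id, bootstrap_servers):
        # Persistent configuration, kept so retries can rebuild the consumer.
        self.KafkaTopic = topic
        self.KafkaGroupId = group_id
        self.KafkaBootstrapServers = bootstrap_servers  # e.g. ["kafka-1:9092", "kafka-2:9092"]
        self.Consumer = None

    def _build_consumer(self, bootstrap_servers):
        # Encapsulates consumer construction; no `loop` argument (see the review comments below).
        self.Consumer = AIOKafkaConsumer(
            self.KafkaTopic,
            group_id=self.KafkaGroupId,
            bootstrap_servers=bootstrap_servers,
            retry_backoff_ms=10000,
            auto_offset_reset="earliest",
        )

    async def initialize(self, max_retries=5, delay=5, max_delay=300):
        # Preflight validation: skip Kafka entirely when required config is missing.
        if not (self.KafkaTopic and self.KafkaGroupId and self.KafkaBootstrapServers):
            L.warning("Kafka configuration is incomplete; skipping initialization.")
            return

        for attempt in range(max_retries):
            # Rotate which broker is tried first on this attempt.
            start = attempt % len(self.KafkaBootstrapServers)
            servers = self.KafkaBootstrapServers[start:] + self.KafkaBootstrapServers[:start]

            # Stop any stale consumer before rebuilding a fresh one.
            try:
                if self.Consumer is not None:
                    await self.Consumer.stop()
            except Exception as e:
                L.debug("Ignoring error while stopping stale consumer: {}".format(e))

            self._build_consumer(servers)
            try:
                await self.Consumer.start()
                L.info("Kafka consumer connected via {}".format(servers[0]))
                break  # success: the for-else branch below is skipped
            except (KafkaConnectionError, KafkaError) as e:
                L.warning("Kafka connection failed ({}); retrying in {} seconds.".format(e, delay))
                await asyncio.sleep(delay)
                delay = min(delay * 2, max_delay)  # exponential backoff, capped
        else:
            L.error("Could not connect to Kafka after {} attempts.".format(max_retries))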

Sequence Diagram(s)

sequenceDiagram
  participant Handler
  participant Config
  participant Consumer
  participant KafkaBroker as Broker

  Config->>Handler: provide KafkaTopic, KafkaGroupId, KafkaBootstrapServers
  Handler->>Handler: preflight validate config
  alt config missing
    Handler-->>Handler: skip Kafka initialization
  else config present
    Handler->>Handler: pick bootstrap server(s) (rotated)
    Handler->>Consumer: _build_consumer(bootstrap_servers)
    Consumer->>Broker: connect
    alt connection success
      Consumer-->>Handler: started
    else connection failure
      Broker-->>Handler: connection error
      Handler->>Handler: stop consumer (if any) and rotate servers
      Handler->>Handler: wait retry interval then retry
    end
  end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐇 I nudge the bootstrap, hop and retry,
I bundle topic, group, and brokers nigh.
I stop, rebuild, then start anew,
Rotating seeds of brokers true.
A rabbit's cheer for Kafka's cue! 🥕

🚥 Pre-merge checks | ✅ 2 passed | ❌ 1 failed

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 0.00%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)
  • Description Check ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed: The PR title accurately reflects the main change: implementing retry logic with alternate bootstrap server rotation for Kafka consumer initialization.

@coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @asabiris/handlers/kafkahandler.py:
- Around line 68-76: Remove the deprecated loop argument from aiokafka usage: in
_build_consumer remove the loop=self.App.Loop parameter passed to
AIOKafkaConsumer, and likewise remove loop=self.App.Loop from any
asyncio.ensure_future(...) invocation that schedules the consumer (the
ensure_future call used to start the consumer). Let aiokafka/asyncio use the
running event loop automatically so the code is compatible with aiokafka
>=0.8.0.
🧹 Nitpick comments (1)
asabiris/handlers/kafkahandler.py (1)

95-99: Consider logging the exception instead of silently swallowing it.

While ignoring errors when stopping a potentially broken consumer is reasonable, logging at debug level would aid troubleshooting without cluttering normal operation.

♻️ Suggested improvement
 			try:
 				if self.Consumer is not None:
 					await self.Consumer.stop()
-			except Exception:
-				pass
+			except Exception as e:
+				L.debug("Ignoring error while stopping stale consumer: {}".format(e))
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0eac68e and d2f8562.

📒 Files selected for processing (1)
  • asabiris/handlers/kafkahandler.py
🧰 Additional context used
🧬 Code graph analysis (1)
asabiris/handlers/kafkahandler.py (2)
asabiris/output/slack/service.py (1)
  • check_config (16-22)
asabiris/output/msteams/service.py (1)
  • check_config (11-17)
🪛 Ruff (0.14.10)
asabiris/handlers/kafkahandler.py

64-64: Do not catch blind exception: Exception

(BLE001)


78-78: Unused method argument: app

(ARG002)


98-99: try-except-pass detected, consider logging the exception

(S110)


98-98: Do not catch blind exception: Exception

(BLE001)

🔇 Additional comments (4)
asabiris/handlers/kafkahandler.py (4)

46-49: LGTM!

The persistent configuration fields are properly initialized with appropriate default values (None for optional strings, empty list for servers).


51-66: LGTM!

The configuration loading logic properly validates required parameters and gracefully handles initialization failures. The broad exception catch is acceptable here as it's guarding the constructor and ensures the service can be instantiated even if Kafka configuration is problematic.


88-91: LGTM!

The bootstrap server rotation logic is well-implemented. It cycles through servers on each retry attempt, ensuring connection attempts are distributed across available brokers.


107-117: LGTM!

The error handling correctly catches both KafkaConnectionError and NoBrokersAvailable, provides informative logging, and implements proper exponential backoff. The for-else pattern correctly handles the case when all retry attempts are exhausted.
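
For readers less familiar with the for-else idiom mentioned here: the else clause runs only when the loop finishes without hitting break, which makes it a natural place for "all retry attempts exhausted" handling. A tiny standalone illustration, not the PR's code (connect is a placeholder coroutine supplied by the caller):

import asyncio

async def connect_with_retries(connect, max_retries=3, delay=1):
    for attempt in range(max_retries):
        try:
            await connect()
            break  # success: the else branch below is skipped
        except ConnectionError:
            await asyncio.sleep(delay)
            delay *= 2
    else:
        # Reached only when the loop never hit `break`, i.e. every attempt failed.
        raise RuntimeError("all {} connection attempts failed".format(max_retries))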

@coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @asabiris/handlers/kafkahandler.py:
- Around line 68-76: The _build_consumer method passes a deprecated loop
parameter to AIOKafkaConsumer (loop=self.App.Loop); remove the loop argument so
AIOKafkaConsumer is constructed without passing self.App.Loop and relies on the
running event loop, updating the AIOKafkaConsumer call in _build_consumer
accordingly.
🧹 Nitpick comments (2)
asabiris/handlers/kafkahandler.py (2)

95-99: Consider logging the exception during consumer cleanup.

The silent try-except-pass swallows any error during Consumer.stop(). While this is intentional to ensure cleanup proceeds regardless of consumer state, logging at debug level would aid troubleshooting without disrupting the retry flow.

♻️ Suggested improvement
 			try:
 				if self.Consumer is not None:
 					await self.Consumer.stop()
-			except Exception:
-				pass
+			except Exception as e:
+				L.debug("Failed to stop existing consumer before rebuild: {}".format(e))

108-118: Consider handling broader exceptions during consumer start.

The error handling catches KafkaConnectionError and KafkaError, but Consumer.start() could raise other exceptions (e.g., OSError for network issues, asyncio.CancelledError). These would propagate unhandled, bypassing the retry logic.

♻️ Suggested improvement
-		except (aiokafka.errors.KafkaConnectionError, aiokafka.errors.KafkaError) as e:
+		except (aiokafka.errors.KafkaConnectionError, aiokafka.errors.KafkaError, OSError) as e:
 			L.warning(
 				"No connection to Kafka established. Attempt {} of {}. Retrying in {} seconds... {}".format(
 					attempt + 1, max_retries, delay, e
 				)
 			)

Alternatively, catch Exception but re-raise asyncio.CancelledError to respect cancellation:

except asyncio.CancelledError:
    raise
except Exception as e:
    L.warning(...)
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d2f8562 and db002e7.

📒 Files selected for processing (1)
  • asabiris/handlers/kafkahandler.py
🧰 Additional context used
🧬 Code graph analysis (1)
asabiris/handlers/kafkahandler.py (2)
asabiris/output/slack/service.py (1)
  • check_config (16-22)
asabiris/output/msteams/service.py (1)
  • check_config (11-17)
🪛 Ruff (0.14.10)
asabiris/handlers/kafkahandler.py

64-64: Do not catch blind exception: Exception

(BLE001)


78-78: Unused method argument: app

(ARG002)


98-99: try-except-pass detected, consider logging the exception

(S110)


98-98: Do not catch blind exception: Exception

(BLE001)

🔇 Additional comments (2)
asabiris/handlers/kafkahandler.py (2)

46-66: Configuration storage for retry logic looks good.

The approach of storing Kafka configuration as instance variables (KafkaTopic, KafkaGroupId, KafkaBootstrapServers) enables the retry mechanism to rebuild consumers with rotated servers. The validation check on line 56 properly guards against incomplete configuration.

Minor observation: Line 63 builds a consumer that will be immediately rebuilt in initialize() (as the comment acknowledges). This is acceptable but slightly wasteful.


83-91: Bootstrap server rotation logic is well-designed.

The rotation strategy correctly distributes connection attempts across available brokers:

  • attempt % len(servers) ensures proper wrap-around
  • Each retry starts with a different primary broker

The exponential backoff with delay * 2 capped at max_delay=300 is a good resilience pattern.
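
To make the rotation and backoff arithmetic concrete, a small standalone snippet (the broker addresses and the initial 5-second delay are made up; max_delay=300 matches the value mentioned above):

servers = ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"]

for attempt in range(5):
    start = attempt % len(servers)               # index of the primary broker for this attempt
    rotated = servers[start:] + servers[:start]
    print(attempt, rotated)
# attempt 0 tries kafka-1 first, attempt 1 tries kafka-2 first, attempt 2 tries kafka-3 first,
# attempt 3 wraps back to kafka-1, and so on.

delay, max_delay = 5, 300
delays = []
for _ in range(8):
    delays.append(delay)
    delay = min(delay * 2, max_delay)
print(delays)  # [5, 10, 20, 40, 80, 160, 300, 300]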

Comment on lines +68 to +76
	def _build_consumer(self, bootstrap_servers):
		self.Consumer = AIOKafkaConsumer(
			self.KafkaTopic,
			group_id=self.KafkaGroupId,
			bootstrap_servers=bootstrap_servers,
			loop=self.App.Loop,
			retry_backoff_ms=10000,
			auto_offset_reset="earliest",
		)

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

aiokafka AIOKafkaConsumer loop parameter deprecated

💡 Result:

Short answer: The loop parameter on AIOKafkaConsumer is deprecated — aiokafka warns "The loop argument is deprecated since 0.7.1, and scheduled for removal in 0.9.0". You should stop passing loop and let aiokafka use the running event loop (get_running_loop()) or obtain the loop yourself with asyncio.get_running_loop()/asyncio.new_event_loop() as appropriate. [1][2]

What to change:

  • Before (deprecated):
    consumer = AIOKafkaConsumer(..., loop=loop)
  • After (recommended):
    consumer = AIOKafkaConsumer(...)

    call from a running coroutine or use asyncio.get_running_loop() if needed

If you must use an explicit loop in older code, create and set it explicitly (asyncio.new_event_loop() + asyncio.set_event_loop(loop)) or call asyncio.get_running_loop() inside a coroutine; avoid asyncio.get_event_loop() because its behavior is deprecated in recent Python versions. [2][3]

Sources:

  • aiokafka consumer source showing the deprecation warning. [1]
  • aiokafka producer code uses get_running_loop when loop is None (same pattern). [2]
  • Python asyncio docs on deprecation of get_event_loop / recommended get_running_loop. [3]

References:
[1] aiokafka consumer implementation (deprecation warning).
[2] aiokafka producer implementation (uses get_running_loop when loop is None).
[3] Python asyncio docs — deprecation of get_event_loop / use get_running_loop.


🌐 Web query:

aiokafka 0.7.2 loop parameter asyncio

💡 Result:

Short answer: In aiokafka 0.7.2 the loop argument is deprecated — passing loop to AIOKafkaConsumer/AIOKafkaProducer emits a deprecation warning (it was deprecated in 0.7.1) and the project planned to remove the parameter in 0.8.0 as preparation for asyncio’s removal of explicit loop parameters in Python 3.10. Use the current default event loop (don’t pass loop). [1][2]

Sources:

  • aiokafka release notes (deprecation in 0.7.x, 0.7.2 release history). [1]
  • CPython issue removing deprecated asyncio "loop" parameter (Python 3.10 change). [2]

🌐 Web query:

Python asyncio loop parameter deprecated event loop

💡 Result:

Short answer: The optional loop= arguments were deprecated/removed across asyncio because getting the current loop via asyncio.get_running_loop() (or higher-level APIs like asyncio.run and asyncio.create_task) is now the preferred, safer and faster approach. Remove explicit loop= usage and use get_running_loop()/get_event_loop() (in sync code with care), or the new high-level APIs. [1][2][6]

What changed (concise):

  • Many asyncio APIs accepted a loop= parameter historically so you could target a specific event loop. That became redundant after get_event_loop() was made reliable/fast and newer helpers were added. The loop parameter was deprecated (around 3.8–3.10) and removed/forcibly rejected in later versions. [6][5]
  • asyncio.get_event_loop() behavior was tightened; prefer asyncio.get_running_loop() inside coroutines/callbacks and asyncio.run() for top-level execution. Some event-loop policy functions are also being deprecated in recent Python releases. [2][1][4]

How to update code (practical):

  • Inside coroutines/callbacks: use asyncio.get_running_loop() instead of passing loop=. Example: loop = asyncio.get_running_loop(); fut = loop.create_future(). [2][6]
  • Use top-level helpers: asyncio.run(main()), asyncio.create_task(coro) (instead of loop.create_task), asyncio.sleep(), asyncio.wait(), asyncio.as_completed(), etc., without loop=. [6]
  • From synchronous code that must access a loop: explicitly create/manage a loop (asyncio.new_event_loop(), set it on the thread) and pass that loop where strictly needed (e.g., run_coroutine_threadsafe(coro, loop) still expects a loop), but prefer running coroutines with asyncio.run or running a dedicated loop in that thread. [5][9]
  • If a third-party API still takes loop=, upgrade the library; if you control it, remove the parameter and use get_running_loop()/get_event_loop() internally. [5][6]

References

  • Python asyncio event loop docs (get_running_loop / get_event_loop / deprecation notes). [2]
  • Stack Overflow explanation and history of deprecated loop parameters. [6]
  • Python issue tracker: removal/simplification of loop parameter handling and related PRs. [5]
  • Python deprecations index documenting related asyncio deprecations. [4]

Remove the deprecated loop parameter from AIOKafkaConsumer.

The loop parameter has been deprecated in aiokafka since version 0.7.1 and will be removed in 0.9.0. Passing it triggers a deprecation warning. Remove loop=self.App.Loop and let aiokafka use the running event loop automatically:

self.Consumer = AIOKafkaConsumer(
    self.KafkaTopic,
    group_id=self.KafkaGroupId,
    bootstrap_servers=bootstrap_servers,
    retry_backoff_ms=10000,
    auto_offset_reset="earliest",
)
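
With the loop argument gone, the consumer only needs to be created and started from inside a running coroutine so aiokafka can resolve the current event loop itself. A minimal, self-contained sketch under that assumption (the topic, group and broker address are placeholders):

import asyncio

from aiokafka import AIOKafkaConsumer

async def main():
    consumer = AIOKafkaConsumer(
        "example-topic",
        group_id="example-group",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
    )
    await consumer.start()  # picks up the running event loop automatically
    try:
        async for msg in consumer:
            print(msg.topic, msg.offset, msg.value)
    finally:
        await consumer.stop()

asyncio.run(main())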
🤖 Prompt for AI Agents
In @asabiris/handlers/kafkahandler.py around lines 68 - 76, The _build_consumer
method passes a deprecated loop parameter to AIOKafkaConsumer
(loop=self.App.Loop); remove the loop argument so AIOKafkaConsumer is
constructed without passing self.App.Loop and relies on the running event loop,
updating the AIOKafkaConsumer call in _build_consumer accordingly.
