Retry Kafka consumer with alternate bootstrap servers #122
base: main
Conversation
📝 Walkthrough
Refactors the Kafka handler to store KafkaTopic, KafkaGroupId, and KafkaBootstrapServers; adds _build_consumer(bootstrap_servers); updates initialize() to validate config, rotate bootstrap servers on retries, stop/rebuild/start a fresh consumer on each attempt, and improve Kafka connection error handling.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Handler
    participant Config
    participant Consumer
    participant KafkaBroker as Broker
    Config->>Handler: provide KafkaTopic, KafkaGroupId, KafkaBootstrapServers
    Handler->>Handler: preflight validate config
    alt config missing
        Handler-->>Handler: skip Kafka initialization
    else config present
        Handler->>Handler: pick bootstrap server(s) (rotated)
        Handler->>Consumer: _build_consumer(bootstrap_servers)
        Consumer->>Broker: connect
        alt connection success
            Consumer-->>Handler: started
        else connection failure
            Broker-->>Handler: connection error
            Handler->>Handler: stop consumer (if any) and rotate servers
            Handler->>Handler: wait retry interval then retry
        end
    end
```
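To make the flow concrete, here is a minimal sketch of what such a retry loop can look like. It is an illustration assembled from the walkthrough and the review comments below, not the PR's actual code; the constants (max_retries, the initial delay, the 300-second cap) and the exact rotation slicing are assumptions.

```python
import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaConnectionError, KafkaError


class KafkaHandlerSketch:
    """Illustrative only; mirrors the described behavior, not the PR's class."""

    def __init__(self, topic, group_id, bootstrap_servers):
        self.KafkaTopic = topic
        self.KafkaGroupId = group_id
        self.KafkaBootstrapServers = bootstrap_servers  # list of "host:port" strings
        self.Consumer = None

    def _build_consumer(self, bootstrap_servers):
        # No loop= argument: rely on the running event loop (see the review comment below).
        self.Consumer = AIOKafkaConsumer(
            self.KafkaTopic,
            group_id=self.KafkaGroupId,
            bootstrap_servers=bootstrap_servers,
            retry_backoff_ms=10000,
            auto_offset_reset="earliest",
        )

    async def initialize(self):
        # Preflight check: skip Kafka initialization if the configuration is incomplete.
        if not (self.KafkaTopic and self.KafkaGroupId and self.KafkaBootstrapServers):
            return

        max_retries, delay, max_delay = 5, 5, 300  # assumed values

        for attempt in range(max_retries):
            # Rotate the server list so each attempt leads with a different broker.
            offset = attempt % len(self.KafkaBootstrapServers)
            servers = (self.KafkaBootstrapServers[offset:]
                       + self.KafkaBootstrapServers[:offset])

            # Drop any half-initialized consumer from the previous attempt.
            try:
                if self.Consumer is not None:
                    await self.Consumer.stop()
            except Exception:
                pass

            self._build_consumer(servers)
            try:
                await self.Consumer.start()
                break  # connected successfully
            except (KafkaConnectionError, KafkaError):
                await asyncio.sleep(delay)
                delay = min(delay * 2, max_delay)  # exponential backoff with a cap
        else:
            # for-else: runs only if every attempt failed; leave Kafka disabled.
            return
```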
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @asabiris/handlers/kafkahandler.py:
- Around line 68-76: Remove the deprecated loop argument from aiokafka usage: in
_build_consumer remove the loop=self.App.Loop parameter passed to
AIOKafkaConsumer, and likewise remove loop=self.App.Loop from any
asyncio.ensure_future(...) invocation that schedules the consumer (the
ensure_future call used to start the consumer). Let aiokafka/asyncio use the
running event loop automatically so the code is compatible with aiokafka
>=0.8.0.
🧹 Nitpick comments (1)
asabiris/handlers/kafkahandler.py (1)
95-99: Consider logging the exception instead of silently swallowing it. While ignoring errors when stopping a potentially broken consumer is reasonable, logging at debug level would aid troubleshooting without cluttering normal operation.
♻️ Suggested improvement
```diff
 try:
     if self.Consumer is not None:
         await self.Consumer.stop()
-except Exception:
-    pass
+except Exception as e:
+    L.debug("Ignoring error while stopping stale consumer: {}".format(e))
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
asabiris/handlers/kafkahandler.py
🧰 Additional context used
🧬 Code graph analysis (1)
asabiris/handlers/kafkahandler.py (2)
asabiris/output/slack/service.py (1)
check_config (16-22)
asabiris/output/msteams/service.py (1)
check_config (11-17)
🪛 Ruff (0.14.10)
asabiris/handlers/kafkahandler.py
64-64: Do not catch blind exception: Exception
(BLE001)
78-78: Unused method argument: app
(ARG002)
98-99: try-except-pass detected, consider logging the exception
(S110)
98-98: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (4)
asabiris/handlers/kafkahandler.py (4)
46-49: LGTM! The persistent configuration fields are properly initialized with appropriate default values (None for optional strings, empty list for servers).
51-66: LGTM! The configuration loading logic properly validates required parameters and gracefully handles initialization failures. The broad exception catch is acceptable here as it's guarding the constructor and ensures the service can be instantiated even if Kafka configuration is problematic.
88-91: LGTM! The bootstrap server rotation logic is well-implemented. It cycles through servers on each retry attempt, ensuring connection attempts are distributed across available brokers.
107-117: LGTM! The error handling correctly catches both KafkaConnectionError and NoBrokersAvailable, provides informative logging, and implements proper exponential backoff. The for-else pattern correctly handles the case when all retry attempts are exhausted.
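For readers less familiar with Python's for-else, here is a tiny standalone illustration of the pattern the reviewer refers to (not code from this PR): the else branch runs only when the loop finishes without hitting break, which is exactly the "all retries exhausted" case.

```python
def connect_with_retries(max_retries, succeed_on_attempt):
    # Stand-in for the consumer-start loop; succeed_on_attempt simulates when the broker answers.
    for attempt in range(max_retries):
        if attempt == succeed_on_attempt:
            print("connected on attempt", attempt)
            break
        print("attempt", attempt, "failed, retrying")
    else:
        # Runs only when the loop completed without break, i.e. every attempt failed.
        print("all retry attempts exhausted")


connect_with_retries(3, succeed_on_attempt=1)   # connects on the second attempt
connect_with_retries(3, succeed_on_attempt=99)  # never succeeds, so the else branch runs
```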
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @asabiris/handlers/kafkahandler.py:
- Around line 68-76: The _build_consumer method passes a deprecated loop
parameter to AIOKafkaConsumer (loop=self.App.Loop); remove the loop argument so
AIOKafkaConsumer is constructed without passing self.App.Loop and relies on the
running event loop, updating the AIOKafkaConsumer call in _build_consumer
accordingly.
🧹 Nitpick comments (2)
asabiris/handlers/kafkahandler.py (2)
95-99: Consider logging the exception during consumer cleanup. The silent try-except-pass swallows any error during Consumer.stop(). While this is intentional to ensure cleanup proceeds regardless of consumer state, logging at debug level would aid troubleshooting without disrupting the retry flow.

♻️ Suggested improvement

```diff
 try:
     if self.Consumer is not None:
         await self.Consumer.stop()
-except Exception:
-    pass
+except Exception as e:
+    L.debug("Failed to stop existing consumer before rebuild: {}".format(e))
```
108-118: Consider handling broader exceptions during consumer start. The error handling catches KafkaConnectionError and KafkaError, but Consumer.start() could raise other exceptions (e.g., OSError for network issues, asyncio.CancelledError). These would propagate unhandled, bypassing the retry logic.

♻️ Suggested improvement

```diff
-except (aiokafka.errors.KafkaConnectionError, aiokafka.errors.KafkaError) as e:
+except (aiokafka.errors.KafkaConnectionError, aiokafka.errors.KafkaError, OSError) as e:
     L.warning(
         "No connection to Kafka established. Attempt {} of {}. Retrying in {} seconds... {}".format(
             attempt + 1, max_retries, delay, e
         )
     )
```

Alternatively, catch Exception but re-raise asyncio.CancelledError to respect cancellation:

```python
except asyncio.CancelledError:
    raise
except Exception as e:
    L.warning(...)
```
🔇 Additional comments (2)
asabiris/handlers/kafkahandler.py (2)
46-66: Configuration storage for retry logic looks good. The approach of storing Kafka configuration as instance variables (KafkaTopic, KafkaGroupId, KafkaBootstrapServers) enables the retry mechanism to rebuild consumers with rotated servers. The validation check on line 56 properly guards against incomplete configuration. Minor observation: line 63 builds a consumer that will be immediately rebuilt in initialize() (as the comment acknowledges). This is acceptable but slightly wasteful.
83-91: Bootstrap server rotation logic is well-designed. The rotation strategy correctly distributes connection attempts across available brokers:
- attempt % len(servers) ensures proper wrap-around
- each retry starts with a different primary broker

The exponential backoff with delay * 2 capped at max_delay=300 is a good resilience pattern (see the rotation sketch below).
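To make the wrap-around concrete, here is a standalone sketch of the rotation idea (illustrative, not the PR's exact code):

```python
servers = ["kafka1:9092", "kafka2:9092", "kafka3:9092"]  # example broker list

for attempt in range(4):
    offset = attempt % len(servers)                # wraps around after the last broker
    rotated = servers[offset:] + servers[:offset]  # attempt leads with a different broker
    print(attempt, rotated)

# 0 ['kafka1:9092', 'kafka2:9092', 'kafka3:9092']
# 1 ['kafka2:9092', 'kafka3:9092', 'kafka1:9092']
# 2 ['kafka3:9092', 'kafka1:9092', 'kafka2:9092']
# 3 ['kafka1:9092', 'kafka2:9092', 'kafka3:9092']   (wrapped around)
```

Each retry therefore leads with a different broker, so a single unreachable broker cannot block every attempt.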
```python
def _build_consumer(self, bootstrap_servers):
    self.Consumer = AIOKafkaConsumer(
        self.KafkaTopic,
        group_id=self.KafkaGroupId,
        bootstrap_servers=bootstrap_servers,
        loop=self.App.Loop,
        retry_backoff_ms=10000,
        auto_offset_reset="earliest",
    )
```
🧩 Analysis chain
🌐 Web query:
aiokafka AIOKafkaConsumer loop parameter deprecated
💡 Result:
Short answer: The loop parameter on AIOKafkaConsumer is deprecated — aiokafka warns "The loop argument is deprecated since 0.7.1, and scheduled for removal in 0.9.0". You should stop passing loop and let aiokafka use the running event loop (get_running_loop()) or obtain the loop yourself with asyncio.get_running_loop()/asyncio.new_event_loop() as appropriate. [1][2]
What to change:
- Before (deprecated): consumer = AIOKafkaConsumer(..., loop=loop)
- After (recommended): consumer = AIOKafkaConsumer(...), called from a running coroutine; use asyncio.get_running_loop() if a loop reference is needed
If you must use an explicit loop in older code, create and set it explicitly (asyncio.new_event_loop() + asyncio.set_event_loop(loop)) or call asyncio.get_running_loop() inside a coroutine; avoid asyncio.get_event_loop() because its behavior is deprecated in recent Python versions. [2][3]
Sources:
- aiokafka consumer source showing the deprecation warning. [1]
- aiokafka producer code uses get_running_loop when loop is None (same pattern). [2]
- Python asyncio docs on deprecation of get_event_loop / recommended get_running_loop. [3]
References:
[1] aiokafka consumer implementation (deprecation warning).
[2] aiokafka producer implementation (uses get_running_loop when loop is None).
[3] Python asyncio docs — deprecation of get_event_loop / use get_running_loop.
🌐 Web query:
aiokafka 0.7.2 loop parameter asyncio
💡 Result:
Short answer: In aiokafka 0.7.2 the loop argument is deprecated — passing loop to AIOKafkaConsumer/AIOKafkaProducer emits a deprecation warning (it was deprecated in 0.7.1) and the project planned to remove the parameter in 0.8.0 as preparation for asyncio’s removal of explicit loop parameters in Python 3.10. Use the current default event loop (don’t pass loop). [1][2]
Sources:
- aiokafka release notes (deprecation in 0.7.x, 0.7.2 release history). [1]
- CPython issue removing deprecated asyncio "loop" parameter (Python 3.10 change). [2]
🌐 Web query:
Python asyncio loop parameter deprecated event loop
💡 Result:
Short answer: The optional loop= arguments were deprecated/removed across asyncio because getting the current loop via asyncio.get_running_loop() (or higher-level APIs like asyncio.run and asyncio.create_task) is now the preferred, safer and faster approach. Remove explicit loop= usage and use get_running_loop()/get_event_loop() (in sync code with care), or the new high-level APIs. [1][2][6]
What changed (concise):
- Many asyncio APIs accepted a loop= parameter historically so you could target a specific event loop. That became redundant after get_event_loop() was made reliable/fast and newer helpers were added. The loop parameter was deprecated (around 3.8–3.10) and removed/forcibly rejected in later versions. [6][5]
- asyncio.get_event_loop() behavior was tightened; prefer asyncio.get_running_loop() inside coroutines/callbacks and asyncio.run() for top-level execution. Some event-loop policy functions are also being deprecated in recent Python releases. [2][1][4]
How to update code (practical):
- Inside coroutines/callbacks: use asyncio.get_running_loop() instead of passing loop=. Example: loop = asyncio.get_running_loop(); fut = loop.create_future(). [2][6]
- Use top-level helpers: asyncio.run(main()), asyncio.create_task(coro) (instead of loop.create_task), asyncio.sleep(), asyncio.wait(), asyncio.as_completed(), etc., without loop=. [6]
- From synchronous code that must access a loop: explicitly create/manage a loop (asyncio.new_event_loop(), set it on the thread) and pass that loop where strictly needed (e.g., run_coroutine_threadsafe(coro, loop) still expects a loop), but prefer running coroutines with asyncio.run or running a dedicated loop in that thread. [5][9]
- If a third-party API still takes loop=, upgrade the library; if you control it, remove the parameter and use get_running_loop()/get_event_loop() internally. [5][6]
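A minimal sketch of the recommended replacement pattern from the list above (generic asyncio usage, not specific to this PR):

```python
import asyncio


async def main():
    # Inside a coroutine there is no need to pass loop= anywhere.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_later(0.1, fut.set_result, "done")
    print(await fut)


asyncio.run(main())  # preferred top-level entry point instead of managing a loop manually
```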
References
- Python asyncio event loop docs (get_running_loop / get_event_loop / deprecation notes). [2]
- Stack Overflow explanation and history of deprecated loop parameters. [6]
- Python issue tracker: removal/simplification of loop parameter handling and related PRs. [5]
- Python deprecations index documenting related asyncio deprecations. [4]
Remove the deprecated loop parameter from AIOKafkaConsumer.
The loop parameter has been deprecated in aiokafka since version 0.7.1 and will be removed in 0.9.0. Passing it triggers a deprecation warning. Remove loop=self.App.Loop and let aiokafka use the running event loop automatically:
```python
self.Consumer = AIOKafkaConsumer(
    self.KafkaTopic,
    group_id=self.KafkaGroupId,
    bootstrap_servers=bootstrap_servers,
    retry_backoff_ms=10000,
    auto_offset_reset="earliest",
)
```

🤖 Prompt for AI Agents
In @asabiris/handlers/kafkahandler.py around lines 68 - 76, The _build_consumer
method passes a deprecated loop parameter to AIOKafkaConsumer
(loop=self.App.Loop); remove the loop argument so AIOKafkaConsumer is
constructed without passing self.App.Loop and relies on the running event loop,
updating the AIOKafkaConsumer call in _build_consumer accordingly.