-
Notifications
You must be signed in to change notification settings - Fork 119
core:services:kraken: Publish extension container logs to zenoh #3700
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core:services:kraken: Publish extension container logs to zenoh #3700
Conversation
Reviewer's GuidePublishes live logs from running, enabled Kraken extension containers to Zenoh by introducing an ExtensionLogPublisher that tracks running containers, spawns per-extension log streaming tasks, and integrates its lifecycle into the Kraken service startup and shutdown flow. Sequence diagram for streaming extension container logs to ZenohsequenceDiagram
participant Main as main_py
participant Kraken as Kraken
participant Publisher as ExtensionLogPublisher
participant CM as ContainerManager
participant Docker as Docker_daemon
participant ZS as ZenohSession
participant Zenoh as Zenoh_broker
Main->>Kraken: create Kraken()
Main->>Kraken: start_extension_logs_task() as background task
loop every 2s while is_running
Kraken->>Publisher: sync_with_running_extensions()
activate Publisher
Publisher->>CM: get_running_containers()
activate CM
CM->>Docker: list running containers
Docker-->>CM: running container list
CM-->>Publisher: running_containers
deactivate CM
Publisher->>Extension: _fetch_settings()
Extension-->>Publisher: List ExtensionSettings
Publisher->>Publisher: _collect_desired_streams()
Publisher->>Publisher: _start_missing_streams()
Publisher->>Publisher: _stop_removed_streams()
deactivate Publisher
end
par per running, enabled extension
loop log stream
Publisher->>CM: get_container_log_by_name(container_name)
activate CM
CM->>Docker: stream container logs
Docker-->>CM: log line (str | bytes)
CM-->>Publisher: log line (str | bytes)
deactivate CM
Publisher->>Publisher: _format_log_payload(container_name, message)
Publisher->>ZS: _publish(topic, payload)
ZS->>Zenoh: put(topic, payload)
end
and on container stop
Publisher->>Publisher: cancel stream task via _stop_removed_streams()
end
Main->>Kraken: stop()
Kraken->>Publisher: shutdown()
Publisher->>Publisher: cancel all log stream tasks
Publisher-->>Kraken: shutdown complete
Class diagram for ExtensionLogPublisher integration into Kraken serviceclassDiagram
class Kraken {
- manager
- _settings
- is_running bool
- manifest
- extension_log_publisher ExtensionLogPublisher
+ start_cleaner_task() async
+ start_starter_task() async
+ start_extension_logs_task() async
+ stop() async
}
class ExtensionLogPublisher {
- _zenoh_session ZenohSession
- _tasks Dict~str, asyncio.Task~None~~
+ ExtensionLogPublisher()
+ sync_with_running_extensions() async
+ shutdown() async
- _collect_desired_streams() async Optional~Dict~str, ExtensionSettings~~
- _start_missing_streams(desired_streams Dict~str, ExtensionSettings~) void
- _stop_removed_streams(desired_streams Dict~str, ExtensionSettings~) void
- _make_cleanup_callback(container_name str) Callable~asyncio.Task~None~~, None~
- _stream_logs(extension ExtensionSettings) async
- _publish(topic str, log_line str) void
+ _topic_for(extension ExtensionSettings) str
+ _format_log_payload(container_name str, message str) str
+ _extract_level(message str) Tuple~int, str~
}
class ContainerManager {
+ get_running_containers() async List
+ get_container_log_by_name(container_name str) async AsyncGenerator~str | bytes, None~
}
class ZenohSession {
+ ZenohSession(service_name str)
+ session
}
class ExtensionSettings {
+ enabled bool
+ identifier str
+ name str
+ container_name() str
}
class Extension {
+ _fetch_settings() List~ExtensionSettings~
}
Kraken *-- ExtensionLogPublisher
ExtensionLogPublisher ..> ContainerManager
ExtensionLogPublisher ..> ZenohSession
ExtensionLogPublisher ..> ExtensionSettings
ExtensionLogPublisher ..> Extension
Architecture diagram for Kraken extension logs published to Zenohflowchart LR
Kraken["Kraken service"]
ExtMgr["Kraken extensions (ExtensionSettings)"]
ExtContainers["Extension containers"]
Docker["Docker daemon"]
Pub["ExtensionLogPublisher"]
ZenohSessionNode["ZenohSession (in-process)"]
ZenohBroker["Zenoh broker / fabric"]
Kraken -->|manages lifecycle| Pub
Kraken --> ExtMgr
ExtMgr -->|enabled extensions| Pub
ExtContainers -->|run as containers| Docker
Pub -->|discover running containers| Docker
Pub -->|stream logs via get_container_log_by_name| Docker
Pub -->|formats log payloads| Pub
Pub -->|publish log_topic and payload| ZenohSessionNode
ZenohSessionNode -->|put| ZenohBroker
ZenohBroker -->|log topics extensions/logs/*| Consumers["Log consumers (e.g. UI, recorder)"]
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
a66df79 to
9376b54
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- In
ExtensionLogPublisher.shutdown, consider also explicitly closing the underlyingZenohSession(or exposing a close method on it) so that resources and connections are cleaned up when Kraken stops. - In
_collect_desired_streams, errors fromContainerManager.get_running_containersare handled, butExtension._fetch_settings()can still raise and break the sync loop; consider wrapping that call in the same try/except to keep the log publisher resilient to transient settings issues.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `ExtensionLogPublisher.shutdown`, consider also explicitly closing the underlying `ZenohSession` (or exposing a close method on it) so that resources and connections are cleaned up when Kraken stops.
- In `_collect_desired_streams`, errors from `ContainerManager.get_running_containers` are handled, but `Extension._fetch_settings()` can still raise and break the sync loop; consider wrapping that call in the same try/except to keep the log publisher resilient to transient settings issues.
## Individual Comments
### Comment 1
<location> `core/services/kraken/extension_logs.py:28-29` </location>
<code_context>
+ }
+
+ def __init__(self) -> None:
+ self._zenoh_session = ZenohSession(SERVICE_NAME)
+ self._tasks: Dict[str, asyncio.Task[None]] = {}
+
+ async def sync_with_running_extensions(self) -> None:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider explicitly closing the Zenoh session during shutdown to avoid resource leakage.
`ExtensionLogPublisher` instantiates a `ZenohSession` in `__init__`, but `shutdown` only cancels tasks and never closes the session. If `ZenohSession` provides an explicit close/shutdown API or context manager, please call it from `shutdown` so network resources are released deterministically.
Suggested implementation:
```python
async def shutdown(self) -> None:
for task in self._tasks.values():
task.cancel()
self._tasks.clear()
# Explicitly close the Zenoh session to release network resources
await self._zenoh_session.close()
```
If `ZenohSession.close()` is not an `async` method, remove the `await` keyword:
- `await self._zenoh_session.close()` → `self._zenoh_session.close()`
If `ZenohSession` uses a different shutdown API (e.g. `shutdown()`, `stop()`, or `aclose()`), replace `.close()` with the appropriate method name.
If there is a project-wide pattern for closing `ZenohSession` (e.g. used elsewhere in the codebase), align this shutdown implementation with that pattern (including any error handling or logging around close).
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
9376b54 to
99d71a5
Compare
Summary by Sourcery
Add support for installing Kraken extensions from uploaded Docker image tar files with temporary extension handling and automatic cleanup, along with live log streaming for running extensions.
New Features:
Enhancements:
Summary by Sourcery
Stream Kraken extension container logs over Zenoh for running extensions and manage their lifecycle alongside existing background tasks.
Enhancements: