Skip to content

Conversation

@joaomariolago
Copy link
Collaborator

@joaomariolago joaomariolago commented Dec 19, 2025

Related to #2962 since in conjunction with blueos-recorder these logs are gonna be saved

Summary by Sourcery

Add support for installing Kraken extensions from uploaded Docker image tar files with temporary extension handling and automatic cleanup, along with live log streaming for running extensions.

New Features:

  • Introduce a UI flow to upload a .tar Docker image, inspect its metadata, and configure/install it as a Kraken extension.
  • Expose backend API endpoints to upload extension tar files, keep temporary uploads alive, and finalize their installation.
  • Stream extension container logs over Zenoh for enabled, running extensions.

Enhancements:

  • Update the extension manager floating action button to offer both creation from scratch and installation from file options.
  • Exclude temporary, identifier-less extensions from the standard extension listing API and UI.
  • Improve extension log styling and theming to match the application sheet background.
  • Add periodic cleanup of expired temporary extensions and associated Docker image tags.

Summary by Sourcery

Stream Kraken extension container logs over Zenoh for running extensions and manage their lifecycle alongside existing background tasks.

Enhancements:

  • Introduce an extension log publisher that discovers enabled, running extensions and streams their container logs over Zenoh with basic log-level extraction and JSON formatting.
  • Track and clean up per-container log streaming tasks in Kraken, including graceful shutdown handling for log publishers.
  • Extend container log retrieval to support both string and byte outputs for compatibility with the new log streaming pipeline.
  • Add a periodic background task in the Kraken service to keep extension log streams in sync with currently running extension containers.

@sourcery-ai
Copy link

sourcery-ai bot commented Dec 19, 2025

Reviewer's Guide

Publishes live logs from running, enabled Kraken extension containers to Zenoh by introducing an ExtensionLogPublisher that tracks running containers, spawns per-extension log streaming tasks, and integrates its lifecycle into the Kraken service startup and shutdown flow.

Sequence diagram for streaming extension container logs to Zenoh

sequenceDiagram
    participant Main as main_py
    participant Kraken as Kraken
    participant Publisher as ExtensionLogPublisher
    participant CM as ContainerManager
    participant Docker as Docker_daemon
    participant ZS as ZenohSession
    participant Zenoh as Zenoh_broker

    Main->>Kraken: create Kraken()
    Main->>Kraken: start_extension_logs_task() as background task

    loop every 2s while is_running
        Kraken->>Publisher: sync_with_running_extensions()
        activate Publisher
        Publisher->>CM: get_running_containers()
        activate CM
        CM->>Docker: list running containers
        Docker-->>CM: running container list
        CM-->>Publisher: running_containers
        deactivate CM

        Publisher->>Extension: _fetch_settings()
        Extension-->>Publisher: List ExtensionSettings
        Publisher->>Publisher: _collect_desired_streams()
        Publisher->>Publisher: _start_missing_streams()
        Publisher->>Publisher: _stop_removed_streams()
        deactivate Publisher
    end

    par per running, enabled extension
        loop log stream
            Publisher->>CM: get_container_log_by_name(container_name)
            activate CM
            CM->>Docker: stream container logs
            Docker-->>CM: log line (str | bytes)
            CM-->>Publisher: log line (str | bytes)
            deactivate CM

            Publisher->>Publisher: _format_log_payload(container_name, message)
            Publisher->>ZS: _publish(topic, payload)
            ZS->>Zenoh: put(topic, payload)
        end
    and on container stop
        Publisher->>Publisher: cancel stream task via _stop_removed_streams()
    end

    Main->>Kraken: stop()
    Kraken->>Publisher: shutdown()
    Publisher->>Publisher: cancel all log stream tasks
    Publisher-->>Kraken: shutdown complete
Loading

Class diagram for ExtensionLogPublisher integration into Kraken service

classDiagram
    class Kraken {
        - manager
        - _settings
        - is_running bool
        - manifest
        - extension_log_publisher ExtensionLogPublisher
        + start_cleaner_task() async
        + start_starter_task() async
        + start_extension_logs_task() async
        + stop() async
    }

    class ExtensionLogPublisher {
        - _zenoh_session ZenohSession
        - _tasks Dict~str, asyncio.Task~None~~
        + ExtensionLogPublisher()
        + sync_with_running_extensions() async
        + shutdown() async
        - _collect_desired_streams() async Optional~Dict~str, ExtensionSettings~~
        - _start_missing_streams(desired_streams Dict~str, ExtensionSettings~) void
        - _stop_removed_streams(desired_streams Dict~str, ExtensionSettings~) void
        - _make_cleanup_callback(container_name str) Callable~asyncio.Task~None~~, None~
        - _stream_logs(extension ExtensionSettings) async
        - _publish(topic str, log_line str) void
        + _topic_for(extension ExtensionSettings) str
        + _format_log_payload(container_name str, message str) str
        + _extract_level(message str) Tuple~int, str~
    }

    class ContainerManager {
        + get_running_containers() async List
        + get_container_log_by_name(container_name str) async AsyncGenerator~str | bytes, None~
    }

    class ZenohSession {
        + ZenohSession(service_name str)
        + session
    }

    class ExtensionSettings {
        + enabled bool
        + identifier str
        + name str
        + container_name() str
    }

    class Extension {
        + _fetch_settings() List~ExtensionSettings~
    }

    Kraken *-- ExtensionLogPublisher
    ExtensionLogPublisher ..> ContainerManager
    ExtensionLogPublisher ..> ZenohSession
    ExtensionLogPublisher ..> ExtensionSettings
    ExtensionLogPublisher ..> Extension
Loading

Architecture diagram for Kraken extension logs published to Zenoh

flowchart LR
    Kraken["Kraken service"]
    ExtMgr["Kraken extensions (ExtensionSettings)"]
    ExtContainers["Extension containers"]
    Docker["Docker daemon"]
    Pub["ExtensionLogPublisher"]
    ZenohSessionNode["ZenohSession (in-process)"]
    ZenohBroker["Zenoh broker / fabric"]

    Kraken -->|manages lifecycle| Pub
    Kraken --> ExtMgr

    ExtMgr -->|enabled extensions| Pub

    ExtContainers -->|run as containers| Docker
    Pub -->|discover running containers| Docker

    Pub -->|stream logs via get_container_log_by_name| Docker
    Pub -->|formats log payloads| Pub

    Pub -->|publish log_topic and payload| ZenohSessionNode
    ZenohSessionNode -->|put| ZenohBroker

    ZenohBroker -->|log topics extensions/logs/*| Consumers["Log consumers (e.g. UI, recorder)"]
Loading

File-Level Changes

Change Details Files
Introduce ExtensionLogPublisher for streaming extension container logs over Zenoh and manage per-container streaming tasks.
  • Create ExtensionLogPublisher class that maintains a Zenoh session and a mapping of container-name to asyncio Task
  • Determine which extensions should have active log streams by inspecting running Docker containers and enabled extension settings
  • Start new log streaming tasks for containers that should be streaming and stop tasks for containers that no longer match
  • For each active extension container, stream logs from Docker, normalize them, wrap them in a JSON payload with timestamp and level, and publish them to a Zenoh topic
core/services/kraken/extension_logs.py
Wire ExtensionLogPublisher into Kraken service lifecycle to continuously sync log streams and shut them down cleanly.
  • Instantiate ExtensionLogPublisher in Kraken service initialization
  • Add a periodic background task in Kraken to sync desired extension log streams with currently running containers
  • Ensure ExtensionLogPublisher tasks are cancelled and awaited on service shutdown
core/services/kraken/kraken.py
core/services/kraken/main.py
Adjust container log retrieval to support both string and bytes payloads for streaming to Zenoh.
  • Relax the return type of get_container_log_by_name to allow bytes in addition to strings to match Docker log output
  • Decode bytes to UTF-8 with replacement before formatting and publishing log messages
core/services/kraken/harbor/container.py
core/services/kraken/extension_logs.py

Possibly linked issues

  • #(tracker item: "log"): PR implements the Kraken "log" tracker item by adding Zenoh-based streaming of extension container logs and management tasks.

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@joaomariolago joaomariolago changed the title core:services:kraken: Add load/inpect/install from tar core:services:kraken: Publish extension container logs to zenoh Dec 19, 2025
@joaomariolago joaomariolago force-pushed the add-ext-zenoh-logs branch 3 times, most recently from a66df79 to 9376b54 Compare December 19, 2025 12:44
@joaomariolago joaomariolago marked this pull request as ready for review December 19, 2025 12:50
Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • In ExtensionLogPublisher.shutdown, consider also explicitly closing the underlying ZenohSession (or exposing a close method on it) so that resources and connections are cleaned up when Kraken stops.
  • In _collect_desired_streams, errors from ContainerManager.get_running_containers are handled, but Extension._fetch_settings() can still raise and break the sync loop; consider wrapping that call in the same try/except to keep the log publisher resilient to transient settings issues.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `ExtensionLogPublisher.shutdown`, consider also explicitly closing the underlying `ZenohSession` (or exposing a close method on it) so that resources and connections are cleaned up when Kraken stops.
- In `_collect_desired_streams`, errors from `ContainerManager.get_running_containers` are handled, but `Extension._fetch_settings()` can still raise and break the sync loop; consider wrapping that call in the same try/except to keep the log publisher resilient to transient settings issues.

## Individual Comments

### Comment 1
<location> `core/services/kraken/extension_logs.py:28-29` </location>
<code_context>
+    }
+
+    def __init__(self) -> None:
+        self._zenoh_session = ZenohSession(SERVICE_NAME)
+        self._tasks: Dict[str, asyncio.Task[None]] = {}
+
+    async def sync_with_running_extensions(self) -> None:
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Consider explicitly closing the Zenoh session during shutdown to avoid resource leakage.

`ExtensionLogPublisher` instantiates a `ZenohSession` in `__init__`, but `shutdown` only cancels tasks and never closes the session. If `ZenohSession` provides an explicit close/shutdown API or context manager, please call it from `shutdown` so network resources are released deterministically.

Suggested implementation:

```python
    async def shutdown(self) -> None:
        for task in self._tasks.values():
            task.cancel()
        self._tasks.clear()

        # Explicitly close the Zenoh session to release network resources
        await self._zenoh_session.close()

```

If `ZenohSession.close()` is not an `async` method, remove the `await` keyword:

- `await self._zenoh_session.close()``self._zenoh_session.close()`

If `ZenohSession` uses a different shutdown API (e.g. `shutdown()`, `stop()`, or `aclose()`), replace `.close()` with the appropriate method name.

If there is a project-wide pattern for closing `ZenohSession` (e.g. used elsewhere in the codebase), align this shutdown implementation with that pattern (including any error handling or logging around close).
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@patrickelectric patrickelectric merged commit 4320b3b into bluerobotics:master Dec 19, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants