From 2c8404aa6568bb44d0cd93217bf53d78b7d53f2d Mon Sep 17 00:00:00 2001 From: cayossarian <23534755+cayossarian@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:37:08 -0700 Subject: [PATCH] feat: derive panel_size from Homie schema during connect() Panel size is now sourced from the Homie schema's circuit space format ("1:N:1") fetched via GET /api/v2/homie/schema, replacing a non-deterministic heuristic that inferred size from the highest occupied breaker tab. SpanMqttClient.connect() fetches the schema internally so callers no longer need to pass panel_size. SpanPanelSnapshot.panel_size is now int (was int | None). --- CHANGELOG.md | 18 +++ README.md | 1 + pyproject.toml | 2 +- src/span_panel_api/__init__.py | 2 + src/span_panel_api/models.py | 31 ++++- src/span_panel_api/mqtt/client.py | 46 ++++--- src/span_panel_api/mqtt/homie.py | 30 ++--- tests/conftest.py | 13 ++ tests/test_auth_and_simulation_helpers.py | 2 +- tests/test_detection_auth.py | 54 ++++++++ tests/test_mqtt_homie.py | 156 ++++++++++++++-------- tests/test_protocol_models.py | 1 + 12 files changed, 259 insertions(+), 97 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 146b082..bf9df23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,23 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.2.3] - 03/2026 + +### Changed + +- **Panel size sourced from Homie schema** — `panel_size` is now derived from the circuit `space` property format in the Homie schema (`GET /api/v2/homie/schema`), which declares the valid range as `"1:N:1"` where N is the panel size. This replaces a + non-deterministic heuristic that inferred panel size from the highest occupied breaker tab, which would undercount when trailing positions were empty. +- **`SpanMqttClient.connect()` fetches schema internally** — the client automatically calls `get_homie_schema()` during `connect()` and passes the panel size to `HomieDeviceConsumer`. Callers no longer need to fetch or pass `panel_size`. +- **`SpanPanelSnapshot.panel_size`** — type changed from `int | None` to `int`; always populated from the schema +- **`V2HomieSchema.panel_size`** — new property that parses the schema's circuit space format to extract the authoritative panel size +- **`V2HomieSchema` exported** from package public API +- **`HomieDeviceConsumer` requires `panel_size`** — new required constructor parameter; unmapped tabs now fill to the schema-defined panel size rather than deriving from circuit data +- **`create_span_client()` simplified** — `panel_size` parameter removed; schema is fetched internally by `SpanMqttClient.connect()` + +### Removed + +- **MQTT `core/panel-size` topic parsing** — removed from `HomieDeviceConsumer`; panel size comes from the schema, not a runtime MQTT property + ## [2.0.0] - 02/2026 v2.0.0 is a ground-up rewrite. The REST/OpenAPI transport has been removed entirely in favor of MQTT/Homie — the SPAN Panel's native v2 protocol. This is a breaking change: all consumer code must be updated to use the new API surface. @@ -227,6 +244,7 @@ Package versions prior to 2.0.0 depend on the SPAN v1 REST API. SPAN will sunset | Version | Date | Transport | Summary | | ---------- | ------- | ---------- | ---------------------------------------------------------------------------------- | +| **2.2.3** | 03/2026 | MQTT/Homie | Panel size from Homie schema; `panel_size` always populated on snapshot | | **2.0.2** | 03/2026 | MQTT/Homie | EVSE (EV charger) snapshot model, Homie parsing, simulation support | | **2.0.1** | 03/2026 | MQTT/Homie | Full BESS metadata parsing, README documentation | | **2.0.0** | 02/2026 | MQTT/Homie | Ground-up rewrite: MQTT-only, protocol-based API, real-time push, PV/BESS metadata | diff --git a/README.md b/README.md index 9720c21..1b604e5 100644 --- a/README.md +++ b/README.md @@ -223,6 +223,7 @@ pem = await download_ca_cert("192.168.1.100") # Fetch the Homie property schema (unauthenticated) schema = await get_homie_schema("192.168.1.100") +print(f"Panel size: {schema.panel_size} spaces") print(f"Schema hash: {schema.types_schema_hash}") # Rotate MQTT broker password (invalidates previous password) diff --git a/pyproject.toml b/pyproject.toml index 33995d1..5b4683d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "span-panel-api" -version = "2.2.2" +version = "2.2.3" description = "A client library for SPAN Panel API" authors = [ {name = "SpanPanel"} diff --git a/src/span_panel_api/__init__.py b/src/span_panel_api/__init__.py index 76e9ec5..277dc44 100644 --- a/src/span_panel_api/__init__.py +++ b/src/span_panel_api/__init__.py @@ -24,6 +24,7 @@ SpanPanelSnapshot, SpanPVSnapshot, V2AuthResponse, + V2HomieSchema, V2StatusInfo, ) from .mqtt import MqttClientConfig, SpanMqttClient @@ -66,6 +67,7 @@ "detect_api_version", # v2 auth "V2AuthResponse", + "V2HomieSchema", "V2StatusInfo", "download_ca_cert", "get_homie_schema", diff --git a/src/span_panel_api/models.py b/src/span_panel_api/models.py index c354b69..b4758cd 100644 --- a/src/span_panel_api/models.py +++ b/src/span_panel_api/models.py @@ -110,6 +110,9 @@ class V2StatusInfo: firmware_version: str +_CIRCUIT_TYPE_KEY = "energy.ebus.device.circuit" + + @dataclass(frozen=True, slots=True) class V2HomieSchema: """Response from GET /api/v2/homie/schema.""" @@ -118,6 +121,32 @@ class V2HomieSchema: types_schema_hash: str # SHA-256, first 16 hex chars types: dict[str, dict[str, object]] # {type_name: {prop_name: {attr: value}}} + @property + def panel_size(self) -> int: + """Extract panel size from the circuit ``space`` property format. + + The Homie schema defines ``space`` with ``"format": "min:max:step"`` + (e.g. ``"1:32:1"``). The *max* value is the number of breaker spaces + in the panel. + + Raises: + ValueError: If the space format is missing or unparseable. + """ + circuit_type = self.types.get(_CIRCUIT_TYPE_KEY, {}) + space_prop = circuit_type.get("space") + if not isinstance(space_prop, dict): + raise ValueError(f"Schema missing '{_CIRCUIT_TYPE_KEY}/space' property") + fmt = space_prop.get("format") + if not isinstance(fmt, str): + raise ValueError(f"Schema '{_CIRCUIT_TYPE_KEY}/space' has no format string") + parts = fmt.split(":") + if len(parts) != 3: + raise ValueError(f"Unexpected space format '{fmt}', expected 'min:max:step'") + try: + return int(parts[1]) + except ValueError as exc: + raise ValueError(f"Cannot parse max from space format '{fmt}'") from exc + @dataclass(frozen=True, slots=True) class SpanPanelSnapshot: @@ -146,6 +175,7 @@ class SpanPanelSnapshot: eth0_link: bool # v1: direct | v2: core/ethernet wlan_link: bool # v1: direct | v2: core/wifi wwan_link: bool # v1: direct | v2: vendor-cloud == "CONNECTED" + panel_size: int # Total breaker spaces (from Homie schema space format) # v2-native fields — None for REST transport dominant_power_source: str | None = None # v2: core/dominant-power-source (settable) @@ -156,7 +186,6 @@ class SpanPanelSnapshot: main_breaker_rating_a: int | None = None # v2: core/breaker-rating (A) wifi_ssid: str | None = None # v2: core/wifi-ssid vendor_cloud: str | None = None # v2: core/vendor-cloud - panel_size: int | None = None # v2: core/panel-size (total breaker spaces) # Power flows (None when node not present) power_flow_pv: float | None = None # v2: power-flows/pv (W) diff --git a/src/span_panel_api/mqtt/client.py b/src/span_panel_api/mqtt/client.py index 882eae7..16ad093 100644 --- a/src/span_panel_api/mqtt/client.py +++ b/src/span_panel_api/mqtt/client.py @@ -11,6 +11,7 @@ from collections.abc import Awaitable, Callable import logging +from ..auth import get_homie_schema from ..exceptions import SpanPanelConnectionError, SpanPanelServerError from ..models import SpanPanelSnapshot from ..protocol import PanelCapability @@ -43,7 +44,7 @@ def __init__( self._snapshot_interval = snapshot_interval self._bridge: AsyncMqttBridge | None = None - self._homie = HomieDeviceConsumer(serial_number) + self._homie: HomieDeviceConsumer | None = None self._streaming = False self._snapshot_callbacks: list[Callable[[SpanPanelSnapshot], Awaitable[None]]] = [] self._ready_event: asyncio.Event | None = None @@ -51,6 +52,12 @@ def __init__( self._background_tasks: set[asyncio.Task[None]] = set() self._snapshot_timer: asyncio.TimerHandle | None = None + def _require_homie(self) -> HomieDeviceConsumer: + """Return the HomieDeviceConsumer, raising if not yet connected.""" + if self._homie is None: + raise SpanPanelConnectionError("Client not connected — call connect() first") + return self._homie + # -- SpanPanelClientProtocol ------------------------------------------- @property @@ -72,10 +79,11 @@ async def connect(self) -> None: """Connect to MQTT broker and wait for Homie device ready. Flow: - 1. Create AsyncMqttBridge with broker credentials - 2. Connect to MQTT broker - 3. Subscribe to ebus/5/{serial}/# - 4. Wait for $state==ready and $description parsed + 1. Fetch Homie schema to determine panel size + 2. Create AsyncMqttBridge with broker credentials + 3. Connect to MQTT broker + 4. Subscribe to ebus/5/{serial}/# + 5. Wait for $state==ready and $description parsed Raises: SpanPanelConnectionError: Cannot connect or device not ready @@ -84,6 +92,10 @@ async def connect(self) -> None: self._loop = asyncio.get_running_loop() self._ready_event = asyncio.Event() + # Fetch schema to determine panel size before processing any messages + schema = await get_homie_schema(self._host) + self._homie = HomieDeviceConsumer(self._serial_number, schema.panel_size) + _LOGGER.debug( "MQTT: Creating bridge to %s:%s (serial=%s)", self._broker_config.broker_host, @@ -145,7 +157,7 @@ async def close(self) -> None: async def ping(self) -> bool: """Check if MQTT connection is alive and device is ready.""" - if self._bridge is None: + if self._bridge is None or self._homie is None: return False return self._bridge.is_connected() and self._homie.is_ready() @@ -154,7 +166,7 @@ async def get_snapshot(self) -> SpanPanelSnapshot: No network call — snapshot is built from in-memory property values. """ - return self._homie.build_snapshot() + return self._require_homie().build_snapshot() # -- CircuitControlProtocol -------------------------------------------- @@ -188,7 +200,7 @@ async def set_dominant_power_source(self, value: str) -> None: Args: value: DPS enum value (GRID, BATTERY, NONE, GENERATOR, PV) """ - core_node = self._homie.find_node_by_type(TYPE_CORE) + core_node = self._require_homie().find_node_by_type(TYPE_CORE) if core_node is None: raise SpanPanelServerError("Core node not found in panel topology") topic = PROPERTY_SET_TOPIC_FMT.format(serial=self._serial_number, node=core_node, prop="dominant-power-source") @@ -228,15 +240,18 @@ async def stop_streaming(self) -> None: def _on_message(self, topic: str, payload: str) -> None: """Handle incoming MQTT message (called from asyncio loop).""" - was_ready = self._homie.is_ready() - self._homie.handle_message(topic, payload) + homie = self._homie + if homie is None: + return + was_ready = homie.is_ready() + homie.handle_message(topic, payload) # Check if device just became ready - if not was_ready and self._homie.is_ready() and self._ready_event is not None: + if not was_ready and homie.is_ready() and self._ready_event is not None: self._ready_event.set() # Dispatch snapshot callbacks if streaming - if self._streaming and self._homie.is_ready() and self._loop is not None: + if self._streaming and homie.is_ready() and self._loop is not None: if self._snapshot_interval <= 0: # No debounce — dispatch immediately (backward compat) self._create_dispatch_task() @@ -263,15 +278,16 @@ async def _wait_for_circuit_names(self, timeout: float) -> None: returns as soon as all circuit names are populated, or when the timeout elapses (non-fatal — entities will use fallback names). """ + homie = self._require_homie() deadline = asyncio.get_event_loop().time() + timeout while asyncio.get_event_loop().time() < deadline: - missing = self._homie.circuit_nodes_missing_names() + missing = homie.circuit_nodes_missing_names() if not missing: _LOGGER.debug("All circuit names received") return await asyncio.sleep(_CIRCUIT_NAMES_POLL_INTERVAL_S) - still_missing = self._homie.circuit_nodes_missing_names() + still_missing = homie.circuit_nodes_missing_names() if still_missing: _LOGGER.warning( "Timed out waiting for circuit names (%d still missing): %s", @@ -313,7 +329,7 @@ def set_snapshot_interval(self, interval: float) -> None: async def _dispatch_snapshot(self) -> None: """Build snapshot and send to all registered callbacks.""" - snapshot = self._homie.build_snapshot() + snapshot = self._require_homie().build_snapshot() for cb in list(self._snapshot_callbacks): try: await cb(snapshot) diff --git a/src/span_panel_api/mqtt/homie.py b/src/span_panel_api/mqtt/homie.py index 568d202..3148cb7 100644 --- a/src/span_panel_api/mqtt/homie.py +++ b/src/span_panel_api/mqtt/homie.py @@ -65,8 +65,9 @@ class HomieDeviceConsumer: (guaranteed by AsyncMqttBridge's call_soon_threadsafe dispatch). """ - def __init__(self, serial_number: str) -> None: + def __init__(self, serial_number: str, panel_size: int) -> None: self._serial_number = serial_number + self._panel_size = panel_size self._topic_prefix = f"{TOPIC_PREFIX}/{serial_number}" self._state: str = "" @@ -440,28 +441,21 @@ def _derive_run_config(self, dsm_state: str, grid_islandable: bool | None, dps: return "UNKNOWN" - def _build_unmapped_tabs(self, circuits: dict[str, SpanCircuitSnapshot]) -> dict[str, SpanCircuitSnapshot]: + def _build_unmapped_tabs( + self, + circuits: dict[str, SpanCircuitSnapshot], + ) -> dict[str, SpanCircuitSnapshot]: """Synthesize unmapped tab entries for breaker positions with no circuit. - Determines panel size from the highest occupied tab, then creates - zero-power SpanCircuitSnapshot entries for unoccupied positions. + Creates zero-power SpanCircuitSnapshot entries for unoccupied positions + up to ``self._panel_size``. """ - # Collect all occupied tabs from commissioned circuits occupied_tabs: set[int] = set() for circuit in circuits.values(): occupied_tabs.update(circuit.tabs) - if not occupied_tabs: - return {} - - # Panel size is the highest occupied tab (rounded up to even - # to cover both bus bar sides) - max_tab = max(occupied_tabs) - panel_size = max_tab if max_tab % 2 == 0 else max_tab + 1 - - # Synthesize entries for unoccupied positions unmapped: dict[str, SpanCircuitSnapshot] = {} - for tab in range(1, panel_size + 1): + for tab in range(1, self._panel_size + 1): if tab not in occupied_tabs: circuit_id = f"unmapped_tab_{tab}" unmapped[circuit_id] = SpanCircuitSnapshot( @@ -500,7 +494,6 @@ def _build_snapshot(self) -> SpanPanelSnapshot: main_breaker: int | None = None wifi_ssid: str | None = None vendor_cloud: str | None = None - panel_size: int | None = None if core_node is not None: firmware = self._get_prop(core_node, "software-version") @@ -531,9 +524,6 @@ def _build_snapshot(self) -> SpanPanelSnapshot: ws = self._get_prop(core_node, "wifi-ssid") wifi_ssid = ws if ws else None - ps = self._get_prop(core_node, "panel-size") - panel_size = _parse_int(ps) if ps else None - # Upstream lugs → main meter (grid connection) # imported-energy = energy imported from the grid = consumed by the house # exported-energy = energy exported to the grid = produced (solar) @@ -638,6 +628,7 @@ def _build_snapshot(self) -> SpanPanelSnapshot: eth0_link=eth0, wlan_link=wlan, wwan_link=wwan_connected, + panel_size=self._panel_size, dominant_power_source=dominant_power_source, grid_state=grid_state, grid_islandable=grid_islandable, @@ -646,7 +637,6 @@ def _build_snapshot(self) -> SpanPanelSnapshot: main_breaker_rating_a=main_breaker, wifi_ssid=wifi_ssid, vendor_cloud=vendor_cloud, - panel_size=panel_size, power_flow_pv=power_flow_pv, power_flow_battery=power_flow_battery, power_flow_grid=power_flow_grid, diff --git a/tests/conftest.py b/tests/conftest.py index 2e28533..9cb7e35 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,7 @@ from paho.mqtt.client import ConnectFlags from paho.mqtt.reasoncodes import ReasonCode +from span_panel_api.models import V2HomieSchema from span_panel_api.mqtt.const import TOPIC_PREFIX, TYPE_CORE # --------------------------------------------------------------------------- @@ -25,6 +26,17 @@ # Minimal Homie description that makes the device "ready" MINIMAL_DESCRIPTION = json.dumps({"nodes": {"core": {"type": TYPE_CORE}}}) +# Mock schema for SpanMqttClient.connect() — panel_size=32 +_MOCK_SCHEMA = V2HomieSchema( + firmware_version="test", + types_schema_hash="sha256:test", + types={ + "energy.ebus.device.circuit": { + "space": {"datatype": "integer", "format": "1:32:1"}, + }, + }, +) + # --------------------------------------------------------------------------- # Mock MQTT client fixture @@ -89,6 +101,7 @@ def _reconnect() -> int: patch("span_panel_api.mqtt.connection.AsyncMQTTClient") as cls, patch("span_panel_api.mqtt.connection.download_ca_cert", return_value="FAKE-PEM"), patch("span_panel_api.mqtt.connection.tempfile") as mock_tempfile, + patch("span_panel_api.mqtt.client.get_homie_schema", return_value=_MOCK_SCHEMA), ): # Make tempfile return a mock file object mock_tmp = MagicMock() diff --git a/tests/test_auth_and_simulation_helpers.py b/tests/test_auth_and_simulation_helpers.py index 40b7588..8aecf29 100644 --- a/tests/test_auth_and_simulation_helpers.py +++ b/tests/test_auth_and_simulation_helpers.py @@ -102,7 +102,7 @@ def test_invalid_with_custom_default(self) -> None: class TestHomieCallbackUnregister: def test_unregister_removes_callback(self) -> None: - consumer = HomieDeviceConsumer("test-serial") + consumer = HomieDeviceConsumer("test-serial", panel_size=32) cb = AsyncMock() unregister = consumer.register_property_callback(cb) unregister() diff --git a/tests/test_detection_auth.py b/tests/test_detection_auth.py index eab8a20..3c1ada3 100644 --- a/tests/test_detection_auth.py +++ b/tests/test_detection_auth.py @@ -305,6 +305,60 @@ async def test_schema_frozen(self): with pytest.raises(AttributeError): result.firmware_version = "changed" # type: ignore[misc] + def test_panel_size_from_space_format(self): + """panel_size extracts max from circuit space format 'min:max:step'.""" + types = { + "energy.ebus.device.circuit": { + "space": {"datatype": "integer", "format": "1:32:1"}, + }, + } + schema = V2HomieSchema(firmware_version="fw", types_schema_hash="hash", types=types) + assert schema.panel_size == 32 + + def test_panel_size_different_max(self): + types = { + "energy.ebus.device.circuit": { + "space": {"datatype": "integer", "format": "1:40:1"}, + }, + } + schema = V2HomieSchema(firmware_version="fw", types_schema_hash="hash", types=types) + assert schema.panel_size == 40 + + def test_panel_size_missing_circuit_type_raises(self): + schema = V2HomieSchema(firmware_version="fw", types_schema_hash="hash", types={}) + with pytest.raises(ValueError, match="space"): + _ = schema.panel_size + + def test_panel_size_missing_space_property_raises(self): + types = {"energy.ebus.device.circuit": {"name": {"datatype": "string"}}} + schema = V2HomieSchema(firmware_version="fw", types_schema_hash="hash", types=types) + with pytest.raises(ValueError, match="space"): + _ = schema.panel_size + + def test_panel_size_bad_format_raises(self): + types = { + "energy.ebus.device.circuit": { + "space": {"datatype": "integer", "format": "invalid"}, + }, + } + schema = V2HomieSchema(firmware_version="fw", types_schema_hash="hash", types=types) + with pytest.raises(ValueError, match="format"): + _ = schema.panel_size + + def test_panel_size_from_live_fixture(self): + """panel_size works with the real panel schema fixture.""" + import json + from pathlib import Path + + fixture = Path(__file__).parent / "fixtures" / "v2" / "homie_schema.json" + data = json.loads(fixture.read_text()) + schema = V2HomieSchema( + firmware_version=data["firmwareVersion"], + types_schema_hash="sha256:test", + types=data["types"], + ) + assert schema.panel_size == 32 + # =================================================================== # regenerate_passphrase diff --git a/tests/test_mqtt_homie.py b/tests/test_mqtt_homie.py index a527a29..d4eb898 100644 --- a/tests/test_mqtt_homie.py +++ b/tests/test_mqtt_homie.py @@ -80,7 +80,7 @@ def _full_description() -> dict: def _build_ready_consumer(description_nodes: dict | None = None) -> HomieDeviceConsumer: """Create a HomieDeviceConsumer in ready state with given description.""" - consumer = HomieDeviceConsumer(SERIAL) + consumer = HomieDeviceConsumer(SERIAL, panel_size=32) consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) nodes = description_nodes or _full_description() consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) @@ -126,16 +126,16 @@ def test_frozen(self): class TestHomieConsumerState: def test_not_ready_initially(self): - consumer = HomieDeviceConsumer(SERIAL) + consumer = HomieDeviceConsumer(SERIAL, panel_size=32) assert not consumer.is_ready() def test_not_ready_state_only(self): - consumer = HomieDeviceConsumer(SERIAL) + consumer = HomieDeviceConsumer(SERIAL, panel_size=32) consumer.handle_message(f"{PREFIX}/$state", "ready") assert not consumer.is_ready() def test_not_ready_description_only(self): - consumer = HomieDeviceConsumer(SERIAL) + consumer = HomieDeviceConsumer(SERIAL, panel_size=32) consumer.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) assert not consumer.is_ready() @@ -144,7 +144,7 @@ def test_ready_when_both(self): assert consumer.is_ready() def test_ignores_other_serial(self): - consumer = HomieDeviceConsumer(SERIAL) + consumer = HomieDeviceConsumer(SERIAL, panel_size=32) consumer.handle_message(f"{TOPIC_PREFIX}/other-serial/$state", "ready") assert not consumer.is_ready() @@ -766,16 +766,46 @@ def test_no_lugs_current(self): class TestHomiePanelSize: - def test_panel_size_parsed(self): - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/core/panel-size", "32") + def test_panel_size_from_constructor(self): + """panel_size in snapshot comes from constructor argument.""" + consumer = HomieDeviceConsumer(SERIAL, panel_size=32) + consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + consumer.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) snapshot = consumer.build_snapshot() assert snapshot.panel_size == 32 - def test_panel_size_none_when_missing(self): - consumer = _build_ready_consumer() + def test_panel_size_40(self): + """Different panel sizes are propagated correctly.""" + consumer = HomieDeviceConsumer(SERIAL, panel_size=40) + consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + consumer.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) + snapshot = consumer.build_snapshot() + assert snapshot.panel_size == 40 + + def test_unmapped_tabs_use_panel_size(self): + """Unmapped tabs fill up to panel_size, not highest occupied tab.""" + nodes = { + "core": {"type": TYPE_CORE}, + "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, + } + consumer = HomieDeviceConsumer(SERIAL, panel_size=8) + consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + # Circuit at space 2 only — tabs 3-8 should be unmapped + consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "2") + consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") + snapshot = consumer.build_snapshot() - assert snapshot.panel_size is None + unmapped_ids = sorted(cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")) + assert unmapped_ids == [ + "unmapped_tab_1", + "unmapped_tab_3", + "unmapped_tab_4", + "unmapped_tab_5", + "unmapped_tab_6", + "unmapped_tab_7", + "unmapped_tab_8", + ] # --------------------------------------------------------------------------- @@ -933,6 +963,7 @@ async def test_set_dominant_power_source_publishes(self): config = MqttClientConfig(broker_host="h", username="u", password="p") client = SpanMqttClient(host="192.168.1.1", serial_number=SERIAL, broker_config=config) + client._homie = HomieDeviceConsumer(SERIAL, panel_size=32) # Populate the homie description so core node is known desc = _make_description(_core_description()) @@ -957,6 +988,7 @@ async def test_set_dominant_power_source_no_core_node_raises(self): config = MqttClientConfig(broker_host="h", username="u", password="p") client = SpanMqttClient(host="192.168.1.1", serial_number=SERIAL, broker_config=config) + client._homie = HomieDeviceConsumer(SERIAL, panel_size=32) # No description loaded — core node not found with pytest.raises(SpanPanelServerError, match="Core node not found"): @@ -975,6 +1007,7 @@ async def test_get_snapshot_returns_homie_state(self): config = MqttClientConfig(broker_host="h", username="u", password="p") client = SpanMqttClient(host="192.168.1.1", serial_number=SERIAL, broker_config=config) + client._homie = HomieDeviceConsumer(SERIAL, panel_size=32) # Manually ready the homie consumer client._homie.handle_message(f"{PREFIX}/$state", "ready") @@ -1003,6 +1036,7 @@ async def test_ping_true_when_connected_and_ready(self): mock_bridge = MagicMock() mock_bridge.is_connected.return_value = True client._bridge = mock_bridge + client._homie = HomieDeviceConsumer(SERIAL, panel_size=32) client._homie.handle_message(f"{PREFIX}/$state", "ready") client._homie.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) @@ -1125,7 +1159,7 @@ def test_version_beta(self): class TestHomieEdgeCases: def test_invalid_description_json(self): - consumer = HomieDeviceConsumer(SERIAL) + consumer = HomieDeviceConsumer(SERIAL, panel_size=32) consumer.handle_message(f"{PREFIX}/$state", "ready") consumer.handle_message(f"{PREFIX}/$description", "not-json{{{") assert not consumer.is_ready() @@ -1151,7 +1185,8 @@ def test_multiple_circuits(self): consumer.handle_message(f"{PREFIX}/bbbbbbbb-5555-6666-7777-888888888888/name", "Circuit B") snapshot = consumer.build_snapshot() - assert len(snapshot.circuits) == 2 + real_circuits = {k: v for k, v in snapshot.circuits.items() if not k.startswith("unmapped_tab_")} + assert len(real_circuits) == 2 assert snapshot.circuits["aaaaaaaa11112222333344444444444" + "4"].name == "Circuit A" assert snapshot.circuits["bbbbbbbb55556666777788888888888" + "8"].name == "Circuit B" @@ -1169,7 +1204,11 @@ def test_current_and_breaker_none_when_empty(self): class TestUnmappedTabSynthesis: - """Tests for _build_unmapped_tabs and dipole tab derivation.""" + """Tests for _build_unmapped_tabs and dipole tab derivation. + + All tests use panel_size=32 (from _build_ready_consumer) unless a + smaller panel is constructed explicitly. + """ def test_single_pole_tabs(self): """Single-pole circuit gets tabs = [space].""" @@ -1217,13 +1256,16 @@ def test_dipole_even_side(self): assert circuit.tabs == [30, 32] def test_unmapped_tabs_generated(self): - """Unmapped positions should produce synthetic circuit entries.""" + """Unmapped positions fill up to panel_size (not highest tab).""" nodes = { "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, "bbbbbbbb-5555-6666-7777-888888888888": {"type": TYPE_CIRCUIT}, } - consumer = _build_ready_consumer(nodes) + # Use panel_size=6 so the test is tractable + consumer = HomieDeviceConsumer(SERIAL, panel_size=6) + consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) # Circuit A at space 1 (single-pole) consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "1") consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") @@ -1233,8 +1275,7 @@ def test_unmapped_tabs_generated(self): snapshot = consumer.build_snapshot() - # Highest occupied tab is 5 (odd), panel_size rounds to 6 - # Occupied: {1, 3, 5}, unmapped: {2, 4, 6} + # panel_size=6, occupied: {1, 3, 5}, unmapped: {2, 4, 6} assert "unmapped_tab_2" in snapshot.circuits assert "unmapped_tab_4" in snapshot.circuits assert "unmapped_tab_6" in snapshot.circuits @@ -1249,7 +1290,9 @@ def test_unmapped_tab_properties(self): "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = _build_ready_consumer(nodes) + consumer = HomieDeviceConsumer(SERIAL, panel_size=4) + consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "1") consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") @@ -1270,7 +1313,6 @@ def test_unmapped_tab_properties(self): def test_fully_occupied_panel_no_unmapped(self): """When all positions are occupied, no unmapped tabs are generated.""" - # Create 4 circuits occupying spaces 1, 2, 3, 4 nodes = { "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, @@ -1278,7 +1320,9 @@ def test_fully_occupied_panel_no_unmapped(self): "cccccccc-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, "dddddddd-5555-6666-7777-888888888888": {"type": TYPE_CIRCUIT}, } - consumer = _build_ready_consumer(nodes) + consumer = HomieDeviceConsumer(SERIAL, panel_size=4) + consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) for i, node in enumerate( [ "aaaaaaaa-1111-2222-3333-444444444444", @@ -1295,70 +1339,63 @@ def test_fully_occupied_panel_no_unmapped(self): unmapped_ids = [cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")] assert unmapped_ids == [] - def test_no_circuits_no_unmapped(self): - """When no circuits exist, no unmapped tabs are generated.""" + def test_no_circuits_all_unmapped(self): + """When no circuits exist, all positions up to panel_size are unmapped.""" nodes = {"core": {"type": TYPE_CORE}} - consumer = _build_ready_consumer(nodes) + consumer = HomieDeviceConsumer(SERIAL, panel_size=4) + consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) snapshot = consumer.build_snapshot() - unmapped_ids = [cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")] - assert unmapped_ids == [] - - def test_no_space_property_no_unmapped(self): - """Circuits without space property don't contribute to tab tracking.""" - nodes = { - "core": {"type": TYPE_CORE}, - "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, - } - consumer = _build_ready_consumer(nodes) - # Don't set space property - snapshot = consumer.build_snapshot() - unmapped_ids = [cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")] - assert unmapped_ids == [] + unmapped_ids = sorted(cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")) + assert unmapped_ids == [ + "unmapped_tab_1", + "unmapped_tab_2", + "unmapped_tab_3", + "unmapped_tab_4", + ] - def test_panel_size_rounds_to_even(self): - """Panel size rounds up to even when highest tab is odd.""" + def test_no_space_property_all_unmapped(self): + """Circuits without space property don't occupy any tabs.""" nodes = { "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = _build_ready_consumer(nodes) - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "7") - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") - + consumer = HomieDeviceConsumer(SERIAL, panel_size=4) + consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + # Don't set space property — circuit has no tabs snapshot = consumer.build_snapshot() - # Highest tab=7 (odd) → panel_size=8 - # Occupied: {7}, unmapped: {1,2,3,4,5,6,8} unmapped_ids = sorted(cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")) assert unmapped_ids == [ "unmapped_tab_1", "unmapped_tab_2", "unmapped_tab_3", "unmapped_tab_4", - "unmapped_tab_5", - "unmapped_tab_6", - "unmapped_tab_8", ] - def test_panel_size_exact_when_even(self): - """Panel size stays as-is when highest tab is even.""" + def test_unmapped_fills_to_panel_size(self): + """Unmapped tabs fill up to panel_size even if circuit is at low tab.""" nodes = { "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = _build_ready_consumer(nodes) - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "6") + consumer = HomieDeviceConsumer(SERIAL, panel_size=8) + consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "2") consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") snapshot = consumer.build_snapshot() - # Highest tab=6 (even) → panel_size=6 - # Occupied: {6}, unmapped: {1,2,3,4,5} + # Occupied: {2}, unmapped: {1,3,4,5,6,7,8} unmapped_ids = sorted(cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")) assert unmapped_ids == [ "unmapped_tab_1", - "unmapped_tab_2", "unmapped_tab_3", "unmapped_tab_4", "unmapped_tab_5", + "unmapped_tab_6", + "unmapped_tab_7", + "unmapped_tab_8", ] def test_dipole_occupies_correct_tabs_in_unmapped_calc(self): @@ -1367,14 +1404,15 @@ def test_dipole_occupies_correct_tabs_in_unmapped_calc(self): "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = _build_ready_consumer(nodes) + consumer = HomieDeviceConsumer(SERIAL, panel_size=4) + consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) # Dipole at space 1 → occupies 1 and 3 consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "1") consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "true") snapshot = consumer.build_snapshot() - # Highest tab = 3 (odd) → panel_size = 4 - # Occupied: {1, 3}, unmapped: {2, 4} + # panel_size=4, occupied: {1, 3}, unmapped: {2, 4} assert "unmapped_tab_1" not in snapshot.circuits assert "unmapped_tab_2" in snapshot.circuits assert "unmapped_tab_3" not in snapshot.circuits diff --git a/tests/test_protocol_models.py b/tests/test_protocol_models.py index a16fbc5..7508aef 100644 --- a/tests/test_protocol_models.py +++ b/tests/test_protocol_models.py @@ -120,6 +120,7 @@ def _make_panel_snapshot(**overrides) -> SpanPanelSnapshot: "eth0_link": True, "wlan_link": True, "wwan_link": False, + "panel_size": 32, } defaults.update(overrides) return SpanPanelSnapshot(**defaults)