diff --git a/meta/examples/subscriber_registration/all_in_one_function.py b/meta/examples/subscriber_registration/all_in_one_function.py new file mode 100644 index 0000000..1ae4fb6 --- /dev/null +++ b/meta/examples/subscriber_registration/all_in_one_function.py @@ -0,0 +1,31 @@ +async def add_company_projector( + event_store, projection_store, event_broker, service_manager +): + projection_subscriber = make_subscriber( + subscriber_group="search-company-projection", + subscription_request=CategoryIdentifier(category="company"), + subscriber_state_category=event_store.category( + category="search-company-projection-state" + ), + subscriber_state_persistence_interval=EventCount(10000), + event_processor=CompanySearchEventProcessor( + projector=CompanySearchProjector(), + projection_store=projection_store, + ), + ) + + error_handling_service = ErrorHandlingService( + callable=projection_subscriber.consume_all, + error_handler=ContinueErrorHandler(), + ) + + projection_service = PollingService( + callable=error_handling_service.execute, + poll_interval=timedelta(seconds=1), + ) + + await event_broker.register(projection_subscriber) + service_manager.register( + projection_service, + execution_mode=ExecutionMode.BACKGROUND, + ) diff --git a/meta/examples/subscriber_registration/consumer_registrar.py b/meta/examples/subscriber_registration/consumer_registrar.py new file mode 100644 index 0000000..c93fbec --- /dev/null +++ b/meta/examples/subscriber_registration/consumer_registrar.py @@ -0,0 +1,82 @@ +class ConsumerRegistrar(ABC): + subscriber_group: ClassVar[LiteralString] + subscriber_state_category: ClassVar[LiteralString] + category: ClassVar[CategoryDefinition[Any, Any]] + + def __repr__(self): + return ( + f"{self.__class__.__name__}(" + f"subscriber_group={self.subscriber_group}, " + f"category={self.category}, " + f"event_processor={repr(self.event_processor)}" + ")" + ) + + def __init__( + self, + event_store: EventStore, + event_processor: EventProcessor, + ): + 
self._event_store = event_store + self._event_processor = event_processor + + self._state_store = EventConsumerStateStore( + category=event_store.category(category=self.subscriber_group), + ) + + @property + def event_processor(self) -> EventProcessor: + return self._event_processor + + def _consumer_for_source(self, source: EventSource) -> EventSourceConsumer: + return EventSourceConsumer( + source=source, + processor=self._event_processor, + state_store=self._state_store, + ) + + async def consume_all(self) -> None: + source = self._event_store.category(category=self.category.name) + consumer = self._consumer_for_source(source) + return await consumer.consume_all() + + async def register_as_service( + self, event_broker: EventBroker, service_manager: ServiceManager + ) -> None: + subscriber = EventSubscriptionConsumer( + group=self.subscriber_group, + id=str(uuid4()), + subscription_requests=[ + CategoryIdentifier(category=self.category.name) + ], + delegate_factory=self._consumer_for_source, + ) + + await event_broker.register(subscriber) + + service = PollingService( + callable=subscriber.consume_all, + poll_interval=timedelta(seconds=1), + ) + + service_manager.register( + service, + execution_mode=ExecutionMode.BACKGROUND, + ) + + +async def register_tasks( + service_manager: ServiceManager, + event_broker: EventBroker, + registrars: Iterable[ConsumerRegistrar], +) -> None: + async with asyncio.TaskGroup() as tg: + service_manager.register(event_broker) + logger.info("Registering consumers", registrars=registrars) + for registrar in registrars: + tg.create_task( + registrar.register_as_service( + service_manager=service_manager, + event_broker=event_broker, + ) + ) diff --git a/meta/examples/subscriber_registration/register_many_function.py b/meta/examples/subscriber_registration/register_many_function.py new file mode 100644 index 0000000..831570e --- /dev/null +++ b/meta/examples/subscriber_registration/register_many_function.py @@ -0,0 +1,52 @@ +async def 
register_subscribers( + event_broker: EventBroker, + service_manager: ServiceManager, + consumers: List[EventSubscriptionConsumer], +) -> None: + for consumer in consumers: + await event_broker.register(consumer) + + error_handling_service = LoggingErrorHandlingService( + callable=consumer.consume_all, + error_handler=RaiseErrorHandler(), + ) + polling_service = PollingService( + callable=error_handling_service.execute, + poll_interval=timedelta(milliseconds=100), + ) + + service_manager.register( + polling_service, execution_mode=ExecutionMode.BACKGROUND + ) + + +async def init_event_services( + db_type: DBType, + db_settings: DBSettings, + connection_pool: AsyncConnectionPool[AsyncConnection], + db: DB, + publisher: EventPublisher, + clock: Clock, + metrics: Metrics +) -> ServiceManager: + service_manager = ServiceManager() # type: ignore[no-untyped-call] + event_broker = _make_event_broker( + db_type, db_settings, connection_pool, db.event_store_adapter + ) + service_manager.register(event_broker) + event_store = db.event_store + subscription_consumers = [ + *approval_subscribers.make_subscribers( + event_store, db, clock, metrics + ), + *payment_subscribers.make_subscribers( + event_store, publisher, db, clock + ), + *settings_subscribers.make_subscribers( + event_store, publisher, db, metrics, + ) + ] + await register_subscribers( + event_broker, service_manager, subscription_consumers + ) + return service_manager diff --git a/meta/examples/subscriber_registration/register_one_function.py b/meta/examples/subscriber_registration/register_one_function.py new file mode 100644 index 0000000..8e840dd --- /dev/null +++ b/meta/examples/subscriber_registration/register_one_function.py @@ -0,0 +1,57 @@ +async def register_subscriber( + subscriber_group: str, + subscription_request: EventSourceIdentifier, + subscriber_state_category_name: SubscriberStateCategory, + processor: SupportedProcessors, + event_broker: EventBroker, + event_store: EventStore, + service_manager: 
ServiceManager, + retryable_exceptions: list[Type[BaseException]] = (), +): + subscriber_id = str(uuid4()) + subscriber_state_category = event_store.category( + category=subscriber_state_category_name + ) + + state_store = EventConsumerStateStore( + category=subscriber_state_category, + converter=StoredEventEventConsumerStateConverter(), + persistence_interval=EventCount(1), + ) + + def delegate_factory[I: EventSourceIdentifier]( + source: EventSource[I, StoredEvent], + ) -> EventSourceConsumer[I, StoredEvent]: + return EventSourceConsumer( + source=source, + processor=processor, + state_store=state_store, + ) + + subscriber = EventSubscriptionConsumer( + group=subscriber_group, + id=subscriber_id, + subscription_requests=[subscription_request], + delegate_factory=delegate_factory, + ) + + await event_broker.register(subscriber) + + error_handling_service = ErrorHandlingService( + callable=subscriber.consume_all, + error_handler=TypeMappingErrorHandler( + type_mappings=error_handler_type_mappings( + continue_execution=retryable_exceptions + ) + ), + ) + + polling_service = PollingService( + callable=error_handling_service.execute, + poll_interval=timedelta(seconds=1), + ) + + service_manager.register( + polling_service, + execution_mode=ExecutionMode.BACKGROUND, + ) diff --git a/meta/examples/subscriber_registration/subsystem_creator.py b/meta/examples/subscriber_registration/subsystem_creator.py new file mode 100644 index 0000000..f64a952 --- /dev/null +++ b/meta/examples/subscriber_registration/subsystem_creator.py @@ -0,0 +1,138 @@ +class ProviderService[EntityRecord: RecordBaseModel]: + def __init__( + self, + event_store: EventStore, + projection_store: ProjectionStore, + loader: Loader[EntityRecord], + record_type: type[EntityRecord], + constraints: Sequence[Constraint], + ): + self._event_store = event_store + self._projection_store = projection_store + self._loader = loader + self._constraints = constraints + self._record_type = record_type + + async def 
_make_projection_service( + self, + config: ProjectionConfig, + event_broker: EventBroker, + service_manager: ServiceManager, + enabled: bool, + event_processor: EventProcessor, + ): + projection_subscriber = make_subscriber( + subscriber_group=config.subscriber_config.subscriber_group, + subscription_request=CategoryIdentifier( + category=config.subscriber_config.subscription_request_category + ), + subscriber_state_category=self._event_store.category( + category=config.subscriber_config.subscriber_state_category + ), + subscriber_state_persistence_interval=EventCount(10000), + event_processor=event_processor, + ) + + error_handling_service = ErrorHandlingService( + callable=projection_subscriber.consume_all, + error_handler=ContinueErrorHandler(), + ) + + projection_service = PollingService( + callable=error_handling_service.execute, + poll_interval=timedelta(seconds=1), + ) + + if enabled: + await event_broker.register(projection_subscriber) + service_manager.register( + projection_service, + execution_mode=ExecutionMode.BACKGROUND, + ) + + def _make_event_processor(self, config: IngestionConfig): + return ProjectionEventProcessor[RecordLog | None]( + projector=RecordLogProjector[EntityRecord]( + projection_name=config.projection_name, + payload_type=self._record_type, + ), + projection_store=self._projection_store, + state_type=RecordLog, + ) + + def _make_ingestion_service( + self, + config: IngestionConfig, + service_manager: ServiceManager, + enabled: bool, + ): + record_log_cache = RecordLogCache( + event_store=self._event_store, + projection_store=self._projection_store, + projection_name=config.projection_name, + record_category_name=config.record_category_name, + projection_consumer_state_category_name=( + config.projection_consumer_state_category_name + ), + ) + ingestion_service = IngestionService( + event_store=self._event_store, + projection_store=self._projection_store, + loader=self._loader, + record_log_cache=record_log_cache, + 
constraints=self._constraints, + projection_name=config.projection_name, + event_name=config.event_name, + category_name=config.record_category_name, + ) + + error_handling_ingestion_service = ErrorHandlingService( + callable=ingestion_service.execute, + error_handler=RetryErrorHandler(), + ) + + if enabled: + service_manager.register( + error_handling_ingestion_service, + execution_mode=ExecutionMode.BACKGROUND, + ) + + async def make( + self, + event_broker: EventBroker, + service_manager: ServiceManager, + config: ProviderConfig, + ): + if config.ingestion_config is not None: + self._make_ingestion_service( + config=config.ingestion_config, + service_manager=service_manager, + enabled=config.ingestion_config.enabled, + ) + + if config.record_log_config is not None: + await self._make_projection_service( + config=config.record_log_config, + event_broker=event_broker, + service_manager=service_manager, + event_processor=config.record_log_config.event_processor, + enabled=config.record_log_config.enabled, + ) + + if config.changeset_config is not None: + await self._make_projection_service( + config=config.changeset_config, + event_broker=event_broker, + service_manager=service_manager, + event_processor=config.changeset_config.event_processor, + enabled=config.changeset_config.enabled, + ) + + if config.entity_config is not None: + await self._make_projection_service( + config=config.entity_config, + event_broker=event_broker, + service_manager=service_manager, + event_processor=config.entity_config.event_processor, + enabled=config.entity_config.enabled, + ) diff --git a/meta/plans/2025-12-02-broker-role-support.md b/meta/plans/2025-12-02-broker-role-support.md new file mode 100644 index 0000000..1c802d2 --- /dev/null +++ b/meta/plans/2025-12-02-broker-role-support.md @@ -0,0 +1,517 @@ +# BrokerRole Support for DistributedEventBroker + +## Overview + +Add `BrokerRole` support to `DistributedEventBroker` to enable coordinator-only, +observer-only, or full 
participation modes. This allows deploying nodes that +only coordinate work allocation (without local processing) or only observe and +process (without coordination). + +## Current State Analysis + +### Problem + +The `DistributedEventBroker` currently always runs all three components +(coordinator, observer, subscriber manager) together in `_do_execute()` at +`broker/strategies/distributed/broker.py:44-57`. There's no way to: + +- Run a coordinator-only node for work allocation without local processing +- Run an observer-only node for processing without coordination overhead + +### Key Discoveries + +- `_do_execute()` unconditionally creates tasks for all three components +- The `asyncio.TaskGroup` pattern requires all tasks to be created together +- Status property aggregates coordinator and observer statuses +- Builder and factories don't support mode configuration + +## Desired End State + +`DistributedEventBroker` supports three roles: + +```python +class BrokerRole(StrEnum): + COORDINATOR = "coordinator" # Work allocation only, no local subscribers + OBSERVER = "observer" # Local processing only, no coordination + FULL = "full" # Both coordination and observation +``` + +Usage: + +```python +# Coordinator-only node +coordinator_broker = make_event_broker( + node_id=node_id, + broker_type=EventBrokerType.Distributed, + storage_type=EventBrokerStorageType.Postgres, + settings=DistributedEventBrokerSettings(role=BrokerRole.COORDINATOR), + ... +) + +# Observer-only node +observer_broker = make_event_broker( + node_id=node_id, + broker_type=EventBrokerType.Distributed, + storage_type=EventBrokerStorageType.Postgres, + settings=DistributedEventBrokerSettings(role=BrokerRole.OBSERVER), + ... +) + +# Full participation (default, current behaviour) +full_broker = make_event_broker( + node_id=node_id, + broker_type=EventBrokerType.Distributed, + storage_type=EventBrokerStorageType.Postgres, + settings=DistributedEventBrokerSettings(role=BrokerRole.FULL), + ... 
+) +``` + +### Verification + +- Unit tests verify each `BrokerRole` runs correct components +- Unit tests verify `register()` raises for coordinator-only brokers +- Integration tests verify coordinator-only and observer-only nodes work together + +## What We're NOT Doing + +- Changes to `SingletonEventBroker` (no coordinator/observer concept) +- Adding mode property to `EventBroker` base interface +- Dynamic role switching at runtime + +## Implementation Approach + +Follow TDD: write failing tests first, then implement minimum code to pass, +then refactor. + +--- + +## Changes Required + +### 1. Add BrokerRole Enum + +**File**: `src/logicblocks/event/processing/broker/types.py` + +Add to existing file or create new: + +```python +from enum import StrEnum + + +class BrokerRole(StrEnum): + COORDINATOR = "coordinator" + OBSERVER = "observer" + FULL = "full" +``` + +### 2. Update DistributedEventBroker + +**File**: `src/logicblocks/event/processing/broker/strategies/distributed/broker.py` + +**Changes**: + +- Add `role: BrokerRole` parameter to constructor (default `BrokerRole.FULL`) +- Store role as instance variable +- Add `role` property +- Modify `register()` to raise if role is `COORDINATOR` +- Modify `status` property to handle single-component roles +- Modify `_do_execute()` to conditionally create tasks based on role + +```python +class DistributedEventBroker[E: Event]( + EventBroker[E], ErrorHandlingServiceMixin[NoneType] +): + def __init__( + self, + event_subscriber_manager: EventSubscriberManager[E], + event_subscription_coordinator: EventSubscriptionCoordinator, + event_subscription_observer: EventSubscriptionObserver[E], + role: BrokerRole = BrokerRole.FULL, + error_handler: ErrorHandler[NoneType] = RetryErrorHandler(), + ): + super().__init__(error_handler) + self._event_subscriber_manager = event_subscriber_manager + self._event_subscription_coordinator = event_subscription_coordinator + self._event_subscription_observer = event_subscription_observer + 
self._role = role + + @property + def role(self) -> BrokerRole: + return self._role + + async def register(self, subscriber: EventSubscriber[E]) -> None: + if self._role == BrokerRole.COORDINATOR: + raise RuntimeError( + "Cannot register subscribers on a coordinator-only broker" + ) + await self._event_subscriber_manager.add(subscriber) + + @property + def status(self) -> ProcessStatus: + match self._role: + case BrokerRole.COORDINATOR: + return self._event_subscription_coordinator.status + case BrokerRole.OBSERVER: + return self._event_subscription_observer.status + case BrokerRole.FULL: + return determine_multi_process_status( + self._event_subscription_coordinator.status, + self._event_subscription_observer.status, + ) + + async def _do_execute(self) -> None: + subscriber_manager = self._event_subscriber_manager + coordinator = self._event_subscription_coordinator + observer = self._event_subscription_observer + + try: + await subscriber_manager.start() + + async with asyncio.TaskGroup() as tg: + tg.create_task(subscriber_manager.maintain()) + + if self._role in (BrokerRole.COORDINATOR, BrokerRole.FULL): + tg.create_task(coordinator.coordinate()) + + if self._role in (BrokerRole.OBSERVER, BrokerRole.FULL): + tg.create_task(observer.observe()) + finally: + await subscriber_manager.stop() +``` + +### 3. 
Update Builder + +**File**: `src/logicblocks/event/processing/broker/strategies/distributed/builder.py` + +**Changes**: + +- Add `role: BrokerRole` to `DistributedEventBrokerSettings` with default `FULL` +- Pass role through to broker construction in `build()` + +```python +@dataclass(frozen=True) +class DistributedEventBrokerSettings: + role: BrokerRole = BrokerRole.FULL + subscriber_manager_heartbeat_interval: timedelta = timedelta(seconds=10) + subscriber_manager_purge_interval: timedelta = timedelta(minutes=1) + subscriber_manager_subscriber_max_age: timedelta = timedelta(minutes=10) + coordinator_subscriber_max_time_since_last_seen: timedelta = timedelta( + seconds=60 + ) + coordinator_distribution_interval: timedelta = timedelta(seconds=20) + coordinator_leadership_max_duration: timedelta = timedelta(minutes=15) + coordinator_leadership_attempt_interval: timedelta = timedelta(seconds=5) + observer_synchronisation_interval: timedelta = timedelta(seconds=20) +``` + +Update `build()` method to pass `settings.role` to `DistributedEventBroker`. + +### 4. Update Factories + +**File**: `src/logicblocks/event/processing/broker/strategies/distributed/factories.py` + +**Changes**: + +- Ensure `role` flows through `make_in_memory_distributed_event_broker()` and + `make_postgres_distributed_event_broker()` via settings + +### 5. Export New Types + +**File**: `src/logicblocks/event/processing/__init__.py` + +**Changes**: + +- Export `BrokerRole` from public API + +--- + +## Testing Strategy + +### Unit Tests + +**File**: `tests/unit/logicblocks/event/processing/brokers/strategies/distributed/test_broker.py` + +**TDD Approach**: Write each test first, verify it fails, then implement. 
+ +**New Test Class**: + +```python +class TestDistributedEventBrokerRoles: + async def test_full_role_runs_coordinator_and_observer(self): + """Verify both coordinator.coordinate() and observer.observe() are called.""" + coordinator = MockEventSubscriptionCoordinator() + observer = MockEventSubscriptionObserver() + manager = MockEventSubscriberManager() + + broker = DistributedEventBroker( + event_subscriber_manager=manager, + event_subscription_coordinator=coordinator, + event_subscription_observer=observer, + role=BrokerRole.FULL, + ) + + task = asyncio.create_task(broker.execute()) + + async with task_shutdown(task): + await assert_start_count_eventually(coordinator, 1) + await assert_start_count_eventually(observer, 1) + + async def test_coordinator_role_runs_only_coordinator(self): + """Verify only coordinator.coordinate() called, not observer.observe().""" + coordinator = MockEventSubscriptionCoordinator() + observer = MockEventSubscriptionObserver() + manager = MockEventSubscriberManager() + + broker = DistributedEventBroker( + event_subscriber_manager=manager, + event_subscription_coordinator=coordinator, + event_subscription_observer=observer, + role=BrokerRole.COORDINATOR, + ) + + task = asyncio.create_task(broker.execute()) + + async with task_shutdown(task): + await assert_start_count_eventually(coordinator, 1) + # Give time for observer to start if it was going to + await asyncio.sleep(0.1) + assert observer.start_count == 0 + + async def test_observer_role_runs_only_observer(self): + """Verify only observer.observe() called, not coordinator.coordinate().""" + coordinator = MockEventSubscriptionCoordinator() + observer = MockEventSubscriptionObserver() + manager = MockEventSubscriberManager() + + broker = DistributedEventBroker( + event_subscriber_manager=manager, + event_subscription_coordinator=coordinator, + event_subscription_observer=observer, + role=BrokerRole.OBSERVER, + ) + + task = asyncio.create_task(broker.execute()) + + async with 
task_shutdown(task): + await assert_start_count_eventually(observer, 1) + # Give time for coordinator to start if it was going to + await asyncio.sleep(0.1) + assert coordinator.start_count == 0 + + async def test_coordinator_role_rejects_subscriber_registration(self): + """Verify register() raises RuntimeError for coordinator-only broker.""" + broker = DistributedEventBroker( + event_subscriber_manager=Mock(spec=DefaultEventSubscriberManager), + event_subscription_coordinator=Mock(spec=EventSubscriptionCoordinator), + event_subscription_observer=Mock(spec=EventSubscriptionObserver), + role=BrokerRole.COORDINATOR, + ) + + subscriber = DummyEventSubscriber() + + with pytest.raises(RuntimeError, match="coordinator-only"): + await broker.register(subscriber) + + async def test_observer_role_accepts_subscriber_registration(self): + """Verify register() succeeds for observer-only broker.""" + manager = Mock(spec=DefaultEventSubscriberManager) + broker = DistributedEventBroker( + event_subscriber_manager=manager, + event_subscription_coordinator=Mock(spec=EventSubscriptionCoordinator), + event_subscription_observer=Mock(spec=EventSubscriptionObserver), + role=BrokerRole.OBSERVER, + ) + + subscriber = DummyEventSubscriber() + await broker.register(subscriber) + + manager.add.assert_called_once_with(subscriber) + + async def test_full_role_accepts_subscriber_registration(self): + """Verify register() succeeds for full broker.""" + manager = Mock(spec=DefaultEventSubscriberManager) + broker = DistributedEventBroker( + event_subscriber_manager=manager, + event_subscription_coordinator=Mock(spec=EventSubscriptionCoordinator), + event_subscription_observer=Mock(spec=EventSubscriptionObserver), + role=BrokerRole.FULL, + ) + + subscriber = DummyEventSubscriber() + await broker.register(subscriber) + + manager.add.assert_called_once_with(subscriber) + + async def test_status_reflects_coordinator_only_when_coordinator_role(self): + """Verify status property returns only 
coordinator status.""" + coordinator = Mock(spec=EventSubscriptionCoordinator) + coordinator.status = ProcessStatus.RUNNING + + broker = DistributedEventBroker( + event_subscriber_manager=Mock(spec=DefaultEventSubscriberManager), + event_subscription_coordinator=coordinator, + event_subscription_observer=Mock(spec=EventSubscriptionObserver), + role=BrokerRole.COORDINATOR, + ) + + assert broker.status == ProcessStatus.RUNNING + + async def test_status_reflects_observer_only_when_observer_role(self): + """Verify status property returns only observer status.""" + observer = Mock(spec=EventSubscriptionObserver) + observer.status = ProcessStatus.RUNNING + + broker = DistributedEventBroker( + event_subscriber_manager=Mock(spec=DefaultEventSubscriberManager), + event_subscription_coordinator=Mock(spec=EventSubscriptionCoordinator), + event_subscription_observer=observer, + role=BrokerRole.OBSERVER, + ) + + assert broker.status == ProcessStatus.RUNNING + + def test_role_property_returns_configured_role(self): + """Verify role property returns the configured role.""" + broker = DistributedEventBroker( + event_subscriber_manager=Mock(spec=DefaultEventSubscriberManager), + event_subscription_coordinator=Mock(spec=EventSubscriptionCoordinator), + event_subscription_observer=Mock(spec=EventSubscriptionObserver), + role=BrokerRole.OBSERVER, + ) + + assert broker.role == BrokerRole.OBSERVER + + def test_default_role_is_full(self): + """Verify default role is FULL for backwards compatibility.""" + broker = DistributedEventBroker( + event_subscriber_manager=Mock(spec=DefaultEventSubscriberManager), + event_subscription_coordinator=Mock(spec=EventSubscriptionCoordinator), + event_subscription_observer=Mock(spec=EventSubscriptionObserver), + ) + + assert broker.role == BrokerRole.FULL +``` + +### Integration Tests + +**File**: `tests/integration/logicblocks/event/processing/broker/strategies/distributed/test_broker.py` + +**New Test Class**: + +```python +class 
TestDistributedEventBrokerRoleIntegration: + @pytest_asyncio.fixture(autouse=True) + async def reinitialise_storage(self, open_connection_pool): + await drop_table(open_connection_pool, "events") + await drop_table(open_connection_pool, "subscribers") + await drop_table(open_connection_pool, "subscriptions") + await create_table(open_connection_pool, "events") + await create_table(open_connection_pool, "subscribers") + await create_table(open_connection_pool, "subscriptions") + + async def test_coordinator_and_observer_nodes_work_together( + self, open_connection_pool + ): + """Verify coordinator allocates work to observer node.""" + event_processor = CapturingEventProcessor() + adapter = PostgresEventStorageAdapter(connection_source=open_connection_pool) + event_store = EventStore(adapter=adapter) + + category = random_category_name() + node_id_coordinator = random_node_id() + node_id_observer = random_node_id() + + # Create coordinator-only broker + coordinator_broker = make_postgres_distributed_event_broker( + node_id=node_id_coordinator, + connection_settings=connection_settings, + connection_pool=open_connection_pool, + adapter=adapter, + settings=DistributedEventBrokerSettings( + role=BrokerRole.COORDINATOR, + coordinator_distribution_interval=timedelta(milliseconds=200), + ), + ) + + # Create observer-only broker with subscriber + observer_broker = make_postgres_distributed_event_broker( + node_id=node_id_observer, + connection_settings=connection_settings, + connection_pool=open_connection_pool, + adapter=adapter, + settings=DistributedEventBrokerSettings( + role=BrokerRole.OBSERVER, + observer_synchronisation_interval=timedelta(milliseconds=200), + ), + ) + + subscriber = make_subscriber( + subscriber_group=f"test-group-{category}", + subscription_request=CategoryIdentifier(category=category), + subscriber_state_category=event_store.category( + category=f"test-state-{category}" + ), + event_processor=event_processor, + ... 
+ ) + + await observer_broker.register(subscriber) + + # Publish event + await publish_event_to_category(event_store, category) + + # Start both brokers + coordinator_task = asyncio.create_task(coordinator_broker.execute()) + observer_task = asyncio.create_task(observer_broker.execute()) + + try: + # Wait for event to be processed + await asyncio.wait_for( + consume_until_event_count(event_processor, [subscriber], 1), + timeout=10.0, + ) + + assert len(event_processor.events) == 1 + finally: + coordinator_task.cancel() + observer_task.cancel() + await asyncio.gather( + coordinator_task, observer_task, return_exceptions=True + ) + + async def test_multiple_observer_nodes_with_single_coordinator( + self, open_connection_pool + ): + """Verify coordinator distributes work across multiple observers.""" + # Similar pattern with multiple observer nodes + pass +``` + +--- + +## Success Criteria + +### Automated Verification + +- [ ] All existing broker tests pass: `mise run test:unit` +- [ ] New role-specific unit tests pass +- [ ] Integration tests pass: `mise run test:integration` +- [ ] Type checking passes: `mise run type:check` +- [ ] Linting passes: `mise run lint:fix` +- [ ] Formatting passes: `mise run format:fix` + +### Manual Verification + +- [ ] Coordinator-only broker starts without error +- [ ] Observer-only broker starts without error +- [ ] Full broker behaves same as before (backwards compatible) +- [ ] Coordinator-only broker raises on `register()` call + +--- + +## References + +- Research document: `meta/research/2025-12-02-event-processing-node-abstraction.md` +- DistributedEventBroker: `src/logicblocks/event/processing/broker/strategies/distributed/broker.py:19-57` +- Builder: `src/logicblocks/event/processing/broker/strategies/distributed/builder.py` +- Existing broker tests: `tests/unit/logicblocks/event/processing/brokers/strategies/distributed/test_broker.py` diff --git a/meta/plans/2025-12-02-event-runtime-abstraction.md 
b/meta/plans/2025-12-02-event-runtime-abstraction.md new file mode 100644 index 0000000..828f04b --- /dev/null +++ b/meta/plans/2025-12-02-event-runtime-abstraction.md @@ -0,0 +1,892 @@ +# EventRuntime Abstraction + +## Overview + +Introduce an `EventRuntime` abstraction that sits over the event broker and +service manager, simplifying subscriber and consumer registration while +providing a clean lifecycle API. + +## Current State Analysis + +### Problem + +Registering subscribers currently requires 5+ steps across multiple abstractions: + +```python +# 1. Create subscriber +subscriber = make_subscriber( + subscriber_group="projections", + subscription_request=CategoryIdentifier(category="orders"), + subscriber_state_category=event_store.category(...), + event_processor=processor, + ... +) + +# 2. Wrap with error handling +error_service = ErrorHandlingService( + callable=subscriber.consume_all, + error_handler=ContinueErrorHandler(), +) + +# 3. Wrap with polling +polling_service = PollingService( + callable=error_service.execute, + poll_interval=timedelta(seconds=1), +) + +# 4. Register with broker +await event_broker.register(subscriber) + +# 5. Register with service manager +service_manager.register(polling_service, execution_mode=ExecutionMode.BACKGROUND) +``` + +Each consumer in the codebase solves this differently (see examples in +`meta/examples/subscriber_registration/`), leading to inconsistency and +boilerplate. 
+ +### Key Discoveries + +- `EventSubscriber` interface is for work allocation (accept/withdraw sources) +- `EventConsumer` interface is for event consumption (consume_all) +- `EventSubscriptionConsumer` implements both interfaces +- Error handling and polling wrapping applies to consumption, not subscription +- `node_id` is injected into broker at construction time + +## Desired End State + +A simplified API for event processing setup: + +```python +@dataclass +class EventConsumerSettings: + error_handler: ErrorHandler = field( + default_factory=lambda: ContinueErrorHandler() + ) + poll_interval: timedelta = timedelta(seconds=1) + isolation_mode: IsolationMode = IsolationMode.MAIN_THREAD + + +# Create runtime with injected dependencies +runtime = EventRuntime( + node_id=node_id, + event_broker=event_broker, + service_manager=service_manager, + default_consumer_settings=EventConsumerSettings(...), # Optional defaults +) + +# Register subscription consumer (common case) - handles both broker +# registration and consumption wrapping +await runtime.register_subscription_consumer( + subscription_consumer, + settings=EventConsumerSettings(poll_interval=timedelta(milliseconds=100)), +) + +# Or register subscriber only (work allocation, no consumption wrapping) +await runtime.register_subscriber(subscriber) + +# Or register consumer only (consumption wrapping, no broker registration) +await runtime.register_consumer(consumer, settings=EventConsumerSettings(...)) + +# Start processing +await runtime.start() + +# ... running ... 
+ +await runtime.stop() +``` + +### Verification + +- Unit tests verify registration methods delegate correctly +- Unit tests verify error handling and polling wrapping +- Unit tests verify lifecycle methods +- Integration tests verify end-to-end event processing +- Component tests verify full PostgreSQL-backed processing + +## What We're NOT Doing + +- Dynamic registration while running (future enhancement) +- Factory functions or builder pattern for `EventRuntime` (can add later) +- Health/status aggregation API (future enhancement) + +## Implementation Approach + +Follow TDD: write failing tests first, then implement minimum code to pass, +then refactor. + +--- + +## Changes Required + +### 1. Add EventConsumerSettings + +**File**: `src/logicblocks/event/processing/runtime/settings.py` (new file) + +```python +from dataclasses import dataclass, field +from datetime import timedelta + +from logicblocks.event.processing.services import IsolationMode +from logicblocks.event.processing.services.error import ( + ContinueErrorHandler, + ErrorHandler, +) + + +@dataclass(frozen=True) +class EventConsumerSettings: + """Settings for consumer error handling and polling behaviour. + + Attributes: + error_handler: Handler for exceptions during consumption. + Defaults to ContinueErrorHandler which logs and continues. + poll_interval: Interval between consume_all() calls. + Defaults to 1 second. + isolation_mode: Thread isolation for the polling service. + Defaults to MAIN_THREAD. + """ + + error_handler: ErrorHandler = field( + default_factory=lambda: ContinueErrorHandler() + ) + poll_interval: timedelta = timedelta(seconds=1) + isolation_mode: IsolationMode = IsolationMode.MAIN_THREAD +``` + +### 2. 
Add EventRuntime Class + +**File**: `src/logicblocks/event/processing/runtime/runtime.py` (new file) + +```python +from typing import Any + +from logicblocks.event.processing.broker import EventBroker +from logicblocks.event.processing.broker.types import EventSubscriber +from logicblocks.event.processing.consumers import EventConsumer +from logicblocks.event.processing.consumers.subscription import ( + EventSubscriptionConsumer, +) +from logicblocks.event.processing.services import ( + ErrorHandlingService, + ExecutionMode, + PollingService, + ServiceManager, +) + +from .settings import EventConsumerSettings + + +class EventRuntime: + """Runtime for event processing that simplifies registration and lifecycle. + + Encapsulates the event broker and service manager, providing a simplified + API for registering subscribers and consumers with appropriate error + handling and polling. + + Attributes: + node_id: Identifier for this deployed instance. + """ + + def __init__( + self, + node_id: str, + event_broker: EventBroker[Any], + service_manager: ServiceManager, + default_consumer_settings: EventConsumerSettings | None = None, + ): + """Initialise the runtime. + + Args: + node_id: Identifier for this deployed instance. + event_broker: The event broker for subscriber registration. + service_manager: The service manager for service lifecycle. + default_consumer_settings: Default settings for consumers. + Used when no settings provided to registration methods. 
+ """ + self._node_id = node_id + self._event_broker = event_broker + self._service_manager = service_manager + self._default_consumer_settings = ( + default_consumer_settings or EventConsumerSettings() + ) + self._started = False + + @property + def node_id(self) -> str: + """The identifier for this deployed instance.""" + return self._node_id + + def _resolve_settings( + self, settings: EventConsumerSettings | None + ) -> EventConsumerSettings: + """Resolve settings, using defaults if none provided.""" + return settings or self._default_consumer_settings + + def _check_not_started(self) -> None: + """Raise if runtime has already started.""" + if self._started: + raise RuntimeError("Cannot register after runtime has started") + + async def register_subscriber( + self, + subscriber: EventSubscriber[Any], + ) -> None: + """Register a subscriber for work allocation only. + + The subscriber will receive event source allocations from the broker + but no consumption wrapping (error handling, polling) is applied. + + Use this when: + - You have a custom subscriber that manages its own consumption + - You need broker registration but handle polling yourself + + Args: + subscriber: The subscriber to register with the broker. + + Raises: + RuntimeError: If called after start(). + RuntimeError: If broker does not accept subscribers (e.g., + coordinator-only broker). + """ + self._check_not_started() + await self._event_broker.register(subscriber) + + async def register_consumer( + self, + consumer: EventConsumer[Any], + settings: EventConsumerSettings | None = None, + ) -> None: + """Register a consumer for continuous, error-resilient consumption. + + Wraps the consumer with error handling and polling services and + registers with the service manager. Does not register with the broker + for work allocation. 
+ + Use this when: + - You have a consumer that doesn't need broker work allocation + - You're consuming from a known source directly + + Args: + consumer: The consumer to wrap and register. + settings: Consumer settings. Uses runtime defaults if not provided. + + Raises: + RuntimeError: If called after start(). + """ + self._check_not_started() + resolved_settings = self._resolve_settings(settings) + + error_handling_service = ErrorHandlingService( + callable=consumer.consume_all, + error_handler=resolved_settings.error_handler, + ) + + polling_service = PollingService( + callable=error_handling_service.execute, + poll_interval=resolved_settings.poll_interval, + ) + + self._service_manager.register( + polling_service, + execution_mode=ExecutionMode.BACKGROUND, + isolation_mode=resolved_settings.isolation_mode, + ) + + async def register_subscription_consumer( + self, + subscription_consumer: EventSubscriptionConsumer[Any], + settings: EventConsumerSettings | None = None, + ) -> None: + """Register a subscription consumer for allocation and consumption. + + Registers with the broker for work allocation and wraps with error + handling and polling for continuous consumption. This is the common + case for event processing. + + Args: + subscription_consumer: The subscription consumer to register. + settings: Consumer settings. Uses runtime defaults if not provided. + + Raises: + RuntimeError: If called after start(). + RuntimeError: If broker does not accept subscribers. + """ + self._check_not_started() + await self._event_broker.register(subscription_consumer) + await self.register_consumer(subscription_consumer, settings) + + async def start(self) -> None: + """Start the runtime. + + Registers the broker with the service manager and starts all services. + No further registrations are allowed after calling start(). + + Raises: + RuntimeError: If already started. 
+ """ + self._check_not_started() + self._service_manager.register(self._event_broker) + self._started = True + await self._service_manager.start() + + async def stop(self) -> None: + """Stop the runtime. + + Stops all services including the broker. + """ + await self._service_manager.stop() +``` + +### 3. Add Module Structure + +**File**: `src/logicblocks/event/processing/runtime/__init__.py` (new file) + +```python +from .runtime import EventRuntime +from .settings import EventConsumerSettings + +__all__ = [ + "EventRuntime", + "EventConsumerSettings", +] +``` + +### 4. Export from Processing Package + +**File**: `src/logicblocks/event/processing/__init__.py` + +**Changes**: + +- Add imports for `EventRuntime` and `EventConsumerSettings` +- Add to `__all__` list + +--- + +## Testing Strategy + +### Unit Tests + +**File**: `tests/unit/logicblocks/event/processing/runtime/test_settings.py` (new) + +```python +from datetime import timedelta + +import pytest + +from logicblocks.event.processing.runtime import EventConsumerSettings +from logicblocks.event.processing.services import IsolationMode +from logicblocks.event.processing.services.error import ContinueErrorHandler + + +class TestEventConsumerSettings: + def test_default_error_handler_is_continue_error_handler(self): + settings = EventConsumerSettings() + assert isinstance(settings.error_handler, ContinueErrorHandler) + + def test_default_poll_interval_is_one_second(self): + settings = EventConsumerSettings() + assert settings.poll_interval == timedelta(seconds=1) + + def test_default_isolation_mode_is_main_thread(self): + settings = EventConsumerSettings() + assert settings.isolation_mode == IsolationMode.MAIN_THREAD + + def test_custom_values_are_preserved(self): + from logicblocks.event.processing.services.error import RaiseErrorHandler + + settings = EventConsumerSettings( + error_handler=RaiseErrorHandler(), + poll_interval=timedelta(milliseconds=500), + isolation_mode=IsolationMode.DEDICATED_THREAD, + ) + 
+ assert isinstance(settings.error_handler, RaiseErrorHandler) + assert settings.poll_interval == timedelta(milliseconds=500) + assert settings.isolation_mode == IsolationMode.DEDICATED_THREAD + + def test_settings_are_immutable(self): + settings = EventConsumerSettings() + + with pytest.raises(AttributeError): + settings.poll_interval = timedelta(seconds=5) +``` + +**File**: `tests/unit/logicblocks/event/processing/runtime/test_runtime.py` (new) + +```python +from datetime import timedelta +from unittest.mock import AsyncMock, Mock + +import pytest + +from logicblocks.event.processing.broker import EventBroker +from logicblocks.event.processing.consumers import EventConsumer +from logicblocks.event.processing.consumers.subscription import ( + EventSubscriptionConsumer, +) +from logicblocks.event.processing.runtime import ( + EventConsumerSettings, + EventRuntime, +) +from logicblocks.event.processing.services import ( + ExecutionMode, + IsolationMode, + ServiceManager, +) +from logicblocks.event.processing.services.error import RaiseErrorHandler + + +class TestEventRuntimeConstruction: + def test_stores_node_id(self): + runtime = EventRuntime( + node_id="test-node", + event_broker=Mock(spec=EventBroker), + service_manager=Mock(spec=ServiceManager), + ) + + assert runtime.node_id == "test-node" + + def test_uses_provided_default_settings(self): + custom_settings = EventConsumerSettings( + poll_interval=timedelta(milliseconds=100) + ) + + runtime = EventRuntime( + node_id="test-node", + event_broker=Mock(spec=EventBroker), + service_manager=Mock(spec=ServiceManager), + default_consumer_settings=custom_settings, + ) + + assert runtime._default_consumer_settings == custom_settings + + def test_creates_default_settings_when_none_provided(self): + runtime = EventRuntime( + node_id="test-node", + event_broker=Mock(spec=EventBroker), + service_manager=Mock(spec=ServiceManager), + ) + + assert runtime._default_consumer_settings is not None + assert 
runtime._default_consumer_settings.poll_interval == timedelta( + seconds=1 + ) + + +class TestEventRuntimeRegisterSubscriber: + async def test_delegates_to_broker_register(self): + broker = Mock(spec=EventBroker) + broker.register = AsyncMock() + subscriber = Mock(spec=EventSubscriptionConsumer) + + runtime = EventRuntime( + node_id="test-node", + event_broker=broker, + service_manager=Mock(spec=ServiceManager), + ) + + await runtime.register_subscriber(subscriber) + + broker.register.assert_called_once_with(subscriber) + + async def test_raises_after_start(self): + broker = Mock(spec=EventBroker) + broker.register = AsyncMock() + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + service_manager.start = AsyncMock() + + runtime = EventRuntime( + node_id="test-node", + event_broker=broker, + service_manager=service_manager, + ) + + await runtime.start() + + with pytest.raises(RuntimeError, match="after runtime has started"): + await runtime.register_subscriber(Mock(spec=EventSubscriptionConsumer)) + + async def test_propagates_broker_registration_error(self): + broker = Mock(spec=EventBroker) + broker.register = AsyncMock( + side_effect=RuntimeError("coordinator-only broker") + ) + + runtime = EventRuntime( + node_id="test-node", + event_broker=broker, + service_manager=Mock(spec=ServiceManager), + ) + + with pytest.raises(RuntimeError, match="coordinator-only"): + await runtime.register_subscriber(Mock(spec=EventSubscriptionConsumer)) + + +class TestEventRuntimeRegisterConsumer: + async def test_registers_polling_service_with_service_manager(self): + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + consumer = Mock(spec=EventConsumer) + consumer.consume_all = AsyncMock() + + runtime = EventRuntime( + node_id="test-node", + event_broker=Mock(spec=EventBroker), + service_manager=service_manager, + ) + + await runtime.register_consumer(consumer) + + 
service_manager.register.assert_called_once() + call_args = service_manager.register.call_args + assert call_args.kwargs["execution_mode"] == ExecutionMode.BACKGROUND + + async def test_uses_provided_settings(self): + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + consumer = Mock(spec=EventConsumer) + consumer.consume_all = AsyncMock() + + custom_settings = EventConsumerSettings( + isolation_mode=IsolationMode.DEDICATED_THREAD + ) + + runtime = EventRuntime( + node_id="test-node", + event_broker=Mock(spec=EventBroker), + service_manager=service_manager, + ) + + await runtime.register_consumer(consumer, settings=custom_settings) + + call_args = service_manager.register.call_args + assert call_args.kwargs["isolation_mode"] == IsolationMode.DEDICATED_THREAD + + async def test_uses_default_settings_when_none_provided(self): + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + consumer = Mock(spec=EventConsumer) + consumer.consume_all = AsyncMock() + + runtime = EventRuntime( + node_id="test-node", + event_broker=Mock(spec=EventBroker), + service_manager=service_manager, + ) + + await runtime.register_consumer(consumer) + + call_args = service_manager.register.call_args + assert call_args.kwargs["isolation_mode"] == IsolationMode.MAIN_THREAD + + async def test_raises_after_start(self): + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + service_manager.start = AsyncMock() + + runtime = EventRuntime( + node_id="test-node", + event_broker=Mock(spec=EventBroker), + service_manager=service_manager, + ) + + await runtime.start() + + with pytest.raises(RuntimeError, match="after runtime has started"): + await runtime.register_consumer(Mock(spec=EventConsumer)) + + +class TestEventRuntimeRegisterSubscriptionConsumer: + async def test_registers_with_broker(self): + broker = Mock(spec=EventBroker) + 
broker.register = AsyncMock() + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + subscription_consumer = Mock(spec=EventSubscriptionConsumer) + subscription_consumer.consume_all = AsyncMock() + + runtime = EventRuntime( + node_id="test-node", + event_broker=broker, + service_manager=service_manager, + ) + + await runtime.register_subscription_consumer(subscription_consumer) + + broker.register.assert_called_once_with(subscription_consumer) + + async def test_registers_as_consumer_for_polling(self): + broker = Mock(spec=EventBroker) + broker.register = AsyncMock() + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + subscription_consumer = Mock(spec=EventSubscriptionConsumer) + subscription_consumer.consume_all = AsyncMock() + + runtime = EventRuntime( + node_id="test-node", + event_broker=broker, + service_manager=service_manager, + ) + + await runtime.register_subscription_consumer(subscription_consumer) + + service_manager.register.assert_called_once() + + async def test_uses_provided_settings(self): + broker = Mock(spec=EventBroker) + broker.register = AsyncMock() + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + subscription_consumer = Mock(spec=EventSubscriptionConsumer) + subscription_consumer.consume_all = AsyncMock() + + custom_settings = EventConsumerSettings( + isolation_mode=IsolationMode.SHARED_THREAD + ) + + runtime = EventRuntime( + node_id="test-node", + event_broker=broker, + service_manager=service_manager, + ) + + await runtime.register_subscription_consumer( + subscription_consumer, settings=custom_settings + ) + + call_args = service_manager.register.call_args + assert call_args.kwargs["isolation_mode"] == IsolationMode.SHARED_THREAD + + +class TestEventRuntimeLifecycle: + async def test_start_registers_broker_with_service_manager(self): + broker = 
Mock(spec=EventBroker) + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + service_manager.start = AsyncMock() + + runtime = EventRuntime( + node_id="test-node", + event_broker=broker, + service_manager=service_manager, + ) + + await runtime.start() + + # Broker should be registered + service_manager.register.assert_called_with(broker) + + async def test_start_starts_service_manager(self): + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + service_manager.start = AsyncMock() + + runtime = EventRuntime( + node_id="test-node", + event_broker=Mock(spec=EventBroker), + service_manager=service_manager, + ) + + await runtime.start() + + service_manager.start.assert_called_once() + + async def test_start_raises_if_already_started(self): + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + service_manager.start = AsyncMock() + + runtime = EventRuntime( + node_id="test-node", + event_broker=Mock(spec=EventBroker), + service_manager=service_manager, + ) + + await runtime.start() + + with pytest.raises(RuntimeError, match="after runtime has started"): + await runtime.start() + + async def test_stop_stops_service_manager(self): + service_manager = Mock(spec=ServiceManager) + service_manager.register = Mock(return_value=service_manager) + service_manager.start = AsyncMock() + service_manager.stop = AsyncMock() + + runtime = EventRuntime( + node_id="test-node", + event_broker=Mock(spec=EventBroker), + service_manager=service_manager, + ) + + await runtime.start() + await runtime.stop() + + service_manager.stop.assert_called_once() +``` + +### Integration Tests + +**File**: `tests/integration/logicblocks/event/processing/runtime/test_runtime.py` (new) + +```python +class TestEventRuntimeIntegration: + @pytest_asyncio.fixture(autouse=True) + async def reinitialise_storage(self, open_connection_pool): + 
await drop_table(open_connection_pool, "events") + await drop_table(open_connection_pool, "subscribers") + await drop_table(open_connection_pool, "subscriptions") + await create_table(open_connection_pool, "events") + await create_table(open_connection_pool, "subscribers") + await create_table(open_connection_pool, "subscriptions") + + async def test_processes_events_through_subscription_consumer( + self, open_connection_pool + ): + """Verify events are processed when using register_subscription_consumer.""" + adapter = PostgresEventStorageAdapter(connection_source=open_connection_pool) + event_store = EventStore(adapter=adapter) + event_processor = CapturingEventProcessor() + + node_id = random_node_id() + category = random_category_name() + + event_broker = make_event_broker( + node_id=node_id, + broker_type=EventBrokerType.Distributed, + storage_type=EventBrokerStorageType.Postgres, + settings=DistributedEventBrokerSettings( + observer_synchronisation_interval=timedelta(milliseconds=100), + coordinator_distribution_interval=timedelta(milliseconds=100), + ), + connection_settings=connection_settings, + connection_pool=open_connection_pool, + adapter=adapter, + ) + + service_manager = ServiceManager() + + runtime = EventRuntime( + node_id=node_id, + event_broker=event_broker, + service_manager=service_manager, + default_consumer_settings=EventConsumerSettings( + poll_interval=timedelta(milliseconds=50), + ), + ) + + subscription_consumer = make_subscriber( + subscriber_group=f"test-{category}", + subscription_request=CategoryIdentifier(category=category), + subscriber_state_category=event_store.category( + category=f"state-{category}" + ), + event_processor=event_processor, + ... 
+ ) + + await runtime.register_subscription_consumer(subscription_consumer) + + # Publish event before starting + await event_store.category(category=category).publish( + NewEvent(name="test-event", payload={"key": "value"}) + ) + + # Start runtime + start_task = asyncio.create_task(runtime.start()) + + try: + # Wait for event to be processed + await asyncio.wait_for( + wait_for_event_count(event_processor, 1), + timeout=10.0, + ) + + assert len(event_processor.events) == 1 + finally: + await runtime.stop() + start_task.cancel() + await asyncio.gather(start_task, return_exceptions=True) + + async def test_error_handling_continues_on_exception( + self, open_connection_pool + ): + """Verify processing continues after exception with ContinueErrorHandler.""" + # Similar setup with a processor that throws once then succeeds + pass +``` + +### Component Tests + +**File**: `tests/component/test_event_runtime.py` (new) + +```python +class TestEventRuntimeComponent: + async def test_end_to_end_event_processing_with_postgres(self): + """Full integration test with PostgreSQL storage.""" + # Complete setup with real storage + # Multiple subscription consumers + # Verify all events processed + pass +``` + +--- + +## Success Criteria + +### Automated Verification + +- [ ] All existing tests pass: `mise run test:unit` +- [ ] New runtime unit tests pass +- [ ] New settings unit tests pass +- [ ] Integration tests pass: `mise run test:integration` +- [ ] Component tests pass: `mise run test:component` +- [ ] Type checking passes: `mise run type:check` +- [ ] Linting passes: `mise run lint:fix` +- [ ] Formatting passes: `mise run format:fix` + +### Manual Verification + +- [ ] EventRuntime can be constructed with all required dependencies +- [ ] Registration methods work with real subscribers/consumers +- [ ] Start/stop lifecycle works correctly +- [ ] Error handling wrapper catches and handles exceptions +- [ ] Polling continuously calls consume_all at configured interval +- [ ] 
Registration after start raises clear error + +--- + +## Future Enhancements + +The following are explicitly out of scope for this implementation but are +documented for future consideration: + +1. **Dynamic Registration**: Allow `register_*` methods to be called after + `start()`. Requires changes to broker and service manager. + +2. **Factory Functions**: `make_event_runtime()` factory with overloads for + type-safe construction similar to `make_event_broker()`. + +3. **Builder Pattern**: `EventRuntimeBuilder` for fluent construction if + configuration becomes more complex. + +4. **Health/Status API**: Expose aggregate health status from broker and + registered consumers. + +5. **Graceful Shutdown**: Coordinate shutdown to drain in-flight events before + stopping. + +--- + +## References + +- Research document: `meta/research/2025-12-02-event-processing-node-abstraction.md` +- Example patterns: `meta/examples/subscriber_registration/` +- EventBroker interface: `src/logicblocks/event/processing/broker/base.py:11-14` +- ServiceManager: `src/logicblocks/event/processing/services/manager.py:174-224` +- EventSubscriptionConsumer: `src/logicblocks/event/processing/consumers/subscription.py:95` +- ErrorHandlingService: `src/logicblocks/event/processing/services/error.py:475-486` +- PollingService: `src/logicblocks/event/processing/services/polling.py:9-24` diff --git a/meta/research/2025-12-02-event-processing-node-abstraction.md b/meta/research/2025-12-02-event-processing-node-abstraction.md new file mode 100644 index 0000000..c6f74bc --- /dev/null +++ b/meta/research/2025-12-02-event-processing-node-abstraction.md @@ -0,0 +1,609 @@ +--- +date: 2025-12-02T01:39:06+00:00 +researcher: Claude Code +git_commit: 2af43d9b064e568e635e157f18679509bae8e4f3 +branch: event-processing-node-abstraction +repository: event.store +topic: "Event Processing Infrastructure and EventProcessingNode Abstraction Research" +tags: [ research, codebase, event-processing, event-broker, 
service-manager, subscribers, coordination, observation ] +status: complete +last_updated: 2025-12-02 +last_updated_by: Claude Code +--- + +# Research: Event Processing Infrastructure and EventProcessingNode Abstraction + +**Date**: 2025-12-02T01:39:06+00:00 +**Researcher**: Claude Code +**Git Commit**: 2af43d9b064e568e635e157f18679509bae8e4f3 +**Branch**: event-processing-node-abstraction +**Repository**: event.store + +## Research Question + +Perform deep research of the event processing infrastructure to understand: + +1. All components relevant to subscribing and consuming from event sources +2. Patterns used for wiring components together +3. Key abstractions involved (EventBroker, ServiceManager, subscribers, etc.) +4. The coordinator vs observer patterns +5. Testing strategy employed + +The goal is to inform the design of an `EventProcessingNode` abstraction that: + +- Sits over the top of the event broker and service manager +- Makes it simpler to register subscribers +- Encapsulates the broker instance +- Supports three node modes: coordinator-only, observer-only, or both + +## Summary + +The codebase implements a sophisticated distributed event processing system with +clear separation of concerns: + +1. **EventBroker**: Coordinates work allocation among subscribers using two + strategies: + - `SingletonEventBroker`: For single-process scenarios + - `DistributedEventBroker`: For multi-node distributed scenarios with leader + election + +2. **ServiceManager**: Orchestrates service lifecycle with execution modes ( + foreground/background) and isolation modes (main thread, shared thread, + dedicated thread) + +3. **Subscriber/Consumer Abstractions**: Two-tier consumer model with + `EventSubscriptionConsumer` (broker-mediated) delegating to + `EventSourceConsumer` (direct source consumption) + +4. 
**Coordinator/Observer Pattern**: The distributed broker separates: + - **Coordination**: Single leader allocates work via distributed lock + - **Observation**: All nodes apply allocation changes locally + +5. **Testing Strategy**: Three-tier approach (unit, integration, component) with + extensive use of mocks, test doubles, and real PostgreSQL for integration + tests + +## Detailed Findings + +### EventBroker Component + +The EventBroker is the central coordination mechanism for event distribution. + +#### Base Interface + +**File**: `src/logicblocks/event/processing/broker/base.py:11-14` + +```python +class EventBroker[E](Service[NoneType], Process, ABC): + @abstractmethod + async def register(self, subscriber: EventSubscriber[E]) -> None: + raise NotImplementedError +``` + +#### Strategy Implementations + +**SingletonEventBroker** (`broker/strategies/singleton/broker.py:26`): + +- Simple in-process distribution +- Maintains in-memory subscriber store +- Periodically iterates subscribers and distributes event sources +- Configuration: `distribution_interval` (default 60 seconds) + +**DistributedEventBroker** (`broker/strategies/distributed/broker.py:19`): + +- Three concurrent components: + 1. `EventSubscriberManager`: Manages subscriber lifecycle and heartbeats + 2. `EventSubscriptionCoordinator`: Leader-elected work allocator + 3. 
`EventSubscriptionObserver`: Applies allocation changes locally + +#### Coordinator vs Observer Architecture + +| Aspect | Coordinator | Observer | +|--------------------|---------------------------------------|-------------------------| +| **File** | `coordinator.py:112` | `observer.py:28` | +| **Concurrency** | One active (leader) | All nodes active | +| **State Access** | Read subscribers, write subscriptions | Read subscriptions only | +| **Responsibility** | Decide work allocation | Apply work allocation | +| **Uses Lock** | Yes (leader election) | No | +| **Operates On** | All subscribers globally | Local subscribers only | + +**Coordinator Work Distribution** (`coordinator.py:199-364`): + +1. Fetches existing subscriptions and active subscribers +2. Groups subscribers by `subscriber_group` +3. For each group, identifies eligible subscribers (matching subscription + requests) +4. Chunks new event sources across eligible subscribers using round-robin +5. Creates ADD/REMOVE/REPLACE changes and applies atomically + +**Observer Synchronisation** (`observer.py:77-113`): + +1. Fetches updated subscriptions from state store +2. Computes diff against cached subscriptions +3. Calls `subscriber.withdraw(source)` for revocations +4. Calls `subscriber.accept(source)` for allocations + +#### Configuration Options + +**DistributedEventBrokerSettings** (`distributed/builder.py:22-33`): + +- `subscriber_manager_heartbeat_interval`: 10 seconds +- `subscriber_manager_purge_interval`: 1 minute +- `subscriber_manager_subscriber_max_age`: 10 minutes +- `coordinator_distribution_interval`: 20 seconds +- `coordinator_leadership_max_duration`: 15 minutes +- `observer_synchronisation_interval`: 20 seconds + +--- + +### ServiceManager Component + +**File**: `src/logicblocks/event/processing/services/manager.py:174` + +The ServiceManager orchestrates service lifecycle with configurable execution +and isolation modes. 
+ +#### Service Protocol + +```python +class Service[T = Any](ABC): + @abstractmethod + async def execute(self) -> T: + raise NotImplementedError() +``` + +#### Execution Modes (`manager.py:14-16`) + +- **FOREGROUND**: `ServiceManager.start()` blocks until service completes +- **BACKGROUND**: Service runs concurrently without blocking + +#### Isolation Modes (`manager.py:19-22`) + +- **MAIN_THREAD**: Runs on main event loop thread via `asyncio.create_task()` +- **SHARED_THREAD**: Multiple services share a background thread with `uvloop` +- **DEDICATED_THREAD**: Each service gets its own background thread + +#### Service Lifecycle + +1. **Registration**: + `manager.register(service, execution_mode, isolation_mode)` - stores service + definition +2. **Start**: `await manager.start()` - starts executor, schedules all services, + waits for foreground +3. **Stop**: `await manager.stop()` - cancels tasks, shuts down thread pools + +#### Signal Handling + +```python +manager.stop_on([signal.SIGINT, signal.SIGTERM]) +``` + +Configures graceful shutdown on OS signals. + +--- + +### Subscriber/Consumer Abstractions + +#### Two-Tier Consumer Model + +**EventSubscriptionConsumer** (`consumers/subscription.py:95`): + +- Receives event sources from broker via `accept(source)` / `withdraw(source)` +- Maintains dict of delegates keyed by source identifier +- `consume_all()` iterates all delegates and calls their `consume_all()` + +**EventSourceConsumer** (`consumers/source.py:139`): + +- Consumes events from a single `EventSource` +- Tracks state via `EventConsumerStateStore` +- Supports three processor types: + - `EventProcessor`: Simple callback-based + - `AutoCommitEventIteratorProcessor`: Auto-acknowledges each event + - `ManagedEventIteratorProcessor`: Manual acknowledgment control + +#### Factory Pattern: `make_subscriber` + +**File**: `consumers/subscription.py:54-92` + +Creates a fully-configured subscription consumer: + +1. Generates subscriber ID (UUID) +2. 
Creates `EventConsumerStateStore` with persistence interval +3. Defines delegate factory closure capturing shared state +4. Returns `EventSubscriptionConsumer` with factory + +#### Subscription Request Types + +**CategoryIdentifier** (`types/identifier.py:84-98`): + +```python +CategoryIdentifier(category="orders") +``` + +**StreamIdentifier** (`types/identifier.py:127-150`): + +```python +StreamIdentifier(category="orders", stream="order-123") +``` + +**LogIdentifier** (`types/identifier.py:49-61`): + +```python +LogIdentifier() # Entire event log +``` + +#### State Persistence + +**EventConsumerStateStore** (`consumers/state/base.py:76`): + +- `record_processed(event)`: Records event processing with lag counter +- `save_if_needed()`: Saves when lag >= persistence_interval +- `save()`: Forces immediate save with optimistic concurrency +- `load_to_query_constraint()`: Returns resumption constraint for iteration + +--- + +### Error Handling and Polling Services + +#### Error Handler Protocol + +```python +class ErrorHandler[T](ABC): + @abstractmethod + def handle(self, exception: BaseException) -> ErrorHandlerDecision[T]: + raise NotImplementedError +``` + +#### Concrete Handlers + +| Handler | Behaviour | +|---------------------------|-------------------------------------------------| +| `ExitErrorHandler` | Raises `SystemExit` with configurable exit code | +| `RaiseErrorHandler` | Re-raises exception (optionally transformed) | +| `ContinueErrorHandler` | Returns default value, continues execution | +| `RetryErrorHandler` | Retries operation in loop | +| `TypeMappingErrorHandler` | Maps exception types to specific handlers | + +#### ErrorHandlingService (`services/error.py:475-486`) + +Wraps any callable with error handling: + +```python +error_service = ErrorHandlingService( + callable=subscriber.consume_all, + error_handler=ContinueErrorHandler(), +) +``` + +The `execute()` method implements retry loop with decision pattern matching: + +- 
`RetryErrorHandlerDecision` → `continue` (retry)
+- `ContinueErrorHandlerDecision` → return value
+- `RaiseErrorHandlerDecision` → re-raise
+- `ExitErrorHandlerDecision` → raise `SystemExit`
+
+#### PollingService (`services/polling.py:9-24`)
+
+Continuously executes a callable at intervals:
+
+```python
+polling_service = PollingService(
+    callable=error_handling_service.execute,
+    poll_interval=timedelta(seconds=1),
+)
+```
+
+#### Composition Pattern (from examples)
+
+```python
+# 1. Create subscriber
+subscriber = make_subscriber(...)
+
+# 2. Wrap with error handling
+error_service = ErrorHandlingService(
+    callable=subscriber.consume_all,
+    error_handler=ContinueErrorHandler(),
+)
+
+# 3. Wrap with polling
+polling_service = PollingService(
+    callable=error_service.execute,
+    poll_interval=timedelta(seconds=1),
+)
+
+# 4. Register with broker and service manager
+await event_broker.register(subscriber)
+service_manager.register(
+    polling_service,
+    execution_mode=ExecutionMode.BACKGROUND,
+)
+```
+
+---
+
+### Wiring Patterns from Examples
+
+The `meta/examples/subscriber_registration/` directory shows several patterns:
+
+#### Pattern 1: All-in-One Function (`all_in_one_function.py`)
+
+A single function that creates a subscriber, wraps it with error handling and
+polling, and registers it with the broker and service manager. 
+ +#### Pattern 2: Consumer Registrar (`consumer_registrar.py`) + +Abstract base class with: + +- Class-level configuration (`subscriber_group`, `category`) +- Instance-level dependencies (`event_store`, `event_processor`) +- `register_as_service()` method encapsulating wiring + +#### Pattern 3: Register Many Function (`register_many_function.py`) + +Batch registration with `init_event_services()` function that: + +- Creates broker and service manager +- Collects all subscription consumers from domain modules +- Registers each with error handling and polling + +#### Pattern 4: Subsystem Creator (`subsytem_creator.py`) + +`ProviderService` class that creates multiple related services: + +- Projection services +- Ingestion services +- Optional enable/disable per service + +--- + +### Testing Strategy + +#### Three-Tier Approach + +**Unit Tests** (`tests/unit/`): + +- Mock all dependencies with `Mock(spec=Interface)` +- Test single component behaviour +- Fast execution, no external dependencies +- Example: `test_broker.py` uses mocked coordinator/observer/manager + +**Integration Tests** (`tests/integration/`): + +- Use real PostgreSQL via `AsyncConnectionPool` +- Test interactions between components +- Database fixtures reset schema between tests +- Example: `test_broker.py` tests 50+ subscribers across multiple nodes + +**Component Tests** (`tests/component/`): + +- Combine multiple real components +- Validate end-to-end workflows +- Use `ServiceManager` to orchestrate lifecycle +- Example: `test_asynchronous_projections.py` tests full projection pipeline + +#### Test Patterns + +**Capturing Test Doubles**: + +```python +class CapturingEventProcessor(EventProcessor): + def __init__(self): + self.events = [] + + async def process_event(self, event): + self.events.append(event) +``` + +**NodeSet Pattern** (integration tests): +Manages multiple broker nodes for distributed testing with dynamic +start/stop/replacement. 
+ +**Builder Pattern**: + +```python +NewEventBuilder().with_category("orders").with_payload({...}).build() +``` + +**Shared Test Cases**: +Abstract base test classes extended by adapter-specific tests (e.g., +`LockManagerCases`). + +--- + +## Architecture Insights + +### Key Design Patterns + +1. **Strategy Pattern**: Two broker implementations (Singleton vs Distributed) + behind common interface + +2. **Factory Pattern**: `make_subscriber()`, `make_event_broker()` construct + complex object graphs + +3. **Builder Pattern**: `DistributedEventBrokerBuilder` for fluent construction + +4. **Observer Pattern**: Distributed observer watches subscription state and + propagates to local subscribers + +5. **Decorator/Wrapper Pattern**: `ErrorHandlingService` wraps callable, + `PollingService` wraps service + +6. **Leader Election**: Coordinator uses distributed advisory lock for mutual + exclusion + +7. **Heartbeat Pattern**: Subscriber manager sends periodic heartbeats for + liveness tracking + +### node_id Concept + +A string identifier representing a physical instance/host in the distributed +system. 
Used by: + +- Coordinator: For logging which node is coordinating +- Observer: For logging which node is observing +- Subscriber Manager: For tracking which node owns subscribers +- State stores: For associating subscriptions with nodes + +### Process Abstraction + +All long-running components implement `Process` with: + +- `status` property returning `ProcessStatus` (INITIALISED, STARTING, RUNNING, + STOPPED, ERRORED) +- Lifecycle tracking through status transitions + +--- + +## Implications for EventProcessingNode + +Based on this research, an `EventProcessingNode` abstraction could: + +### Encapsulation + +- Wrap `EventBroker`, `ServiceManager`, and related infrastructure +- Own the `node_id` concept +- Manage lifecycle of all node-scoped components + +### Node Modes + +| Mode | Coordinator | Observer | Subscribers | +|----------------------|-------------|----------|-------------| +| **Coordinator-only** | Yes | No | No | +| **Observer-only** | No | Yes | Yes | +| **Both** | Yes | Yes | Yes | + +For coordinator-only nodes: + +- Run only `EventSubscriberManager` (for heartbeats) and + `EventSubscriptionCoordinator` +- No local subscribers registered +- Could use `SingletonEventBroker` if no distribution needed + +For observer-only nodes: + +- Run `EventSubscriberManager` and `EventSubscriptionObserver` +- No `EventSubscriptionCoordinator` +- Register and process subscribers locally + +For both: + +- Current `DistributedEventBroker` behaviour +- Full participation in coordination and observation + +### Registration Simplification + +Current pattern: + +```python +subscriber = make_subscriber(...) +error_service = ErrorHandlingService(callable=subscriber.consume_all, ...) +polling_service = PollingService(callable=error_service.execute, ...) +await event_broker.register(subscriber) +service_manager.register(polling_service, ...) +``` + +Simplified with `EventProcessingNode`: + +```python +node = EventProcessingNode(mode=NodeMode.BOTH, ...) 
+node.register_subscriber( + group="projections", + category="orders", + processor=my_processor, + error_handler=ContinueErrorHandler(), + poll_interval=timedelta(seconds=1), +) +await node.start() +``` + +### Configuration + +Could consolidate: + +- Broker settings (distribution interval, leadership duration, etc.) +- Service manager settings (stop signals, isolation modes) +- Subscriber defaults (persistence interval, poll interval, error handling) + +--- + +## Code References + +### Core Components + +- `src/logicblocks/event/processing/broker/base.py:11-14` - EventBroker + interface +- +`src/logicblocks/event/processing/broker/strategies/distributed/broker.py:19-57` - +DistributedEventBroker +- +`src/logicblocks/event/processing/broker/strategies/singleton/broker.py:26-84` - +SingletonEventBroker +- `src/logicblocks/event/processing/services/manager.py:174-224` - + ServiceManager +- `src/logicblocks/event/processing/consumers/subscription.py:54-192` - + EventSubscriptionConsumer & make_subscriber +- `src/logicblocks/event/processing/consumers/source.py:139-239` - + EventSourceConsumer + +### Coordinator/Observer + +- +`src/logicblocks/event/processing/broker/strategies/distributed/coordinator.py:112-364` - +EventSubscriptionCoordinator +- +`src/logicblocks/event/processing/broker/strategies/distributed/observer.py:28-113` - +EventSubscriptionObserver +- +`src/logicblocks/event/processing/broker/strategies/distributed/subscribers/manager.py:39-148` - +EventSubscriberManager + +### Error Handling & Polling + +- `src/logicblocks/event/processing/services/error.py:52-486` - Error handlers + and ErrorHandlingService +- `src/logicblocks/event/processing/services/polling.py:9-24` - PollingService + +### State Management + +- `src/logicblocks/event/processing/consumers/state/base.py:76-198` - + EventConsumerStateStore +- +`src/logicblocks/event/processing/broker/strategies/distributed/subscribers/stores/state/base.py:14-60` - +EventSubscriberStateStore +- 
+`src/logicblocks/event/processing/broker/strategies/distributed/subscriptions/stores/state/base.py:17-72` - +EventSubscriptionStateStore + +### Tests + +- `tests/unit/logicblocks/event/processing/services/test_manager.py:1-730` - + ServiceManager tests +- +`tests/unit/logicblocks/event/processing/brokers/strategies/distributed/test_broker.py` - +Broker unit tests +- +`tests/integration/logicblocks/event/processing/broker/strategies/distributed/test_broker.py` - +Broker integration tests +- `tests/component/test_processing.py` - End-to-end processing tests + +--- + +## Open Questions + +1. **Node Identity**: Should `EventProcessingNode` generate its own `node_id` or + accept one? How does this relate to deployment topology? + +2. **Dynamic Mode Switching**: Can a node switch between coordinator-only and + both modes at runtime, or is this fixed at construction? + +3. **Subscriber Hot-Swap**: Should the node support adding/removing subscribers + while running? + +4. **Health Reporting**: Should the node expose aggregate health status from all + components? + +5. **Graceful Shutdown**: How should shutdown coordinate between service manager + stop and broker unregistration? + +6. **Configuration Hierarchy**: Should node-level defaults be overridable + per-subscriber?