Skip to content

Commit 342b371

Browse files
author
Sunny Khandokar
committed
feat(core): Add LilithKit, DivineBus, and LogKitten
- Implement LilithKit as the soulbound guardian daemon for process monitoring - Add DivineBus for inter-process communication with pub/sub - Introduce LogKitten for unified structured logging - Set up initial integration between components
1 parent 5f0e781 commit 342b371

File tree

3 files changed

+1156
-0
lines changed

3 files changed

+1156
-0
lines changed

core/divinebus.py

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
"""
2+
DivineBus - The Divine Event Bus for LilithOS
3+
4+
A high-performance, thread-safe event bus for inter-process communication
5+
and reactive programming across the LilithOS ecosystem.
6+
"""
7+
8+
import asyncio
9+
import json
10+
import logging
11+
import time
12+
import uuid
13+
from collections import defaultdict
14+
from dataclasses import dataclass, field, asdict
15+
from enum import Enum
16+
from typing import Any, Callable, Dict, List, Optional, Set, Union, Coroutine
17+
from pathlib import Path
18+
19+
import aiofiles
20+
import aiofiles.os
21+
from typing_extensions import Protocol
22+
23+
# Constants
24+
DEFAULT_HISTORY_SIZE = 1000
25+
MAX_EVENT_SIZE = 1024 * 1024 # 1MB max event size
26+
27+
class EventPriority(Enum):
28+
"""Priority levels for event delivery."""
29+
LOW = 0
30+
NORMAL = 1
31+
HIGH = 2
32+
CRITICAL = 3
33+
34+
@dataclass
35+
class Event:
36+
"""Base event class for all DivineBus events."""
37+
name: str
38+
data: Dict[str, Any] = field(default_factory=dict)
39+
source: str = "system"
40+
timestamp: float = field(default_factory=time.time)
41+
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
42+
priority: EventPriority = EventPriority.NORMAL
43+
ttl: Optional[float] = None # Time to live in seconds
44+
45+
def to_dict(self) -> Dict[str, Any]:
46+
"""Convert event to dictionary for serialization."""
47+
return {
48+
"name": self.name,
49+
"data": self.data,
50+
"source": self.source,
51+
"timestamp": self.timestamp,
52+
"event_id": self.event_id,
53+
"priority": self.priority.name,
54+
"ttl": self.ttl
55+
}
56+
57+
@classmethod
58+
def from_dict(cls, data: Dict[str, Any]) -> 'Event':
59+
"""Create an event from a dictionary."""
60+
return cls(
61+
name=data["name"],
62+
data=data.get("data", {}),
63+
source=data.get("source", "system"),
64+
timestamp=data.get("timestamp", time.time()),
65+
event_id=data.get("event_id", str(uuid.uuid4())),
66+
priority=EventPriority[data.get("priority", "NORMAL")],
67+
ttl=data.get("ttl")
68+
)
69+
70+
class EventHandler(Protocol):
71+
"""Protocol for event handler functions."""
72+
async def __call__(self, event: Event) -> None:
73+
...
74+
75+
class Subscription:
76+
"""Represents an event subscription."""
77+
78+
def __init__(
79+
self,
80+
event_name: str,
81+
callback: EventHandler,
82+
priority: EventPriority = EventPriority.NORMAL,
83+
filter_fn: Optional[Callable[[Event], bool]] = None
84+
):
85+
self.event_name = event_name
86+
self.callback = callback
87+
self.priority = priority
88+
self.filter_fn = filter_fn or (lambda _: True)
89+
self.active = True
90+
self.id = str(uuid.uuid4())
91+
92+
async def handle(self, event: Event) -> None:
93+
"""Handle an event if the subscription is active and filter passes."""
94+
if self.active and self.filter_fn(event):
95+
await self.callback(event)
96+
97+
class DivineBus:
98+
"""
99+
The Divine Event Bus for LilithOS.
100+
101+
A high-performance, thread-safe event bus that supports:
102+
- Synchronous and asynchronous event handling
103+
- Priority-based event delivery
104+
- Event persistence and replay
105+
- Cross-process communication
106+
- Event filtering and transformation
107+
"""
108+
109+
_instance = None
110+
111+
def __new__(cls):
112+
if cls._instance is None:
113+
cls._instance = super().__new__(cls)
114+
cls._instance._initialized = False
115+
return cls._instance
116+
117+
def __init__(self):
118+
if self._initialized:
119+
return
120+
121+
self._logger = logging.getLogger("DivineBus")
122+
self._subscriptions: Dict[str, List[Subscription]] = defaultdict(list)
123+
self._event_history: Dict[str, List[Event]] = defaultdict(list)
124+
self._history_size = DEFAULT_HISTORY_SIZE
125+
self._event_loop = asyncio.get_event_loop()
126+
self._persistence_path = Path("/var/lilithos/events")
127+
self._persistence_path.mkdir(parents=True, exist_ok=True)
128+
self._initialized = True
129+
self._logger.info("DivineBus initialized")
130+
131+
async def publish(self, event_name: str, data: Optional[Dict[str, Any]] = None, **kwargs) -> str:
132+
"""
133+
Publish an event to the bus.
134+
135+
Args:
136+
event_name: Name of the event
137+
data: Event data dictionary
138+
**kwargs: Additional event attributes (source, priority, ttl)
139+
140+
Returns:
141+
str: The event ID
142+
"""
143+
event = Event(
144+
name=event_name,
145+
data=data or {},
146+
**{k: v for k, v in kwargs.items() if hasattr(Event, k)}
147+
)
148+
149+
self._logger.debug(f"Publishing event: {event_name} (ID: {event.event_id})")
150+
151+
# Store in history
152+
self._event_history[event_name].append(event)
153+
if len(self._event_history[event_name]) > self._history_size:
154+
self._event_history[event_name].pop(0)
155+
156+
# Persist if needed
157+
if event.priority in [EventPriority.HIGH, EventPriority.CRITICAL]:
158+
await self._persist_event(event)
159+
160+
# Deliver to subscribers
161+
await self._deliver_event(event)
162+
163+
return event.event_id
164+
165+
async def _deliver_event(self, event: Event) -> None:
166+
"""Deliver an event to all matching subscribers."""
167+
# Get all matching subscriptions
168+
subscriptions = []
169+
170+
# Exact match
171+
if event.name in self._subscriptions:
172+
subscriptions.extend(self._subscriptions[event.name])
173+
174+
# Wildcard match (e.g., "system.*")
175+
for event_pattern, subs in self._subscriptions.items():
176+
if ".*" in event_pattern:
177+
pattern = event_pattern.replace(".*", "[^.]*")
178+
import re
179+
if re.fullmatch(pattern, event.name):
180+
subscriptions.extend(s for s in subs if s not in subscriptions)
181+
182+
# Sort by priority (highest first)
183+
subscriptions.sort(key=lambda s: s.priority.value, reverse=True)
184+
185+
# Deliver to subscribers
186+
for subscription in subscriptions:
187+
try:
188+
await subscription.handle(event)
189+
except Exception as e:
190+
self._logger.error(
191+
f"Error in event handler for {event.name}: {e}",
192+
exc_info=True
193+
)
194+
195+
def subscribe(
196+
self,
197+
event_name: str,
198+
callback: EventHandler,
199+
priority: EventPriority = EventPriority.NORMAL,
200+
filter_fn: Optional[Callable[[Event], bool]] = None
201+
) -> Subscription:
202+
"""
203+
Subscribe to events matching the given name pattern.
204+
205+
Args:
206+
event_name: Event name or pattern (supports * wildcard)
207+
callback: Async callback function to handle the event
208+
priority: Priority for event delivery
209+
filter_fn: Optional filter function to further filter events
210+
211+
Returns:
212+
Subscription: A subscription object that can be used to unsubscribe
213+
"""
214+
subscription = Subscription(event_name, callback, priority, filter_fn)
215+
self._subscriptions[event_name].append(subscription)
216+
self._logger.debug(f"New subscription for {event_name} (ID: {subscription.id})")
217+
return subscription
218+
219+
def unsubscribe(self, subscription: Subscription) -> None:
220+
"""Remove an event subscription."""
221+
if subscription.event_name in self._subscriptions:
222+
self._subscriptions[subscription.event_name] = [
223+
s for s in self._subscriptions[subscription.event_name]
224+
if s.id != subscription.id
225+
]
226+
227+
async def _persist_event(self, event: Event) -> None:
228+
"""Persist an event to disk for durability."""
229+
try:
230+
event_path = self._persistence_path / f"{int(time.time())}_{event.event_id}.json"
231+
async with aiofiles.open(event_path, 'w') as f:
232+
await f.write(json.dumps(event.to_dict()))
233+
except Exception as e:
234+
self._logger.error(f"Failed to persist event {event.event_id}: {e}")
235+
236+
async def replay_events(
237+
self,
238+
event_name: Optional[str] = None,
239+
since: Optional[float] = None,
240+
limit: int = 100
241+
) -> List[Event]:
242+
"""
243+
Replay historical events.
244+
245+
Args:
246+
event_name: Optional event name filter
247+
since: Optional timestamp to get events after
248+
limit: Maximum number of events to return
249+
250+
Returns:
251+
List[Event]: List of matching events
252+
"""
253+
events = []
254+
255+
# Get from memory history first
256+
for name, event_list in self._event_history.items():
257+
if event_name and name != event_name:
258+
continue
259+
260+
for event in event_list:
261+
if since and event.timestamp < since:
262+
continue
263+
events.append(event)
264+
265+
# Get from disk if needed
266+
if len(events) < limit and self._persistence_path.exists():
267+
# Implementation for reading from disk would go here
268+
pass
269+
270+
# Sort by timestamp and apply limit
271+
events.sort(key=lambda e: e.timestamp)
272+
return events[-limit:]
273+
274+
async def wait_for(
275+
self,
276+
event_name: str,
277+
timeout: Optional[float] = None,
278+
filter_fn: Optional[Callable[[Event], bool]] = None
279+
) -> Event:
280+
"""
281+
Wait for an event matching the given criteria.
282+
283+
Args:
284+
event_name: Name of the event to wait for
285+
timeout: Maximum time to wait in seconds
286+
filter_fn: Optional filter function
287+
288+
Returns:
289+
Event: The matching event
290+
291+
Raises:
292+
asyncio.TimeoutError: If the timeout is reached
293+
"""
294+
event_future = self._event_loop.create_future()
295+
296+
def handler(event: Event) -> None:
297+
if not filter_fn or filter_fn(event):
298+
if not event_future.done():
299+
event_future.set_result(event)
300+
301+
subscription = self.subscribe(event_name, handler)
302+
303+
try:
304+
return await asyncio.wait_for(event_future, timeout)
305+
except asyncio.TimeoutError:
306+
self.unsubscribe(subscription)
307+
raise
308+
finally:
309+
self.unsubscribe(subscription)
310+
311+
# Singleton instance
312+
event_bus = DivineBus()
313+
314+
# Shortcut functions for common operations
315+
async def publish(event_name: str, data: Optional[Dict[str, Any]] = None, **kwargs) -> str:
316+
"""Publish an event to the default event bus."""
317+
return await event_bus.publish(event_name, data, **kwargs)
318+
319+
def subscribe(
320+
event_name: str,
321+
callback: EventHandler,
322+
priority: EventPriority = EventPriority.NORMAL,
323+
filter_fn: Optional[Callable[[Event], bool]] = None
324+
) -> Subscription:
325+
"""Subscribe to events on the default event bus."""
326+
return event_bus.subscribe(event_name, callback, priority, filter_fn)
327+
328+
async def wait_for(
329+
event_name: str,
330+
timeout: Optional[float] = None,
331+
filter_fn: Optional[Callable[[Event], bool]] = None
332+
) -> Event:
333+
"""Wait for an event on the default event bus."""
334+
return await event_bus.wait_for(event_name, timeout, filter_fn)

0 commit comments

Comments
 (0)