|
| 1 | +""" |
| 2 | +DivineBus - Secure RPC communication channel for LilithOS <-> AthenaCore |
| 3 | +
|
| 4 | +This module implements an encrypted, authenticated socket-based RPC system that allows |
| 5 | +bidirectional communication between LilithOS and AthenaCore. All communications are |
| 6 | +encrypted using AES-256-GCM for confidentiality and authenticity. |
| 7 | +""" |
| 8 | + |
| 9 | +import asyncio |
| 10 | +import json |
| 11 | +import logging |
| 12 | +import os |
| 13 | +import struct |
| 14 | +from typing import Any, Callable, Dict, Optional, Tuple |
| 15 | + |
| 16 | +from cryptography.hazmat.primitives.ciphers.aead import AESGCM |
| 17 | +from cryptography.hazmat.primitives import hashes, hmac |
| 18 | +from cryptography.hazmat.primitives.kdf.hkdf import HKDF |
| 19 | +from cryptography.exceptions import InvalidTag |
| 20 | + |
| 21 | +# Configure logging |
| 22 | +logger = logging.getLogger("lilith.divine_bus") |
| 23 | + |
| 24 | +class DivineBus: |
| 25 | + """Secure RPC communication bus for LilithOS <-> AthenaCore communication.""" |
| 26 | + |
| 27 | + def __init__(self, config_path: str = "config/athena.json"): |
| 28 | + """Initialize the DivineBus with configuration. |
| 29 | + |
| 30 | + Args: |
| 31 | + config_path: Path to Athena configuration file |
| 32 | + """ |
| 33 | + self.config_path = config_path |
| 34 | + self.config = self._load_config() |
| 35 | + self.aes_key = self._derive_key(self.config["shared_secret"].encode()) |
| 36 | + self.running = False |
| 37 | + self.server = None |
| 38 | + self._handlers = {} |
| 39 | + self._default_handler = None |
| 40 | + self._connection_id = 0 |
| 41 | + |
| 42 | + # Register default handlers |
| 43 | + self.register_handler("ping", self._handle_ping) |
| 44 | + self.register_handler("restart_module", self._handle_restart_module) |
| 45 | + self.register_handler("get_metrics", self._handle_get_metrics) |
| 46 | + self.register_handler("sync_heartbeat", self._handle_sync_heartbeat) |
| 47 | + self.register_handler("send_alert", self._handle_send_alert) |
| 48 | + |
| 49 | + def _load_config(self) -> Dict[str, Any]: |
| 50 | + """Load Athena configuration.""" |
| 51 | + try: |
| 52 | + with open(self.config_path, 'r') as f: |
| 53 | + return json.load(f) |
| 54 | + except FileNotFoundError: |
| 55 | + logger.warning(f"Configuration file {self.config_path} not found. Using defaults.") |
| 56 | + return { |
| 57 | + "host": "0.0.0.0", |
| 58 | + "port": 9001, |
| 59 | + "shared_secret": os.urandom(32).hex(), # Generate a random secret if none exists |
| 60 | + "auth_timeout": 5.0, |
| 61 | + "max_message_size": 1048576, # 1MB |
| 62 | + } |
| 63 | + |
| 64 | + def _derive_key(self, secret: bytes, salt: bytes = b'lilith-divine-bus') -> bytes: |
| 65 | + """Derive a secure encryption key from the shared secret.""" |
| 66 | + hkdf = HKDF( |
| 67 | + algorithm=hashes.SHA256(), |
| 68 | + length=32, # 256 bits for AES-256 |
| 69 | + salt=salt, |
| 70 | + info=b'divine-bus-key', |
| 71 | + ) |
| 72 | + return hkdf.derive(secret) |
| 73 | + |
| 74 | + async def start(self): |
| 75 | + """Start the DivineBus server.""" |
| 76 | + if self.running: |
| 77 | + logger.warning("DivineBus is already running") |
| 78 | + return |
| 79 | + |
| 80 | + self.running = True |
| 81 | + try: |
| 82 | + self.server = await asyncio.start_server( |
| 83 | + self._handle_connection, |
| 84 | + host=self.config["host"], |
| 85 | + port=self.config["port"] |
| 86 | + ) |
| 87 | + logger.info(f"DivineBus listening on {self.config['host']}:{self.config['port']}") |
| 88 | + |
| 89 | + # Keep the server running |
| 90 | + async with self.server: |
| 91 | + await self.server.serve_forever() |
| 92 | + |
| 93 | + except Exception as e: |
| 94 | + logger.error(f"DivineBus server error: {e}") |
| 95 | + raise |
| 96 | + finally: |
| 97 | + self.running = False |
| 98 | + |
| 99 | + async def stop(self): |
| 100 | + """Stop the DivineBus server.""" |
| 101 | + if not self.running or not self.server: |
| 102 | + return |
| 103 | + |
| 104 | + self.running = False |
| 105 | + self.server.close() |
| 106 | + await self.server.wait_closed() |
| 107 | + logger.info("DivineBus server stopped") |
| 108 | + |
| 109 | + def register_handler(self, method: str, handler: Callable): |
| 110 | + """Register an RPC handler for a specific method. |
| 111 | + |
| 112 | + Args: |
| 113 | + method: The RPC method name |
| 114 | + handler: Async function that takes (params: Dict) and returns a serializable result |
| 115 | + """ |
| 116 | + self._handlers[method] = handler |
| 117 | + |
| 118 | + def set_default_handler(self, handler: Callable): |
| 119 | + """Set the default handler for unregistered methods.""" |
| 120 | + self._default_handler = handler |
| 121 | + |
| 122 | + async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): |
| 123 | + """Handle a new client connection.""" |
| 124 | + conn_id = self._connection_id |
| 125 | + self._connection_id += 1 |
| 126 | + |
| 127 | + client_addr = writer.get_extra_info('peername') |
| 128 | + logger.info(f"New DivineBus connection from {client_addr} [conn-{conn_id}]") |
| 129 | + |
| 130 | + try: |
| 131 | + while self.running: |
| 132 | + # Read message length (4 bytes, big-endian) |
| 133 | + try: |
| 134 | + msg_len_bytes = await asyncio.wait_for( |
| 135 | + reader.readexactly(4), |
| 136 | + timeout=30.0 |
| 137 | + ) |
| 138 | + except (asyncio.IncompleteReadError, asyncio.TimeoutError): |
| 139 | + logger.debug(f"Connection {conn_id} timed out or closed") |
| 140 | + break |
| 141 | + |
| 142 | + msg_len = struct.unpack('>I', msg_len_bytes)[0] |
| 143 | + |
| 144 | + # Validate message length |
| 145 | + if msg_len > self.config["max_message_size"]: |
| 146 | + logger.warning(f"Message too large: {msg_len} bytes") |
| 147 | + await self._send_error(writer, "message_too_large", "Message exceeds maximum size") |
| 148 | + break |
| 149 | + |
| 150 | + # Read encrypted message |
| 151 | + try: |
| 152 | + encrypted_msg = await asyncio.wait_for( |
| 153 | + reader.readexactly(msg_len), |
| 154 | + timeout=5.0 |
| 155 | + ) |
| 156 | + except (asyncio.IncompleteReadError, asyncio.TimeoutError): |
| 157 | + logger.warning(f"Failed to read message from {client_addr}") |
| 158 | + break |
| 159 | + |
| 160 | + # Decrypt and process message |
| 161 | + try: |
| 162 | + decrypted_msg = self._decrypt(encrypted_msg) |
| 163 | + result = await self._handle_rpc(decrypted_msg) |
| 164 | + response = self._create_response(result=result) |
| 165 | + except Exception as e: |
| 166 | + logger.error(f"Error processing RPC: {e}", exc_info=True) |
| 167 | + response = self._create_response(error={"code": "internal_error", "message": str(e)}) |
| 168 | + |
| 169 | + # Send response |
| 170 | + await self._send_encrypted(writer, response) |
| 171 | + |
| 172 | + except Exception as e: |
| 173 | + logger.error(f"Connection {conn_id} error: {e}", exc_info=True) |
| 174 | + finally: |
| 175 | + writer.close() |
| 176 | + try: |
| 177 | + await writer.wait_closed() |
| 178 | + except Exception: |
| 179 | + pass |
| 180 | + logger.info(f"Connection {conn_id} closed") |
| 181 | + |
| 182 | + async def _handle_rpc(self, message: Dict) -> Any: |
| 183 | + """Handle an incoming RPC message.""" |
| 184 | + method = message.get("method") |
| 185 | + params = message.get("params", {}) |
| 186 | + |
| 187 | + if not method: |
| 188 | + raise ValueError("No method specified") |
| 189 | + |
| 190 | + handler = self._handlers.get(method) |
| 191 | + if not handler: |
| 192 | + if self._default_handler: |
| 193 | + return await self._default_handler(method, params) |
| 194 | + raise ValueError(f"Unknown method: {method}") |
| 195 | + |
| 196 | + return await handler(params) |
| 197 | + |
| 198 | + async def _handle_ping(self, params: Dict) -> Dict: |
| 199 | + """Handle ping request.""" |
| 200 | + return {"status": "alive", "timestamp": asyncio.get_event_loop().time()} |
| 201 | + |
| 202 | + async def _handle_restart_module(self, params: Dict) -> Dict: |
| 203 | + """Handle module restart request.""" |
| 204 | + module = params.get("module") |
| 205 | + if not module: |
| 206 | + raise ValueError("No module specified") |
| 207 | + |
| 208 | + # TODO: Implement actual module restart logic |
| 209 | + logger.info(f"Restarting module: {module}") |
| 210 | + return {"status": "restarting", "module": module} |
| 211 | + |
| 212 | + async def _handle_get_metrics(self, params: Dict) -> Dict: |
| 213 | + """Handle metrics collection request.""" |
| 214 | + # TODO: Implement actual metrics collection |
| 215 | + from .performance import collect_metrics |
| 216 | + return await collect_metrics() |
| 217 | + |
| 218 | + async def _handle_sync_heartbeat(self, params: Dict) -> Dict: |
| 219 | + """Handle heartbeat synchronization.""" |
| 220 | + # TODO: Implement heartbeat sync logic |
| 221 | + return {"status": "synced", "timestamp": asyncio.get_event_loop().time()} |
| 222 | + |
| 223 | + async def _handle_send_alert(self, params: Dict) -> Dict: |
| 224 | + """Handle alert notification.""" |
| 225 | + message = params.get("message") |
| 226 | + level = params.get("level", "info") |
| 227 | + |
| 228 | + if not message: |
| 229 | + raise ValueError("No message provided") |
| 230 | + |
| 231 | + # TODO: Implement actual alert handling |
| 232 | + logger.log( |
| 233 | + getattr(logging, level.upper(), logging.INFO), |
| 234 | + f"ALERT ({level}): {message}" |
| 235 | + ) |
| 236 | + |
| 237 | + return {"status": "alert_sent", "message": message, "level": level} |
| 238 | + |
| 239 | + def _encrypt(self, data: bytes) -> bytes: |
| 240 | + """Encrypt data using AES-GCM.""" |
| 241 | + aesgcm = AESGCM(self.aes_key) |
| 242 | + nonce = os.urandom(12) # 96-bit nonce for GCM |
| 243 | + ct = aesgcm.encrypt(nonce, data, None) |
| 244 | + return nonce + ct # Return nonce || ciphertext || tag |
| 245 | + |
| 246 | + def _decrypt(self, data: bytes) -> bytes: |
| 247 | + """Decrypt data using AES-GCM.""" |
| 248 | + if len(data) < 28: # 12-byte nonce + 16-byte tag |
| 249 | + raise ValueError("Invalid encrypted data") |
| 250 | + |
| 251 | + nonce = data[:12] |
| 252 | + ct = data[12:] |
| 253 | + |
| 254 | + aesgcm = AESGCM(self.aes_key) |
| 255 | + try: |
| 256 | + return aesgcm.decrypt(nonce, ct, None) |
| 257 | + except InvalidTag: |
| 258 | + raise ValueError("Invalid authentication tag") |
| 259 | + |
| 260 | + def _create_response(self, result: Any = None, error: Dict = None) -> Dict: |
| 261 | + """Create an RPC response.""" |
| 262 | + if result is not None and error is not None: |
| 263 | + raise ValueError("Cannot specify both result and error") |
| 264 | + |
| 265 | + response = {"jsonrpc": "2.0"} |
| 266 | + |
| 267 | + if error is not None: |
| 268 | + response["error"] = error |
| 269 | + else: |
| 270 | + response["result"] = result |
| 271 | + |
| 272 | + return response |
| 273 | + |
| 274 | + async def _send_encrypted(self, writer: asyncio.StreamWriter, data: Dict): |
| 275 | + """Send encrypted JSON-RPC response.""" |
| 276 | + try: |
| 277 | + json_data = json.dumps(data).encode('utf-8') |
| 278 | + encrypted = self._encrypt(json_data) |
| 279 | + |
| 280 | + # Send message length (4 bytes, big-endian) |
| 281 | + writer.write(struct.pack('>I', len(encrypted))) |
| 282 | + |
| 283 | + # Send encrypted message |
| 284 | + writer.write(encrypted) |
| 285 | + await writer.drain() |
| 286 | + |
| 287 | + except Exception as e: |
| 288 | + logger.error(f"Failed to send response: {e}", exc_info=True) |
| 289 | + raise |
| 290 | + |
| 291 | + async def _send_error(self, writer: asyncio.StreamWriter, code: str, message: str): |
| 292 | + """Send an error response and close the connection.""" |
| 293 | + error = {"code": code, "message": message} |
| 294 | + response = self._create_response(error=error) |
| 295 | + await self._send_encrypted(writer, response) |
| 296 | + writer.close() |
| 297 | + await writer.wait_closed() |
| 298 | + |
| 299 | + |
| 300 | +# Singleton instance |
| 301 | +divine_bus = DivineBus() |
| 302 | + |
| 303 | +async def start_divine_bus(): |
| 304 | + """Start the DivineBus server.""" |
| 305 | + await divine_bus.start() |
| 306 | + |
| 307 | +async def stop_divine_bus(): |
| 308 | + """Stop the DivineBus server.""" |
| 309 | + await divine_bus.stop() |
| 310 | + |
| 311 | +# Example usage |
| 312 | +if __name__ == "__main__": |
| 313 | + import logging |
| 314 | + logging.basicConfig(level=logging.INFO) |
| 315 | + |
| 316 | + async def main(): |
| 317 | + bus = DivineBus() |
| 318 | + try: |
| 319 | + await bus.start() |
| 320 | + except KeyboardInterrupt: |
| 321 | + await bus.stop() |
| 322 | + |
| 323 | + asyncio.run(main()) |
0 commit comments