Skip to content

Commit ad62981

Browse files
feat: Enhance tank control service with connection management and reset functionality
1 parent 456d88d commit ad62981

File tree

5 files changed

+398
-17
lines changed

5 files changed

+398
-17
lines changed

control_broker/app.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from datetime import timedelta
77
from typing import List
88

9-
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
9+
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
1010
from fastapi.middleware.cors import CORSMiddleware
1111
import redis.asyncio as redis
1212
from redis import exceptions as redis_exceptions
@@ -25,7 +25,10 @@
2525
config = get_config()
2626

2727
# Initialize connection manager
28-
manager = ConnectionManager()
28+
manager = ConnectionManager(
29+
stale_timeout_seconds=config.tank_stale_timeout_seconds,
30+
prune_interval_seconds=config.tank_prune_interval_seconds,
31+
)
2932
radar_broker = RadarBroker()
3033
redis_client_lock = asyncio.Lock()
3134

@@ -88,6 +91,7 @@ async def get_redis_client() -> redis.Redis:
8891
async def on_startup() -> None:
8992
"""Initialize Redis connection and start command listener."""
9093
await reset_redis_client()
94+
await manager.start()
9195

9296
# Start Redis command stream listener
9397
listener = RedisCommandListener(get_redis_client, reset_redis_client, config, manager)
@@ -108,6 +112,9 @@ async def on_shutdown() -> None:
108112
await redis_client.close()
109113
app.state.redis = None
110114

115+
await manager.stop()
116+
await manager.close_all()
117+
111118

112119
# ========================================
113120
# Middleware
@@ -141,6 +148,15 @@ async def list_tanks() -> List[dict]:
141148
return await manager.snapshot()
142149

143150

151+
@app.post("/tanks/{tank_id}/reset")
152+
async def reset_tank(tank_id: str) -> dict:
153+
"""Forcefully reset a tank connection and clear its state."""
154+
reset = await manager.force_reset(tank_id)
155+
if not reset:
156+
raise HTTPException(status_code=404, detail="Tank not found")
157+
return {"status": "reset", "tankId": tank_id, "timestamp": utcnow().isoformat()}
158+
159+
144160
@app.get("/radars")
145161
async def list_radars() -> List[dict]:
146162
"""List all connected radar sources."""

control_broker/core/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ class Config:
1515
redis_status_maxlen: int
1616
redis_radar_stream: str
1717
redis_radar_maxlen: int
18+
tank_stale_timeout_seconds: int
19+
tank_prune_interval_seconds: int
1820

1921

2022
def get_config() -> Config:
@@ -27,4 +29,6 @@ def get_config() -> Config:
2729
redis_status_maxlen=int(os.getenv("REDIS_STATUS_MAXLEN", "500")),
2830
redis_radar_stream=os.getenv("REDIS_RADAR_STREAM", "tank_radar"),
2931
redis_radar_maxlen=int(os.getenv("REDIS_RADAR_MAXLEN", "1000")),
32+
tank_stale_timeout_seconds=int(os.getenv("TANK_STALE_TIMEOUT_SECONDS", "600")),
33+
tank_prune_interval_seconds=int(os.getenv("TANK_PRUNE_INTERVAL_SECONDS", "30")),
3034
)

control_broker/services/connection_manager.py

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import json
55
from contextlib import suppress
66
from datetime import timedelta
7-
from typing import Dict, List, Optional
7+
from typing import Dict, List, Optional, Tuple
88
from fastapi import WebSocket
99

1010
from models import TankInfo
@@ -14,25 +14,57 @@
1414
class ConnectionManager:
1515
"""Manages WebSocket connections to tanks."""
1616

17-
def __init__(self) -> None:
17+
def __init__(
18+
self,
19+
*,
20+
stale_timeout_seconds: int = 600,
21+
prune_interval_seconds: int = 30,
22+
) -> None:
1823
self._tanks: Dict[str, TankInfo] = {}
1924
self._lock = asyncio.Lock()
20-
self._stale_timeout = timedelta(minutes=10)
25+
self._stale_timeout = timedelta(seconds=max(1, stale_timeout_seconds))
26+
self._prune_interval = timedelta(seconds=max(5, prune_interval_seconds))
27+
self._maintenance_task: Optional[asyncio.Task] = None
2128

22-
async def _prune_stale(self) -> None:
29+
async def start(self) -> None:
30+
"""Start background maintenance tasks."""
31+
if self._maintenance_task and not self._maintenance_task.done():
32+
return
33+
self._maintenance_task = asyncio.create_task(self._run_auto_prune())
34+
35+
async def stop(self) -> None:
36+
"""Stop maintenance tasks and wait for completion."""
37+
task = self._maintenance_task
38+
if task:
39+
task.cancel()
40+
with suppress(asyncio.CancelledError):
41+
await task
42+
self._maintenance_task = None
43+
44+
async def _run_auto_prune(self) -> None:
45+
"""Periodically prune stale tank connections."""
46+
try:
47+
while True:
48+
await asyncio.sleep(self._prune_interval.total_seconds())
49+
await self._prune_stale(reason="auto-prune")
50+
except asyncio.CancelledError:
51+
pass
52+
53+
async def _prune_stale(self, *, reason: str = "stale") -> None:
2354
"""Remove tanks that have been inactive for longer than the timeout."""
2455
now = utcnow()
25-
to_close: List[WebSocket] = []
56+
to_close: List[Tuple[str, WebSocket]] = []
2657
async with self._lock:
2758
for tank_id, info in list(self._tanks.items()):
2859
if (now - info.last_seen) > self._stale_timeout:
2960
websocket = info.websocket
3061
if websocket is not None:
31-
to_close.append(websocket)
62+
to_close.append((tank_id, websocket))
3263
self._tanks.pop(tank_id, None)
33-
for websocket in to_close:
64+
for tank_id, websocket in to_close:
3465
with suppress(Exception):
3566
await websocket.close(code=1011)
67+
print(f"[MANAGER] Pruned tank '{tank_id}' due to {reason}")
3668

3769
async def register_tank(self, tank_id: str, websocket: WebSocket) -> TankInfo:
3870
"""Register a new tank connection."""
@@ -110,3 +142,28 @@ async def update_last_seen(self, tank_id: str, payload: Optional[dict]) -> None:
110142
info.last_seen = utcnow()
111143
if payload is not None:
112144
info.last_payload = payload
145+
146+
async def force_reset(self, tank_id: str) -> bool:
147+
"""Forcefully terminate and remove a tank connection."""
148+
async with self._lock:
149+
info = self._tanks.pop(tank_id, None)
150+
if not info:
151+
return False
152+
websocket = info.websocket
153+
if websocket:
154+
with suppress(Exception):
155+
await websocket.close(code=1012)
156+
print(f"[MANAGER] Forced reset for tank '{tank_id}'")
157+
return True
158+
159+
async def close_all(self) -> None:
160+
"""Close all tracked tank connections."""
161+
async with self._lock:
162+
entries = list(self._tanks.items())
163+
self._tanks.clear()
164+
for tank_id, info in entries:
165+
websocket = info.websocket
166+
if websocket:
167+
with suppress(Exception):
168+
await websocket.close(code=1001)
169+
print(f"[MANAGER] Closed connection for tank '{tank_id}' during shutdown")

0 commit comments

Comments
 (0)