Skip to content

Commit 606bef6

Browse files
committed
Add a retention_days option to schedule database cleanup.
- Fixes the MDB_MAP_FULL error.
1 parent 54b8009 commit 606bef6

File tree

4 files changed

+139
-9
lines changed

4 files changed

+139
-9
lines changed

app/main.py

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from contextlib import asynccontextmanager
23

34
from fastapi import FastAPI
@@ -6,21 +7,64 @@
67
from .server.chat import router as chat_router
78
from .server.health import router as health_router
89
from .server.middleware import add_cors_middleware, add_exception_handler
9-
from .services.pool import GeminiClientPool
10+
from .services import GeminiClientPool, LMDBConversationStore
11+
12+
# Delay between two consecutive LMDB retention sweeps run by the background
# cleanup task; also used as the timeout while waiting for the stop event.
RETENTION_CLEANUP_INTERVAL_SECONDS = 6 * 60 * 60 # 6 hours
13+
14+
15+
async def _run_retention_cleanup(stop_event: asyncio.Event) -> None:
    """
    Periodically enforce the LMDB retention policy until ``stop_event`` is set.

    One sweep runs immediately, then another every
    ``RETENTION_CLEANUP_INTERVAL_SECONDS`` seconds. A failing sweep is logged
    and retried on the next interval; it never terminates the task.

    Args:
        stop_event: Event the owner sets to request shutdown. Setting it wakes
            the task from its inter-sweep wait so it can exit promptly.
    """
    store = LMDBConversationStore()
    if store.retention_days <= 0:
        logger.info("LMDB retention cleanup disabled; skipping scheduler.")
        return

    logger.info(
        f"Starting LMDB retention cleanup task (retention={store.retention_days} day(s), interval={RETENTION_CLEANUP_INTERVAL_SECONDS} seconds)."
    )

    while not stop_event.is_set():
        try:
            # cleanup_expired() is a plain synchronous method that walks the
            # whole database; calling it directly here would stall the event
            # loop (and every in-flight request) for the length of the sweep,
            # so it runs in a worker thread instead.
            # NOTE(review): assumes the LMDB environment is opened with
            # locking enabled so it may be used from another thread - confirm.
            await asyncio.to_thread(store.cleanup_expired)
        except Exception:
            logger.exception("LMDB retention cleanup task failed.")

        try:
            # Sleep for one interval, but wake up immediately on shutdown.
            await asyncio.wait_for(
                stop_event.wait(),
                timeout=RETENTION_CLEANUP_INTERVAL_SECONDS,
            )
        except asyncio.TimeoutError:
            # Interval elapsed without a stop request: run the next sweep.
            continue

    logger.info("LMDB retention cleanup task stopped.")
1043

1144

1245
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan: initialize the Gemini client pool, run the LMDB
    retention cleanup task while the app is serving, and stop that task on
    shutdown.

    Args:
        app: The FastAPI application this lifespan is attached to.

    Raises:
        Exception: Whatever ``GeminiClientPool`` initialization raises; it is
            logged and re-raised so startup fails visibly.
    """
    cleanup_stop_event = asyncio.Event()
    cleanup_task: asyncio.Task | None = None
    try:
        # Only pool initialization is covered by the "failed to initialize"
        # handler, so errors raised while serving (thrown in at ``yield``) are
        # not mislabelled as initialization failures.
        try:
            pool = GeminiClientPool()
            await pool.init()
        except Exception as e:
            logger.exception(f"Failed to initialize Gemini clients: {e}")
            raise

        cleanup_task = asyncio.create_task(_run_retention_cleanup(cleanup_stop_event))

        logger.info(f"Gemini clients initialized: {[c.id for c in pool.clients]}.")
        logger.info("Gemini API Server ready to serve requests.")
        yield
    finally:
        # Ask the background task to stop, then wait for it to finish so no
        # sweep is left running after shutdown.
        cleanup_stop_event.set()
        if cleanup_task:
            try:
                await cleanup_task
            except asyncio.CancelledError:
                logger.debug("LMDB retention cleanup task cancelled during shutdown.")
            except Exception:
                # A crashed maintenance task must not turn shutdown into a
                # failure or mask an exception already propagating.
                logger.exception("LMDB retention cleanup task terminated with an error.")
2468

2569

2670
def create_app() -> FastAPI:

app/services/lmdb.py

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import hashlib
22
import re
33
from contextlib import contextmanager
4-
from datetime import datetime
4+
from datetime import datetime, timedelta
55
from pathlib import Path
66
from typing import Any, Dict, List, Optional
77

@@ -39,7 +39,12 @@ class LMDBConversationStore(metaclass=Singleton):
3939

4040
HASH_LOOKUP_PREFIX = "hash:"
4141

42-
def __init__(self, db_path: Optional[str] = None, max_db_size: Optional[int] = None):
42+
def __init__(
43+
self,
44+
db_path: Optional[str] = None,
45+
max_db_size: Optional[int] = None,
46+
retention_days: Optional[int] = None,
47+
):
4348
"""
4449
Initialize LMDB store.
4550
@@ -52,9 +57,12 @@ def __init__(self, db_path: Optional[str] = None, max_db_size: Optional[int] = N
5257
db_path = g_config.storage.path
5358
if max_db_size is None:
5459
max_db_size = g_config.storage.max_size
60+
if retention_days is None:
61+
retention_days = g_config.storage.retention_days
5562

5663
self.db_path: Path = Path(db_path)
5764
self.max_db_size: int = max_db_size
65+
self.retention_days: int = max(0, int(retention_days))
5866
self._env: lmdb.Environment | None = None
5967

6068
self._ensure_db_path()
@@ -310,6 +318,78 @@ def keys(self, prefix: str = "", limit: Optional[int] = None) -> List[str]:
310318

311319
return keys
312320

321+
def cleanup_expired(self, retention_days: Optional[int] = None) -> int:
    """
    Delete conversations older than the given retention period.

    The sweep runs in two phases: a read-only scan that collects the keys of
    expired records, followed by a single write transaction that deletes
    them together with their ``hash:`` lookup entries.

    Args:
        retention_days: Optional override for retention period in days.
            Values <= 0 disable the cleanup for this call.

    Returns:
        Number of conversations removed.

    Raises:
        Exception: Any error raised while scanning or deleting is logged and
            re-raised.
    """
    retention_value = (
        self.retention_days if retention_days is None else max(0, int(retention_days))
    )
    if retention_value <= 0:
        logger.debug("Retention cleanup skipped because retention is disabled.")
        return 0

    cutoff = datetime.now() - timedelta(days=retention_value)
    # (record key, hash-lookup key or None). Only the keys are kept, not the
    # decoded conversations, so a large backlog does not sit in memory.
    expired_entries: list[tuple[str, Optional[str]]] = []

    try:
        with self._get_transaction(write=False) as txn:
            cursor = txn.cursor()

            for key_bytes, value_bytes in cursor:
                key_str = key_bytes.decode("utf-8")
                # Lookup entries are removed alongside the record they index.
                if key_str.startswith(self.HASH_LOOKUP_PREFIX):
                    continue

                try:
                    storage_data = orjson.loads(value_bytes)  # type: ignore[arg-type]
                    conv = ConversationInStore.model_validate(storage_data)
                except Exception as exc:
                    logger.warning(f"Failed to decode record for key {key_str}: {exc}")
                    continue

                timestamp = conv.created_at or conv.updated_at
                if not timestamp:
                    continue

                # ``cutoff`` is naive local time. Comparing it with a
                # timezone-aware value raises TypeError, which would abort the
                # whole sweep, so aware timestamps are converted to naive
                # local time first.
                if timestamp.tzinfo is not None:
                    timestamp = timestamp.astimezone().replace(tzinfo=None)

                if timestamp >= cutoff:
                    continue

                # Hash during the scan so one problematic record cannot roll
                # back the delete transaction for every other record.
                try:
                    message_hash = _hash_conversation(
                        conv.client_id, conv.model, conv.messages
                    )
                except Exception as exc:
                    logger.warning(
                        f"Failed to compute lookup hash for key {key_str}: {exc}"
                    )
                    message_hash = None

                lookup_key: Optional[str] = None
                if message_hash and key_str != message_hash:
                    lookup_key = f"{self.HASH_LOOKUP_PREFIX}{message_hash}"

                expired_entries.append((key_str, lookup_key))
    except Exception as exc:
        logger.error(f"Failed to scan LMDB for retention cleanup: {exc}")
        raise

    if not expired_entries:
        return 0

    removed = 0
    try:
        with self._get_transaction(write=True) as txn:
            for key_str, lookup_key in expired_entries:
                # delete() is falsy when the key vanished since the scan.
                if not txn.delete(key_str.encode("utf-8")):
                    continue

                if lookup_key is not None:
                    txn.delete(lookup_key.encode("utf-8"))
                removed += 1
    except Exception as exc:
        logger.error(f"Failed to delete expired conversations: {exc}")
        raise

    if removed:
        logger.info(
            f"LMDB retention cleanup removed {removed} conversation(s) older than {cutoff.isoformat()}."
        )

    return removed
392+
313393
def stats(self) -> Dict[str, Any]:
314394
"""
315395
Get database statistics.

app/utils/config.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,15 @@ class StorageConfig(BaseModel):
8484
description="Path to the storage directory where data will be saved",
8585
)
8686
max_size: int = Field(
87-
default=1024**2 * 128, # 128 MB
87+
default=1024**2 * 256, # 256 MB
8888
ge=1,
8989
description="Maximum size of the storage in bytes",
9090
)
91+
retention_days: int = Field(
92+
default=14,
93+
ge=0,
94+
description="Number of days to retain conversations before automatic cleanup (0 disables cleanup)",
95+
)
9196

9297

9398
class LoggingConfig(BaseModel):

config/config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ gemini:
2929

3030
storage:
3131
path: "data/lmdb" # Database storage path
32-
max_size: 134217728 # Maximum database size (128 MB)
32+
max_size: 268435456 # Maximum database size (256 MB)
33+
retention_days: 14 # Number of days to retain conversations before automatic cleanup (0 disables cleanup)
3334

3435
logging:
3536
level: "INFO" # Log level: DEBUG, INFO, WARNING, ERROR

0 commit comments

Comments
 (0)