-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.py
More file actions
143 lines (114 loc) · 4.57 KB
/
main.py
File metadata and controls
143 lines (114 loc) · 4.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
import weaviate
from fastapi import APIRouter, FastAPI, HTTPException, Request
from fastapi.middleware import Middleware
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
from contextlib import asynccontextmanager
from a2a.routes import router as a2a_router
import logs
logs_router = logs.router
from middleware.auth import jwt_auth_mw
from middleware.session import session_mw
from proxy.engine import router as proxy_router
from usage.factory import _select_backend, get_usage_backend
from usage.metrics import mount_metrics
from utils.env import int_env
try:
from middleware.quota import TokenQuotaMiddleware
QUOTA_AVAILABLE = True
except ImportError:
QUOTA_AVAILABLE = False
# Router for the memory-inspection endpoints; every route declared on it is
# served under the "/mem" prefix and tagged "memory".
mem_router = APIRouter(prefix="/mem", tags=["memory"])
@mem_router.get("/events")
async def get_memory_events(request: Request, limit: int = 10):
    """Fetch recent MemoryEvent objects from Weaviate.

    Queries the ``MemoryEvent`` class sorted by ``timestamp`` descending and,
    on a best-effort basis, merges extra properties (``result``, ``event``,
    ``session_id``, ``task_id``, ``user``) from the raw data objects whose ids
    match the queried events.

    Args:
        request: Incoming request; ``request.state.sub`` must be truthy
            (presumably populated by the auth middleware -- confirm).
        limit: Maximum number of events requested from Weaviate.

    Returns:
        The raw GraphQL response dict, shaped like
        ``{"data": {"Get": {"MemoryEvent": [...]}}}``. The same shape with an
        empty list is returned when the ``MemoryEvent`` class does not exist
        or the schema cannot be read.

    Raises:
        HTTPException: 401 when no subject is attached to the request, 503
            when Weaviate is not ready, 500 on GraphQL errors, on a response
            without ``data``, or on any unexpected failure.
    """
    # Function-scope import so the block does not depend on a file-level one.
    import logging

    log = logging.getLogger(__name__)
    empty_response = {"data": {"Get": {"MemoryEvent": []}}}

    try:
        # NOTE(review): the subject is only checked for presence; the query
        # below is not filtered by it -- confirm this is intended.
        if not getattr(request.state, "sub", None):
            raise HTTPException(status_code=401, detail="User not authenticated")

        client = weaviate.Client(os.getenv("WEAVIATE_URL", "http://localhost:6666"))
        if not client.is_ready():
            raise HTTPException(status_code=503, detail="Weaviate is not ready")

        # A missing class (or unreadable schema) is treated as "no events yet"
        # rather than as an error.
        try:
            schema = client.schema.get()
            classes = {c["class"] for c in schema.get("classes", [])}
        except Exception:
            log.warning("Could not read Weaviate schema", exc_info=True)
            return empty_response
        if "MemoryEvent" not in classes:
            return empty_response

        result = (
            client.query.get(
                "MemoryEvent",
                ["timestamp", "event", "user", "state"],
            )
            .with_additional(["id"])
            .with_limit(limit)
            .with_sort([{"path": ["timestamp"], "order": "desc"}])
            .do()
        )

        if "errors" in result:
            raise HTTPException(
                status_code=500, detail=f"GraphQL error: {result['errors']}"
            )
        if "data" not in result:
            raise HTTPException(status_code=500, detail="No data in response")

        events = result["data"]["Get"]["MemoryEvent"]

        # Best-effort enrichment: copy selected properties from the raw data
        # objects onto the matching queried events. Failures here must not
        # fail the request, but they are logged instead of silently dropped.
        try:
            raw_objects = client.data_object.get(class_name="MemoryEvent", limit=limit)
            props_by_id = {
                obj["id"]: obj.get("properties", {})
                for obj in raw_objects.get("objects", [])
                if obj.get("id")
            }
            for event in events:
                event_id = event.get("_additional", {}).get("id")
                full_props = props_by_id.get(event_id) if event_id else None
                if full_props is None:
                    continue
                for field in ("result", "event", "session_id", "task_id", "user"):
                    if field in full_props:
                        event[field] = full_props[field]
        except Exception:
            log.warning("Could not enrich MemoryEvent results", exc_info=True)

        return result

    except HTTPException:
        # Keep the deliberate 401/503/500 responses raised above; without this
        # clause the generic handler below would re-wrap them all as 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error fetching memory events: {e}"
        ) from e
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifespan - startup and shutdown.

    Startup: selects the usage backend, stores it on ``app.state.usage`` and
    mounts the metrics on the app.
    Shutdown: awaits the backend's ``aclose()`` when it provides one.

    Args:
        app: The FastAPI application whose state is populated.
    """
    backend_selector = _select_backend()
    app.state.usage = get_usage_backend(backend_selector)
    mount_metrics(app)
    try:
        yield
    finally:
        # Run cleanup even when an exception or cancellation is thrown into
        # the context manager, so the backend is not left open.
        aclose = getattr(app.state.usage, "aclose", None)
        if aclose is not None:
            await aclose()
# Application object; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(title="attach-gateway", lifespan=lifespan)
# Middleware registration. Registration order determines nesting order.
# NOTE(review): the original comment says CORS is outer-most, but Starlette's
# add_middleware is understood to wrap earlier registrations inside later
# ones, which would make CORS (registered first) the inner-most layer and the
# session middleware (registered last) the outer-most -- confirm against the
# Starlette docs, since it affects whether auth rejections carry CORS headers.
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:9000", "http://127.0.0.1:9000"],
allow_methods=["*"],
allow_headers=["*"],
allow_credentials=True,
)
# Only add quota middleware if available and explicitly configured
# NOTE(review): int_env is called with a default of 60000, so `limit` may
# never be None and the "explicitly configured" check may always pass --
# confirm against utils.env.int_env.
limit = int_env("MAX_TOKENS_PER_MIN", 60000)
if QUOTA_AVAILABLE and limit is not None:
app.add_middleware(TokenQuotaMiddleware)
# JWT auth and session handling are plain dispatch functions wrapped in
# BaseHTTPMiddleware.
app.add_middleware(BaseHTTPMiddleware, dispatch=jwt_auth_mw)
app.add_middleware(BaseHTTPMiddleware, dispatch=session_mw)
@app.get("/auth/config")
async def auth_config():
    """Return the auth settings read from the process environment.

    The response has the keys ``domain``, ``client_id`` and ``audience``,
    taken from ``AUTH0_DOMAIN``, ``AUTH0_CLIENT`` and ``OIDC_AUD``. A variable
    that is not set yields ``None`` for its key.
    """
    env_var_by_field = {
        "domain": "AUTH0_DOMAIN",
        "client_id": "AUTH0_CLIENT",
        "audience": "OIDC_AUD",
    }
    return {
        field: os.getenv(env_var) for field, env_var in env_var_by_field.items()
    }
# Route registration. The a2a routes are mounted under "/a2a"; the other
# routers are included without an extra prefix here.
# NOTE(review): registration order presumably matters if proxy_router declares
# catch-all paths -- keep it last unless confirmed otherwise.
app.include_router(a2a_router, prefix="/a2a")
app.include_router(logs_router)
app.include_router(mem_router)
app.include_router(proxy_router)