Skip to content

Commit a8223ab

Browse files
committed
missing file
1 parent bc4e217 commit a8223ab

File tree

1 file changed

+220
-0
lines changed

1 file changed

+220
-0
lines changed

src/ezmsg/core/profiling.py

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
import time
2+
from collections import defaultdict, deque
3+
from typing import Any
4+
from uuid import UUID
5+
6+
# Sliding window used for profiling summaries. Keep a small window to bound memory.
7+
PROFILE_WINDOW_S = 10.0
8+
9+
10+
class SlidingWindow:
11+
"""
12+
Maintain a rolling window of timestamped values and provide simple summaries.
13+
"""
14+
15+
def __init__(self, max_window_s: float = PROFILE_WINDOW_S) -> None:
16+
self.max_window_s = max_window_s
17+
self._events: deque[tuple[float, float]] = deque()
18+
self._total = 0.0
19+
20+
def add(self, value: float, now: float | None = None) -> None:
21+
now = time.perf_counter() if now is None else now
22+
self._events.append((now, value))
23+
self._total += value
24+
self._prune(now)
25+
26+
def _prune(self, now: float, window_s: float | None = None) -> None:
27+
window = self.max_window_s if window_s is None else min(
28+
window_s, self.max_window_s
29+
)
30+
cutoff = now - window
31+
while self._events and self._events[0][0] < cutoff:
32+
_, val = self._events.popleft()
33+
self._total -= val
34+
35+
def summary(
36+
self, window_s: float | None = None, now: float | None = None
37+
) -> dict[str, float]:
38+
now = time.perf_counter() if now is None else now
39+
self._prune(now)
40+
41+
if window_s is None or window_s > self.max_window_s:
42+
window_s = self.max_window_s
43+
44+
cutoff = now - window_s
45+
total = 0.0
46+
count = 0
47+
max_val = 0.0
48+
for ts, val in self._events:
49+
if ts >= cutoff:
50+
total += val
51+
count += 1
52+
if val > max_val:
53+
max_val = val
54+
55+
avg = total / count if count else 0.0
56+
57+
return {
58+
"count": count,
59+
"total": total,
60+
"avg": avg,
61+
"max": max_val,
62+
"window_s": window_s,
63+
}
64+
65+
66+
class LeaseDurationTelemetry:
    """
    Measure the time between a buffer lease and its release.

    Each outstanding lease is keyed by ``(client_id, buf_idx)``. When the lease
    is freed, the elapsed time is pushed into a sliding window: one window per
    client when ``per_client`` is true, otherwise a single shared window.
    """

    def __init__(self, max_window_s: float = PROFILE_WINDOW_S, per_client: bool = False):
        self.max_window_s = max_window_s
        self.per_client = per_client
        # Start timestamps of leases that have not been freed yet.
        self._starts: dict[tuple[UUID, int], float] = {}

        if not per_client:
            self._window = SlidingWindow(max_window_s)
        else:
            # Windows are created lazily the first time a client frees a lease.
            self._windows: dict[UUID, SlidingWindow] = defaultdict(
                lambda: SlidingWindow(max_window_s)
            )

    def on_lease(self, client_id: UUID, buf_idx: int, now: float | None = None) -> None:
        """Remember when ``client_id`` leased buffer ``buf_idx``."""
        stamp = time.perf_counter() if now is None else now
        self._starts[(client_id, buf_idx)] = stamp

    def on_free(
        self, client_id: UUID, buf_idx: int | None, now: float | None = None
    ) -> None:
        """
        Close out a lease and record how long it was held.

        A ``buf_idx`` of ``None`` discards every outstanding lease for the
        client without recording a duration. A free with no matching lease is
        ignored.
        """
        stamp = time.perf_counter() if now is None else now

        if buf_idx is None:
            # Client disconnected; drop any inflight timers.
            stale = [key for key in self._starts if key[0] == client_id]
            for key in stale:
                del self._starts[key]
            return

        began = self._starts.pop((client_id, buf_idx), None)
        if began is None:
            return

        target = self._windows[client_id] if self.per_client else self._window
        target.add(stamp - began, stamp)

    def summary(
        self, window_s: float | None = None, now: float | None = None
    ) -> dict[Any, dict[str, float]]:
        """
        Return window summaries keyed by client id.

        When not tracking per client, the single summary is stored under the
        key ``None``.
        """
        stamp = time.perf_counter() if now is None else now
        if not self.per_client:
            return {None: self._window.summary(window_s=window_s, now=stamp)}

        per_client_stats = {}
        for client_id, window in self._windows.items():
            per_client_stats[client_id] = window.summary(window_s=window_s, now=stamp)
        return per_client_stats

    def in_flight(self, client_id: UUID | None = None) -> int:
        """Count outstanding leases, optionally restricted to one client."""
        if client_id is None:
            return len(self._starts)
        return len([key for key in self._starts if key[0] == client_id])
123+
124+
125+
class PublisherTelemetry:
    """
    Collect message/byte throughput and backpressure timing for a Publisher.
    """

    def __init__(self, max_window_s: float = PROFILE_WINDOW_S) -> None:
        self.max_window_s = max_window_s
        # Windowed samples used for rate estimates.
        self.msg_window = SlidingWindow(max_window_s)
        self.byte_window = SlidingWindow(max_window_s)
        self.backpressure = LeaseDurationTelemetry(max_window_s)
        # Lifetime counters; these are never pruned.
        self.total_messages = 0
        self.total_bytes = 0

    def record_message(self, num_bytes: int | None, now: float | None = None) -> None:
        """
        Count one published message and, when known, its size in bytes.

        :param num_bytes: Message size, or ``None`` to skip byte accounting.
        :param now: Timestamp for the sample; defaults to ``time.perf_counter()``.
        """
        stamp = time.perf_counter() if now is None else now
        self.msg_window.add(1.0, stamp)
        self.total_messages += 1

        if num_bytes is None:
            return
        self.byte_window.add(float(num_bytes), stamp)
        self.total_bytes += num_bytes

    def snapshot(self, window_s: float | None = None) -> dict[str, Any]:
        """
        Build a dictionary describing recent publish activity.

        :param window_s: Span to summarise; passed through to each window.
        :return: Rates, lifetime and windowed message/byte counts, and
            backpressure timings in milliseconds.
        """
        stamp = time.perf_counter()
        query = {"window_s": window_s, "now": stamp}
        msg_stats = self.msg_window.summary(**query)
        byte_stats = self.byte_window.summary(**query)
        # Take the first summary the backpressure tracker returns.
        bp_stats = [*self.backpressure.summary(**query).values()][0]

        span = msg_stats["window_s"]
        if span > 0:
            message_rate = msg_stats["count"] / span
            byte_rate = byte_stats["total"] / span
        else:
            message_rate = 0.0
            byte_rate = 0.0

        samples = bp_stats["count"]
        if samples:
            avg_ms = bp_stats["avg"] * 1000.0
            max_ms = bp_stats["max"] * 1000.0
        else:
            avg_ms = 0.0
            max_ms = 0.0

        messages = {
            "total": self.total_messages,
            "window": msg_stats["count"],
        }
        byte_counts = {
            "total": self.total_bytes,
            "window": int(byte_stats["total"]),
        }
        backpressure = {
            "avg_ms": avg_ms,
            "max_ms": max_ms,
            "samples": samples,
            "in_flight": self.backpressure.in_flight(),
        }

        return {
            "window_s": span,
            "message_rate_hz": message_rate,
            "byte_rate_per_s": byte_rate,
            "messages": messages,
            "bytes": byte_counts,
            "backpressure": backpressure,
        }
182+
183+
184+
class ChannelTelemetry:
    """
    Track per-subscriber processing time on a Channel.

    Lease durations are recorded per client through ``self.leases``; this class
    formats them into a snapshot.
    """

    def __init__(self, max_window_s: float = PROFILE_WINDOW_S) -> None:
        """
        :param max_window_s: Longest span, in seconds, that samples are retained.
        """
        self.max_window_s = max_window_s
        self.leases = LeaseDurationTelemetry(max_window_s, per_client=True)

    def snapshot(
        self,
        window_s: float | None = None,
        handles: dict[UUID, str] | None = None,
    ) -> dict[str, Any]:
        """
        Build a dictionary describing recent per-subscriber lease activity.

        :param window_s: Span to summarise; ``None`` or anything larger than
            ``max_window_s`` is reported as ``max_window_s``.
        :param handles: Optional mapping from client id to a display handle.
        :return: The effective ``window_s``, one entry per subscriber with
            timings in milliseconds, and the total number of leases in flight.
        """
        now = time.perf_counter()
        lease_stats = self.leases.summary(window_s=window_s, now=now)

        # Report the window the statistics actually cover: a request wider than
        # max_window_s is clamped, so echoing the raw argument would disagree
        # with the per-subscriber numbers.
        if window_s is None or window_s > self.max_window_s:
            effective_window = self.max_window_s
        else:
            effective_window = window_s

        subscribers: list[dict[str, Any]] = []
        for client_id, stats in lease_stats.items():
            window = stats["window_s"]
            has_samples = bool(stats["count"])
            subscribers.append(
                {
                    "subscriber_id": str(client_id),
                    "handle": handles.get(client_id, None) if handles else None,
                    "avg_ms": stats["avg"] * 1000.0 if has_samples else 0.0,
                    "max_ms": stats["max"] * 1000.0 if has_samples else 0.0,
                    "samples": stats["count"],
                    # Guard against a zero or negative window.
                    "message_rate_hz": stats["count"] / window if window > 0 else 0.0,
                    "in_flight": self.leases.in_flight(client_id),
                }
            )

        return {
            "window_s": effective_window,
            "subscribers": subscribers,
            "in_flight": self.leases.in_flight(),
        }

0 commit comments

Comments
 (0)