Commit 8ec63c3

Merge pull request #7 from ai-2070/performance

Nvidia Blackwell Support

2 parents: d6a9a05 + dcb1a47

File tree

7 files changed: +1275 -31 lines changed

ADVANCED.md

Lines changed: 2 additions & 2 deletions

@@ -58,8 +58,8 @@ async def main():
         # Checkpoint resumption (resume from last good token on failure)
         continue_from_last_good_token=True,
 
-        # Check intervals
-        check_intervals={"guardrails": 5, "drift": 10, "checkpoint": 10},
+        # Check intervals (optimized for high-throughput streaming)
+        check_intervals={"guardrails": 15, "drift": 25, "checkpoint": 20},
 
         # Event callback for observability
         on_event=lambda event: print(f"[{event.type}]"),

BENCHMARKS.md

Lines changed: 99 additions & 0 deletions (new file)

@@ -0,0 +1,99 @@
# Benchmarks

Performance benchmarks measuring L0 overhead on high-throughput streaming.

## Test Environment

- **CPU**: AMD Ryzen 9 5950X (16 cores, 32 threads)
- **Runtime**: Python 3.13 with pytest-asyncio
- **Methodology**: Mock token streams with zero inter-token delay to measure pure L0 overhead

## Results

| Scenario | Tokens/s | Avg Duration | TTFT | Overhead |
|----------|----------|--------------|------|----------|
| Baseline (raw streaming) | 1,100,911 | 1.82 ms | 0.02 ms | - |
| L0 Core (no features) | 382,389 | 5.23 ms | 0.09 ms | 188% |
| L0 + JSON Guardrail | 329,869 | 6.06 ms | 0.08 ms | 234% |
| L0 + All Guardrails | 266,112 | 7.52 ms | 0.09 ms | 314% |
| L0 + Drift Detection | 107,501 | 18.61 ms | 0.09 ms | 924% |
| L0 Full Stack | 92,952 | 21.52 ms | 0.09 ms | 1084% |

**Legend:**
- **Tokens/s** = Throughput (higher is better)
- **Avg Duration** = Average total duration for 2,000 tokens
- **TTFT** = Time to first token (lower is better)
- **Overhead** = % slower than baseline

## Key Optimizations

L0 includes several optimizations for high-throughput streaming:

### 1. Incremental JSON State Tracking
Instead of re-scanning the entire content on each guardrail check, L0 tracks JSON structure incrementally:
- **O(delta)** per token instead of **O(content)**
- Only performs a full content scan at stream completion
### 2. Sliding Window Drift Detection
Drift detection uses a sliding window (default 500 characters) instead of scanning the full content:
- Meta commentary, tone shift, and repetition checks operate on the window only
- Configurable via `DriftConfig.sliding_window_size`
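The window extraction itself is a single slice; a minimal standalone sketch of the same idea, mirroring the `_get_window` helper this commit adds to `src/l0/drift.py`:

```python
def get_window(content: str, window_size: int = 500) -> str:
    """Return only the last window_size chars - keeps each drift check O(window)."""
    if len(content) <= window_size:
        return content
    return content[-window_size:]


# Per-check work stays constant no matter how long the stream grows.
assert get_window("short") == "short"
assert len(get_window("x" * 10_000)) == 500
```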
### 3. Tunable Check Intervals
Default intervals are optimized for high throughput:
- **Guardrails**: every 15 tokens (was 5)
- **Drift**: every 25 tokens (was 10)
- **Checkpoint**: every 20 tokens (was 10)

Configure via `check_intervals`:
```python
from l0.guardrails import json_rule
from l0.types import CheckIntervals
import l0

result = await l0.run(
    stream=my_stream,
    guardrails=[json_rule()],
    check_intervals=CheckIntervals(guardrails=15, drift=25, checkpoint=20),
)
```
## Blackwell Ready

Even with full guardrails, drift detection, and checkpointing enabled, L0 sustains **90K+ tokens/s** - well above current LLM inference speeds and ready for Nvidia Blackwell's 1000+ tokens/s streaming.

| GPU Generation | Expected Tokens/s | L0 Headroom |
|----------------|-------------------|-------------|
| Current (H100) | ~100-200 | 450-900x |
| Blackwell (B200) | ~1000+ | 90x |
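The headroom column is simply L0's full-stack throughput divided by the expected inference speed; a quick arithmetic check against the numbers in the tables above:

```python
full_stack_tps = 92_952   # L0 Full Stack throughput from the Results table
h100_range = (100, 200)   # expected H100 inference speed (tokens/s)
b200_tps = 1000           # expected Blackwell inference speed (tokens/s)

# Headroom = how many times faster L0 processes tokens than the GPU emits them.
print(full_stack_tps // h100_range[1], full_stack_tps // h100_range[0])  # 464 929 -> "450-900x"
print(full_stack_tps // b200_tps)  # 92 -> "90x"
```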
## Running Benchmarks

```bash
uv run pytest tests/test_benchmark.py::TestComprehensiveReport -v -s
```

To run all benchmark tests:

```bash
uv run pytest tests/test_benchmark.py -v
```
## Benchmark Scenarios

### Baseline
Raw async iteration without L0 - measures the cost of the mock stream itself.

### L0 Core
Minimal L0 wrapper with no guardrails or drift detection. Measures the base cost of the L0 runtime.

### L0 + JSON Guardrail
L0 with `json_rule()` enabled. Tests incremental JSON structure validation.

### L0 + All Guardrails
L0 with `json_rule()`, `markdown_rule()`, and `zero_output_rule()`. Tests the overhead of running multiple guardrails.

### L0 + Drift Detection
L0 with drift detection enabled. Tests sliding window analysis overhead.

### L0 Full Stack
L0 with all features: JSON, Markdown, and zero-output guardrails, drift detection, and checkpointing. Represents real-world production usage.
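For reference, the baseline methodology can be sketched without L0 at all: time a mock async token stream with zero inter-token delay and divide the token count by elapsed time. The names `mock_stream` and `measure` here are illustrative, not the actual test code in `tests/test_benchmark.py`:

```python
import asyncio
import time
from collections.abc import AsyncIterator


async def mock_stream(n_tokens: int) -> AsyncIterator[str]:
    """Yield n_tokens mock tokens with zero inter-token delay."""
    for _ in range(n_tokens):
        yield "tok"


async def measure(n_tokens: int = 2000) -> float:
    """Return raw-iteration throughput in tokens/s (the Baseline scenario)."""
    start = time.perf_counter()
    count = 0
    async for _ in mock_stream(n_tokens):
        count += 1
    return count / (time.perf_counter() - start)


tps = asyncio.run(measure())
print(f"{tps:,.0f} tokens/s")  # baseline cost of the mock stream itself
```

Each L0 scenario then wraps the same stream with the feature under test and reports the throughput drop relative to this baseline.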

README.md

Lines changed: 1 addition & 0 deletions

@@ -77,6 +77,7 @@ L0 includes 1,800+ tests covering all major reliability features.
 | **🔧 Own Retry Logic** | No external dependencies (no tenacity). L0 controls all retry behavior for predictable execution. |
 | **📝 Type-Safe** | Full type hints with `py.typed` marker. Passes mypy strict mode. |
 | **📦 Minimal Dependencies** | Only httpx, pydantic, orjson, typing-extensions, uuid6. No heavy abstractions. |
+| **🚀 Nvidia Blackwell-Ready** | Optimized for 1000+ tokens/s streaming. Ready for next-gen GPU inference speeds. |
 | **🧪 Battle-Tested** | 1,800+ unit tests and 100+ integration tests validating real streaming, retries, and advanced behavior. |
 
 > **Know what you're doing?** [Skip the tutorial](./ADVANCED.md)

src/l0/drift.py

Lines changed: 27 additions & 10 deletions

@@ -71,6 +71,9 @@ class DriftConfig:
     entropy_window: int = 50
     """Window size for entropy calculation."""
 
+    sliding_window_size: int = 500
+    """Size of sliding window for content analysis (chars). Only the last N chars are analyzed."""
+
 
 @dataclass
 class _DriftHistory:
@@ -108,6 +111,16 @@ def __init__(self, config: DriftConfig | None = None) -> None:
         self.config = config or DriftConfig()
         self._history = _DriftHistory()
 
+    def _get_window(self, content: str) -> str:
+        """Get sliding window of content for analysis.
+
+        Uses only the last N characters to avoid O(content_length) scanning.
+        """
+        window_size = self.config.sliding_window_size
+        if len(content) <= window_size:
+            return content
+        return content[-window_size:]
+
     def check(self, content: str, delta: str | None = None) -> DriftResult:
         """Check content for drift.
@@ -122,29 +135,33 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:
         confidence = 0.0
         details: list[str] = []
 
+        # Use sliding window for content analysis (O(window_size) instead of O(content_length))
+        window = self._get_window(content)
+        last_window = self._get_window(self._history.last_content)
+
         # Update history
         if delta:
            self._history.tokens.append(delta)
            if len(self._history.tokens) > self.config.entropy_window:
                self._history.tokens.pop(0)
 
-        # Check for meta commentary
+        # Check for meta commentary (on window only)
         if self.config.detect_meta_commentary:
-            if self._detect_meta_commentary(content):
+            if self._detect_meta_commentary(window):
                 types.append("meta_commentary")
                 confidence = max(confidence, 0.9)
                 details.append("Meta commentary detected")
 
-        # Check for tone shift
+        # Check for tone shift (on windows only)
         if self.config.detect_tone_shift:
-            if self._detect_tone_shift(content, self._history.last_content):
+            if self._detect_tone_shift(window, last_window):
                 types.append("tone_shift")
                 confidence = max(confidence, 0.7)
                 details.append("Tone shift detected")
 
-        # Check for repetition
+        # Check for repetition (on window only)
         if self.config.detect_repetition:
-            if self._detect_repetition(content):
+            if self._detect_repetition(window):
                 types.append("repetition")
                 confidence = max(confidence, 0.8)
                 details.append("Excessive repetition detected")
@@ -161,19 +178,19 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:
                 confidence = max(confidence, 0.6)
                 details.append("Entropy spike detected")
 
-        # Check for format collapse
+        # Check for format collapse (already uses first 100 chars)
         if self._detect_format_collapse(content):
             types.append("format_collapse")
             confidence = max(confidence, 0.8)
             details.append("Format collapse detected")
 
-        # Check for markdown collapse
-        if self._detect_markdown_collapse(content, self._history.last_content):
+        # Check for markdown collapse (on windows only)
+        if self._detect_markdown_collapse(window, last_window):
             types.append("markdown_collapse")
             confidence = max(confidence, 0.7)
             details.append("Markdown formatting collapse detected")
 
-        # Check for excessive hedging
+        # Check for excessive hedging (already uses first line only)
         if self._detect_excessive_hedging(content):
             types.append("hedging")
             confidence = max(confidence, 0.5)

src/l0/guardrails.py

Lines changed: 98 additions & 16 deletions

@@ -657,6 +657,63 @@ def all_patterns(self) -> list[str]:
 # ─────────────────────────────────────────────────────────────────────────────
 
 
+@dataclass
+class IncrementalJsonState:
+    """State for incremental JSON parsing (O(delta) per token instead of O(content))."""
+
+    open_braces: int = 0
+    close_braces: int = 0
+    open_brackets: int = 0
+    close_brackets: int = 0
+    in_string: bool = False
+    escape_next: bool = False
+    processed_length: int = 0
+
+
+def update_json_state_incremental(
+    state: IncrementalJsonState,
+    delta: str,
+) -> IncrementalJsonState:
+    """Update JSON state incrementally with new delta content.
+
+    Only processes the delta, not the full content - O(delta) per call.
+
+    Args:
+        state: Current incremental state
+        delta: New content to process
+
+    Returns:
+        Updated state (mutates and returns the same object)
+    """
+    for char in delta:
+        if state.escape_next:
+            state.escape_next = False
+            continue
+
+        if char == "\\" and state.in_string:
+            state.escape_next = True
+            continue
+
+        if char == '"' and not state.escape_next:
+            state.in_string = not state.in_string
+            continue
+
+        if state.in_string:
+            continue
+
+        if char == "{":
+            state.open_braces += 1
+        elif char == "}":
+            state.close_braces += 1
+        elif char == "[":
+            state.open_brackets += 1
+        elif char == "]":
+            state.close_brackets += 1
+
+    state.processed_length += len(delta)
+    return state
+
+
 @dataclass
 class JsonAnalysis:
     """Result of JSON structure analysis."""
@@ -1493,24 +1550,52 @@ def json_rule() -> GuardrailRule:
     - Unclosed strings
     - Multiple consecutive commas
     - Malformed patterns like {, or [,
+
+    Uses incremental state tracking for O(delta) per-token updates instead of
+    O(content) full scans during streaming. Only does full analysis at completion.
+
+    Note: State is reset when content is empty or shorter than processed length
+    to handle new streams, aborted streams, or rule reuse.
     """
+    # Incremental state for O(delta) streaming checks
+    incremental_state = IncrementalJsonState()
+    last_content_length = 0
 
     def check(state: State) -> list[GuardrailViolation]:
+        nonlocal incremental_state, last_content_length
+
         content = state.content
         if not content.strip():
+            # Reset state when content is empty (new stream starting)
+            incremental_state = IncrementalJsonState()
+            last_content_length = 0
            return []
 
         # Only check if it looks like JSON
         if not looks_like_json(content):
+            # Reset state when content doesn't look like JSON
+            incremental_state = IncrementalJsonState()
+            last_content_length = 0
            return []
 
-        analysis = analyze_json_structure(content)
+        # Reset state if content is shorter than what we've processed
+        # (indicates a new stream or aborted stream being reused)
+        if len(content) < last_content_length:
+            incremental_state = IncrementalJsonState()
+            last_content_length = 0
+
         violations = []
 
-        # During streaming, only report critical issues
+        # During streaming, use incremental state tracking (O(delta) instead of O(content))
         if not state.completed:
-            # Too many closes is always bad
-            if analysis.close_braces > analysis.open_braces:
+            # Get delta since last check
+            if len(content) > last_content_length:
+                delta = content[last_content_length:]
+                update_json_state_incremental(incremental_state, delta)
+                last_content_length = len(content)
+
+            # Check for critical issues using incremental state
+            if incremental_state.close_braces > incremental_state.open_braces:
                 violations.append(
                     GuardrailViolation(
                         rule="json",
@@ -1519,7 +1604,7 @@ def check(state: State) -> list[GuardrailViolation]:
                         suggestion="Check JSON structure",
                     )
                 )
-            if analysis.close_brackets > analysis.open_brackets:
+            if incremental_state.close_brackets > incremental_state.open_brackets:
                 violations.append(
                     GuardrailViolation(
                         rule="json",
@@ -1528,18 +1613,11 @@ def check(state: State) -> list[GuardrailViolation]:
                         suggestion="Check JSON structure",
                     )
                 )
-            # Report malformed patterns immediately
-            for issue in analysis.issues:
-                if "Malformed pattern" in issue or "consecutive commas" in issue:
-                    violations.append(
-                        GuardrailViolation(
-                            rule="json",
-                            message=issue,
-                            severity="error",
-                        )
-                    )
         else:
-            # On completion, check for both extra closes AND missing closes
+            # On completion, do full analysis for comprehensive check
+            analysis = analyze_json_structure(content)
+
+            # Check for both extra closes AND missing closes
             if analysis.close_braces > analysis.open_braces:
                 violations.append(
                     GuardrailViolation(
@@ -1583,6 +1661,10 @@ def check(state: State) -> list[GuardrailViolation]:
                 )
             )
 
+            # Reset incremental state for potential reuse
+            incremental_state = IncrementalJsonState()
+            last_content_length = 0
+
         return violations
 
     return GuardrailRule(

src/l0/types.py

Lines changed: 5 additions & 3 deletions

@@ -747,9 +747,11 @@ class CheckIntervals:
     intervals = CheckIntervals(guardrails=50, drift=100, checkpoint=50)
     """
 
-    guardrails: int = 5  # Check guardrails every N tokens
-    drift: int = 10  # Check drift every N tokens
-    checkpoint: int = 10  # Save checkpoint every N tokens
+    guardrails: int = (
+        15  # Check guardrails every N tokens (optimized for high throughput)
+    )
+    drift: int = 25  # Check drift every N tokens (optimized for high throughput)
+    checkpoint: int = 20  # Save checkpoint every N tokens
 
 
 # ─────────────────────────────────────────────────────────────────────────────
