Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ADVANCED.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ async def main():
# Checkpoint resumption (resume from last good token on failure)
continue_from_last_good_token=True,

# Check intervals
check_intervals={"guardrails": 5, "drift": 10, "checkpoint": 10},
# Check intervals (optimized for high-throughput streaming)
check_intervals={"guardrails": 15, "drift": 25, "checkpoint": 20},

# Event callback for observability
on_event=lambda event: print(f"[{event.type}]"),
Expand Down
99 changes: 99 additions & 0 deletions BENCHMARKS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Benchmarks

Performance benchmarks measuring L0 overhead on high-throughput streaming.

## Test Environment

- **CPU**: AMD Ryzen 9 5950X (16 cores, 32 threads)
- **Runtime**: Python 3.13 with pytest-asyncio
- **Methodology**: Mock token streams with zero inter-token delay to measure pure L0 overhead

## Results

| Scenario | Tokens/s | Avg Duration | TTFT | Overhead |
|----------|----------|--------------|------|----------|
| Baseline (raw streaming) | 1,100,911 | 1.82 ms | 0.02 ms | - |
| L0 Core (no features) | 382,389 | 5.23 ms | 0.09 ms | 188% |
| L0 + JSON Guardrail | 329,869 | 6.06 ms | 0.08 ms | 234% |
| L0 + All Guardrails | 266,112 | 7.52 ms | 0.09 ms | 314% |
| L0 + Drift Detection | 107,501 | 18.61 ms | 0.09 ms | 924% |
| L0 Full Stack | 92,952 | 21.52 ms | 0.09 ms | 1084% |

**Legend:**
- **Tokens/s** = Throughput (higher is better)
- **Avg Duration** = Average total duration for 2000 tokens
- **TTFT** = Time to first token (lower is better)
- **Overhead** = % slower than baseline

## Key Optimizations

L0 includes several optimizations for high-throughput streaming:

### 1. Incremental JSON State Tracking
Instead of re-scanning the entire content on each guardrail check, L0 tracks JSON structure incrementally:
- **O(delta)** per token instead of **O(content)**
- Only performs full content scan at stream completion

### 2. Sliding Window Drift Detection
Drift detection uses a sliding window (default 500 characters) instead of scanning full content:
- Meta commentary, tone shift, repetition checks operate on window only
- Configurable via `DriftConfig.sliding_window_size`

### 3. Tunable Check Intervals
Default intervals optimized for high throughput:
- **Guardrails**: Every 15 tokens (was 5)
- **Drift**: Every 25 tokens (was 10)
- **Checkpoint**: Every 20 tokens (was 10)

Configure via `check_intervals`:
```python
from l0.guardrails import json_rule
from l0.types import CheckIntervals
import l0

result = await l0.run(
stream=my_stream,
guardrails=[json_rule()],
check_intervals=CheckIntervals(guardrails=15, drift=25, checkpoint=20),
)
```

## Blackwell Ready

Even with full guardrails, drift detection, and checkpointing enabled, L0 sustains **90K+ tokens/s** - well above current LLM inference speeds and ready for Nvidia Blackwell's 1000+ tokens/s streaming.

| GPU Generation | Expected Tokens/s | L0 Headroom |
|----------------|-------------------|-------------|
| Current (H100) | ~100-200 | 450-900x |
| Blackwell (B200) | ~1000+ | 90x |

## Running Benchmarks

```bash
uv run pytest tests/test_benchmark.py::TestComprehensiveReport -v -s
```

To run all benchmark tests:
```bash
uv run pytest tests/test_benchmark.py -v
```

## Benchmark Scenarios

### Baseline
Raw async iteration without L0 - measures the cost of the mock stream itself.

### L0 Core
Minimal L0 wrapper with no guardrails or drift detection. Measures the base cost of the L0 runtime.

### L0 + JSON Guardrail
L0 with `json_rule()` enabled. Tests incremental JSON structure validation.

### L0 + All Guardrails
L0 with `json_rule()`, `markdown_rule()`, and `zero_output_rule()`. Tests multiple guardrail overhead.

### L0 + Drift Detection
L0 with drift detection enabled. Tests sliding window analysis overhead.

### L0 Full Stack
L0 with all features: JSON, Markdown, zero-output guardrails, drift detection, and checkpointing. Represents real-world production usage.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ L0 includes 1,800+ tests covering all major reliability features.
| **🔧 Own Retry Logic** | No external dependencies (no tenacity). L0 controls all retry behavior for predictable execution. |
| **📝 Type-Safe** | Full type hints with `py.typed` marker. Passes mypy strict mode. |
| **📦 Minimal Dependencies** | Only httpx, pydantic, orjson, typing-extensions, uuid6. No heavy abstractions. |
| **🚀 Nvidia Blackwell-Ready** | Optimized for 1000+ tokens/s streaming. Ready for next-gen GPU inference speeds. |
| **🧪 Battle-Tested** | 1,800+ unit tests and 100+ integration tests validating real streaming, retries, and advanced behavior. |

> **Know what you're doing?** [Skip the tutorial](./ADVANCED.md)
Expand Down
37 changes: 27 additions & 10 deletions src/l0/drift.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class DriftConfig:
entropy_window: int = 50
"""Window size for entropy calculation."""

sliding_window_size: int = 500
"""Size of sliding window for content analysis (chars). Only the last N chars are analyzed."""


@dataclass
class _DriftHistory:
Expand Down Expand Up @@ -108,6 +111,16 @@ def __init__(self, config: DriftConfig | None = None) -> None:
self.config = config or DriftConfig()
self._history = _DriftHistory()

def _get_window(self, content: str) -> str:
"""Get sliding window of content for analysis.

Uses only the last N characters to avoid O(content_length) scanning.
"""
window_size = self.config.sliding_window_size
if len(content) <= window_size:
return content
return content[-window_size:]

def check(self, content: str, delta: str | None = None) -> DriftResult:
"""Check content for drift.

Expand All @@ -122,29 +135,33 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:
confidence = 0.0
details: list[str] = []

# Use sliding window for content analysis (O(window_size) instead of O(content_length))
window = self._get_window(content)
last_window = self._get_window(self._history.last_content)

# Update history
if delta:
self._history.tokens.append(delta)
if len(self._history.tokens) > self.config.entropy_window:
self._history.tokens.pop(0)

# Check for meta commentary
# Check for meta commentary (on window only)
if self.config.detect_meta_commentary:
if self._detect_meta_commentary(content):
if self._detect_meta_commentary(window):
types.append("meta_commentary")
confidence = max(confidence, 0.9)
details.append("Meta commentary detected")

# Check for tone shift
# Check for tone shift (on windows only)
if self.config.detect_tone_shift:
if self._detect_tone_shift(content, self._history.last_content):
if self._detect_tone_shift(window, last_window):
types.append("tone_shift")
confidence = max(confidence, 0.7)
details.append("Tone shift detected")

# Check for repetition
# Check for repetition (on window only)
if self.config.detect_repetition:
if self._detect_repetition(content):
if self._detect_repetition(window):
types.append("repetition")
confidence = max(confidence, 0.8)
details.append("Excessive repetition detected")
Expand All @@ -161,19 +178,19 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:
confidence = max(confidence, 0.6)
details.append("Entropy spike detected")

# Check for format collapse
# Check for format collapse (already uses first 100 chars)
if self._detect_format_collapse(content):
types.append("format_collapse")
confidence = max(confidence, 0.8)
details.append("Format collapse detected")

# Check for markdown collapse
if self._detect_markdown_collapse(content, self._history.last_content):
# Check for markdown collapse (on windows only)
if self._detect_markdown_collapse(window, last_window):
types.append("markdown_collapse")
confidence = max(confidence, 0.7)
details.append("Markdown formatting collapse detected")

# Check for excessive hedging
# Check for excessive hedging (already uses first line only)
if self._detect_excessive_hedging(content):
types.append("hedging")
confidence = max(confidence, 0.5)
Expand Down
114 changes: 98 additions & 16 deletions src/l0/guardrails.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,63 @@ def all_patterns(self) -> list[str]:
# ─────────────────────────────────────────────────────────────────────────────


@dataclass
class IncrementalJsonState:
"""State for incremental JSON parsing (O(delta) per token instead of O(content))."""

open_braces: int = 0
close_braces: int = 0
open_brackets: int = 0
close_brackets: int = 0
in_string: bool = False
escape_next: bool = False
processed_length: int = 0


def update_json_state_incremental(
state: IncrementalJsonState,
delta: str,
) -> IncrementalJsonState:
"""Update JSON state incrementally with new delta content.

Only processes the delta, not the full content - O(delta) per call.

Args:
state: Current incremental state
delta: New content to process

Returns:
Updated state (mutates and returns the same object)
"""
for char in delta:
if state.escape_next:
state.escape_next = False
continue

if char == "\\" and state.in_string:
state.escape_next = True
continue

if char == '"' and not state.escape_next:
state.in_string = not state.in_string
continue

if state.in_string:
continue

if char == "{":
state.open_braces += 1
elif char == "}":
state.close_braces += 1
elif char == "[":
state.open_brackets += 1
elif char == "]":
state.close_brackets += 1

state.processed_length += len(delta)
return state


@dataclass
class JsonAnalysis:
"""Result of JSON structure analysis."""
Expand Down Expand Up @@ -1493,24 +1550,52 @@ def json_rule() -> GuardrailRule:
- Unclosed strings
- Multiple consecutive commas
- Malformed patterns like {, or [,

Uses incremental state tracking for O(delta) per-token updates instead of
O(content) full scans during streaming. Only does full analysis at completion.

Note: State is reset when content is empty or shorter than processed length
to handle new streams, aborted streams, or rule reuse.
"""
# Incremental state for O(delta) streaming checks
incremental_state = IncrementalJsonState()
last_content_length = 0

def check(state: State) -> list[GuardrailViolation]:
nonlocal incremental_state, last_content_length

content = state.content
if not content.strip():
# Reset state when content is empty (new stream starting)
incremental_state = IncrementalJsonState()
last_content_length = 0
return []

# Only check if it looks like JSON
if not looks_like_json(content):
# Reset state when content doesn't look like JSON
incremental_state = IncrementalJsonState()
last_content_length = 0
return []

analysis = analyze_json_structure(content)
# Reset state if content is shorter than what we've processed
# (indicates a new stream or aborted stream being reused)
if len(content) < last_content_length:
incremental_state = IncrementalJsonState()
last_content_length = 0

violations = []

# During streaming, only report critical issues
# During streaming, use incremental state tracking (O(delta) instead of O(content))
if not state.completed:
# Too many closes is always bad
if analysis.close_braces > analysis.open_braces:
# Get delta since last check
if len(content) > last_content_length:
delta = content[last_content_length:]
update_json_state_incremental(incremental_state, delta)
last_content_length = len(content)

# Check for critical issues using incremental state
if incremental_state.close_braces > incremental_state.open_braces:
violations.append(
GuardrailViolation(
rule="json",
Expand All @@ -1519,7 +1604,7 @@ def check(state: State) -> list[GuardrailViolation]:
suggestion="Check JSON structure",
)
)
if analysis.close_brackets > analysis.open_brackets:
if incremental_state.close_brackets > incremental_state.open_brackets:
violations.append(
GuardrailViolation(
rule="json",
Expand All @@ -1528,18 +1613,11 @@ def check(state: State) -> list[GuardrailViolation]:
suggestion="Check JSON structure",
)
)
# Report malformed patterns immediately
for issue in analysis.issues:
if "Malformed pattern" in issue or "consecutive commas" in issue:
violations.append(
GuardrailViolation(
rule="json",
message=issue,
severity="error",
)
)
else:
# On completion, check for both extra closes AND missing closes
# On completion, do full analysis for comprehensive check
analysis = analyze_json_structure(content)

# Check for both extra closes AND missing closes
if analysis.close_braces > analysis.open_braces:
violations.append(
GuardrailViolation(
Expand Down Expand Up @@ -1583,6 +1661,10 @@ def check(state: State) -> list[GuardrailViolation]:
)
)

# Reset incremental state for potential reuse
incremental_state = IncrementalJsonState()
last_content_length = 0

return violations

return GuardrailRule(
Expand Down
8 changes: 5 additions & 3 deletions src/l0/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,9 +747,11 @@ class CheckIntervals:
intervals = CheckIntervals(guardrails=50, drift=100, checkpoint=50)
"""

guardrails: int = 5 # Check guardrails every N tokens
drift: int = 10 # Check drift every N tokens
checkpoint: int = 10 # Save checkpoint every N tokens
guardrails: int = (
15 # Check guardrails every N tokens (optimized for high throughput)
)
drift: int = 25 # Check drift every N tokens (optimized for high throughput)
checkpoint: int = 20 # Save checkpoint every N tokens


# ─────────────────────────────────────────────────────────────────────────────
Expand Down
Loading