From f9886323847f6bd736a50a02653e4cd91223c500 Mon Sep 17 00:00:00 2001
From: LZL0 <12474488+LZL0@users.noreply.github.com>
Date: Sat, 13 Dec 2025 16:58:36 +0100
Subject: [PATCH 1/4] Performance improvements.

---
 src/l0/drift.py | 144 ++++++++++++++++++++++++++----------------------
 src/l0/state.py |   7 ++-
 src/l0/types.py |  27 ++++++++-
 3 files changed, 109 insertions(+), 69 deletions(-)

diff --git a/src/l0/drift.py b/src/l0/drift.py
index f6e63453..6a8bb646 100644
--- a/src/l0/drift.py
+++ b/src/l0/drift.py
@@ -17,6 +17,60 @@
 from dataclasses import dataclass, field
 from typing import Any, Literal
 
+# ─────────────────────────────────────────────────────────────────────────────
+# Pre-compiled regex patterns for performance (avoids re-compilation per check)
+# ─────────────────────────────────────────────────────────────────────────────
+
+# Meta commentary patterns (case-insensitive, checked on last 200 chars)
+_META_COMMENTARY_PATTERNS: list[re.Pattern[str]] = [
+    re.compile(r"as an ai", re.IGNORECASE),
+    re.compile(r"i'm an ai", re.IGNORECASE),
+    re.compile(r"i am an ai", re.IGNORECASE),
+    re.compile(r"i cannot actually", re.IGNORECASE),
+    re.compile(r"i don't have personal", re.IGNORECASE),
+    re.compile(r"i apologize, but i", re.IGNORECASE),
+    re.compile(r"i'm sorry, but i", re.IGNORECASE),
+    re.compile(r"let me explain", re.IGNORECASE),
+    re.compile(r"to clarify", re.IGNORECASE),
+    re.compile(r"in other words", re.IGNORECASE),
+]
+
+# Tone shift patterns
+_FORMAL_PATTERN: re.Pattern[str] = re.compile(
+    r"\b(therefore|thus|hence|moreover|furthermore|consequently)\b", re.IGNORECASE
+)
+_INFORMAL_PATTERN: re.Pattern[str] = re.compile(
+    r"\b(gonna|wanna|yeah|yep|nope|ok|okay)\b", re.IGNORECASE
+)
+
+# Sentence split pattern
+_SENTENCE_SPLIT_PATTERN: re.Pattern[str] = re.compile(r"[.!?]+")
+
+# Format collapse patterns (checked on first 100 chars)
+_FORMAT_COLLAPSE_PATTERNS: list[re.Pattern[str]] = [
+    re.compile(r"here is the .+?:", re.IGNORECASE),
+    re.compile(r"here's the .+?:", re.IGNORECASE),
+    re.compile(r"let me .+? for you", re.IGNORECASE),
+    re.compile(r"i'll .+? for you", re.IGNORECASE),
+    re.compile(r"here you go", re.IGNORECASE),
+]
+
+# Markdown patterns
+_MARKDOWN_PATTERNS: list[re.Pattern[str]] = [
+    re.compile(r"```"),
+    re.compile(r"^#{1,6}\s", re.MULTILINE),
+    re.compile(r"\*\*.*?\*\*"),
+    re.compile(r"\[.*?\]\(.*?\)"),
+]
+
+# Hedging patterns (checked on first line)
+_HEDGING_PATTERNS: list[re.Pattern[str]] = [
+    re.compile(r"^sure!?\s*$", re.IGNORECASE | re.MULTILINE),
+    re.compile(r"^certainly!?\s*$", re.IGNORECASE | re.MULTILINE),
+    re.compile(r"^of course!?\s*$", re.IGNORECASE | re.MULTILINE),
+    re.compile(r"^absolutely!?\s*$", re.IGNORECASE | re.MULTILINE),
+]
+
 # Drift types that can be detected
 DriftType = Literal[
     "tone_shift",
@@ -207,42 +261,27 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:
         )
 
     def _detect_meta_commentary(self, content: str) -> bool:
-        """Detect meta commentary patterns."""
-        meta_patterns = [
-            r"as an ai",
-            r"i'm an ai",
-            r"i am an ai",
-            r"i cannot actually",
-            r"i don't have personal",
-            r"i apologize, but i",
-            r"i'm sorry, but i",
-            r"let me explain",
-            r"to clarify",
-            r"in other words",
-        ]
-
+        """Detect meta commentary patterns using pre-compiled regexes."""
         # Check last 200 characters for meta commentary
-        recent = content[-200:].lower()
-        return any(re.search(pattern, recent) for pattern in meta_patterns)
+        recent = content[-200:]
+        return any(p.search(recent) for p in _META_COMMENTARY_PATTERNS)
 
     def _detect_tone_shift(self, content: str, previous_content: str) -> bool:
-        """Detect tone shift between old and new content."""
+        """Detect tone shift between old and new content using pre-compiled regexes."""
         if not previous_content or len(previous_content) < 100:
             return False
 
         # Simple heuristic: check if formality suddenly changes
-        recent_chunk = content[-200:].lower()
-        previous_chunk = previous_content[-200:].lower()
+        recent_chunk = content[-200:]
+        previous_chunk = previous_content[-200:]
 
-        # Count formal markers
-        formal_pattern = r"\b(therefore|thus|hence|moreover|furthermore|consequently)\b"
-        recent_formal = len(re.findall(formal_pattern, recent_chunk))
-        previous_formal = len(re.findall(formal_pattern, previous_chunk))
+        # Count formal markers using pre-compiled pattern
+        recent_formal = len(_FORMAL_PATTERN.findall(recent_chunk))
+        previous_formal = len(_FORMAL_PATTERN.findall(previous_chunk))
 
-        # Count informal markers
-        informal_pattern = r"\b(gonna|wanna|yeah|yep|nope|ok|okay)\b"
-        recent_informal = len(re.findall(informal_pattern, recent_chunk))
-        previous_informal = len(re.findall(informal_pattern, previous_chunk))
+        # Count informal markers using pre-compiled pattern
+        recent_informal = len(_INFORMAL_PATTERN.findall(recent_chunk))
+        previous_informal = len(_INFORMAL_PATTERN.findall(previous_chunk))
 
         # Check for sudden shift
         formal_shift = abs(recent_formal - previous_formal) > 2
@@ -251,11 +290,11 @@ def _detect_tone_shift(self, content: str, previous_content: str) -> bool:
         return formal_shift or informal_shift
 
     def _detect_repetition(self, content: str) -> bool:
-        """Detect excessive repetition."""
-        # Split into sentences
+        """Detect excessive repetition using pre-compiled regex."""
+        # Split into sentences using pre-compiled pattern
         sentences = [
             s.strip().lower()
-            for s in re.split(r"[.!?]+", content)
+            for s in _SENTENCE_SPLIT_PATTERN.split(content)
             if len(s.strip()) > 20
         ]
 
@@ -326,59 +365,34 @@ def _detect_entropy_spike(self) -> bool:
         return last > mean + self.config.entropy_threshold * std_dev
 
     def _detect_format_collapse(self, content: str) -> bool:
"""Detect format collapse (mixing instruction with output).""" - collapse_patterns = [ - r"here is the .+?:", - r"here's the .+?:", - r"let me .+? for you", - r"i'll .+? for you", - r"here you go", - ] - + """Detect format collapse using pre-compiled regexes.""" # Only check beginning of content - beginning = content[:100].lower() - return any(re.search(pattern, beginning) for pattern in collapse_patterns) + beginning = content[:100] + return any(p.search(beginning) for p in _FORMAT_COLLAPSE_PATTERNS) def _detect_markdown_collapse(self, content: str, previous_content: str) -> bool: - """Detect markdown to plaintext collapse.""" + """Detect markdown to plaintext collapse using pre-compiled regexes.""" if not previous_content or len(previous_content) < 100: return False - # Count markdown elements in recent chunks - markdown_patterns = [ - r"```", - r"^#{1,6}\s", - r"\*\*.*?\*\*", - r"\[.*?\]\(.*?\)", - ] - recent = content[-200:] previous = previous_content[-200:] recent_markdown = 0 previous_markdown = 0 - for pattern in markdown_patterns: - recent_markdown += len(re.findall(pattern, recent, re.MULTILINE)) - previous_markdown += len(re.findall(pattern, previous, re.MULTILINE)) + # Count markdown elements using pre-compiled patterns + for pattern in _MARKDOWN_PATTERNS: + recent_markdown += len(pattern.findall(recent)) + previous_markdown += len(pattern.findall(previous)) # Check if markdown suddenly drops return previous_markdown > 3 and recent_markdown == 0 def _detect_excessive_hedging(self, content: str) -> bool: - """Detect excessive hedging at start.""" - hedging_patterns = [ - r"^sure!?\s*$", - r"^certainly!?\s*$", - r"^of course!?\s*$", - r"^absolutely!?\s*$", - ] - + """Detect excessive hedging at start using pre-compiled regexes.""" first_line = content.strip().split("\n")[0] if content.strip() else "" - return any( - re.search(pattern, first_line, re.IGNORECASE | re.MULTILINE) - for pattern in hedging_patterns - ) + return any(p.search(first_line) for p in _HEDGING_PATTERNS) def reset(self) -> None: """Reset detector state.""" diff --git a/src/l0/state.py b/src/l0/state.py index eb9562cd..a8a5a4c9 100644 --- a/src/l0/state.py +++ b/src/l0/state.py @@ -18,12 +18,15 @@ def update_checkpoint(state: State) -> None: def append_token(state: State, token: str) -> None: - """Append token to content and update timing.""" + """Append token to content and update timing. + + Uses O(1) amortized buffer append instead of O(n) string concatenation. + """ now = time.time() if state.first_token_at is None: state.first_token_at = now state.last_token_at = now - state.content += token + state.append_content(token) # O(1) amortized state.token_count += 1 diff --git a/src/l0/types.py b/src/l0/types.py index 74750379..39b22fdc 100644 --- a/src/l0/types.py +++ b/src/l0/types.py @@ -262,9 +262,14 @@ class BackoffStrategy(str, Enum): @dataclass class State: - """Runtime state tracking.""" + """Runtime state tracking. - content: str = "" + Uses an internal buffer for O(1) token appends instead of O(n) string concatenation. + Access `content` property to get the accumulated string (flushes buffer automatically). 
+ """ + + _content: str = "" + _content_buffer: list[str] = field(default_factory=list) checkpoint: str = "" # Last known good slice for continuation token_count: int = 0 model_retry_count: int = 0 @@ -289,6 +294,24 @@ class State: deduplication_applied: bool = False # Whether deduplication removed overlap overlap_removed: str | None = None # The overlapping text that was removed + @property + def content(self) -> str: + """Get accumulated content, flushing buffer if needed.""" + if self._content_buffer: + self._content = self._content + "".join(self._content_buffer) + self._content_buffer.clear() + return self._content + + @content.setter + def content(self, value: str) -> None: + """Set content directly, clearing any buffered tokens.""" + self._content = value + self._content_buffer.clear() + + def append_content(self, token: str) -> None: + """Append token to content buffer (O(1) amortized).""" + self._content_buffer.append(token) + # ───────────────────────────────────────────────────────────────────────────── # Retry + Timeout From 040c888313b9184c41cedb8e2cc3e04b3224cc7c Mon Sep 17 00:00:00 2001 From: LZL0 <12474488+LZL0@users.noreply.github.com> Date: Sat, 13 Dec 2025 17:00:36 +0100 Subject: [PATCH 2/4] Use uuidv7 for callbacks. --- src/l0/runtime.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/l0/runtime.py b/src/l0/runtime.py index b6a601de..ac1fa5cf 100644 --- a/src/l0/runtime.py +++ b/src/l0/runtime.py @@ -39,6 +39,23 @@ def __init__(self, message: str, timeout_type: str, timeout_seconds: float): self.timeout_seconds = timeout_seconds +# ───────────────────────────────────────────────────────────────────────────── +# Fast callback ID generation using UUIDv7 (time-ordered, faster than UUIDv4) +# ───────────────────────────────────────────────────────────────────────────── + +import uuid6 + + +def _next_callback_id() -> str: + """Generate a callback ID using UUIDv7. + + UUIDv7 is time-ordered and faster than UUIDv4 since it uses + timestamp + random bits instead of pure crypto-random. + Provides global uniqueness for distributed tracing. 
+ """ + return f"cb_{uuid6.uuid7().hex[:12]}" + + if TYPE_CHECKING: from .events import ObservabilityEvent from .guardrails import GuardrailRule, GuardrailViolation @@ -676,8 +693,6 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: state.token_count % guardrail_interval == 0 and guardrails ): - import uuid - phase_start_time = time.perf_counter() event_bus.emit( ObservabilityEventType.GUARDRAIL_PHASE_START, @@ -687,7 +702,7 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: all_violations = [] for idx, rule in enumerate(guardrails): - callback_id = f"cb_{uuid.uuid4().hex[:12]}" + callback_id = _next_callback_id() rule_start_time = time.perf_counter() event_bus.emit( ObservabilityEventType.GUARDRAIL_RULE_START, @@ -857,8 +872,6 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: # Run final guardrail check (for completion-only rules) if guardrails: - import uuid - final_phase_start_time = time.perf_counter() event_bus.emit( ObservabilityEventType.GUARDRAIL_PHASE_START, @@ -868,7 +881,7 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: all_violations = [] for idx, rule in enumerate(guardrails): - callback_id = f"cb_{uuid.uuid4().hex[:12]}" + callback_id = _next_callback_id() rule_start_time = time.perf_counter() event_bus.emit( ObservabilityEventType.GUARDRAIL_RULE_START, From 6e6c820cceb557e2e5729554cb5415fb9a967982 Mon Sep 17 00:00:00 2001 From: LZL0 <12474488+LZL0@users.noreply.github.com> Date: Sat, 13 Dec 2025 17:02:09 +0100 Subject: [PATCH 3/4] Update types.py --- src/l0/types.py | 100 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 74 insertions(+), 26 deletions(-) diff --git a/src/l0/types.py b/src/l0/types.py index 39b22fdc..01cb374c 100644 --- a/src/l0/types.py +++ b/src/l0/types.py @@ -260,7 +260,6 @@ class BackoffStrategy(str, Enum): # ───────────────────────────────────────────────────────────────────────────── -@dataclass class State: """Runtime state tracking. @@ -268,31 +267,80 @@ class State: Access `content` property to get the accumulated string (flushes buffer automatically). 
""" - _content: str = "" - _content_buffer: list[str] = field(default_factory=list) - checkpoint: str = "" # Last known good slice for continuation - token_count: int = 0 - model_retry_count: int = 0 - network_retry_count: int = 0 - fallback_index: int = 0 - violations: "list[GuardrailViolation]" = field(default_factory=list) - drift_detected: bool = False - completed: bool = False - aborted: bool = False - first_token_at: float | None = None - last_token_at: float | None = None - duration: float | None = None - resumed: bool = False # Whether stream was resumed from checkpoint - network_errors: "list[NetworkError]" = field(default_factory=list) - # Multimodal state - data_outputs: list[DataPayload] = field(default_factory=list) - last_progress: Progress | None = None - # Continuation state (for observability) - resume_point: str | None = None # The checkpoint content used for resume - resume_from: int | None = None # Character offset where resume occurred - continuation_used: bool = False # Whether continuation was actually used - deduplication_applied: bool = False # Whether deduplication removed overlap - overlap_removed: str | None = None # The overlapping text that was removed + __slots__ = ( + "_content", + "_content_buffer", + "checkpoint", + "token_count", + "model_retry_count", + "network_retry_count", + "fallback_index", + "violations", + "drift_detected", + "completed", + "aborted", + "first_token_at", + "last_token_at", + "duration", + "resumed", + "network_errors", + "data_outputs", + "last_progress", + "resume_point", + "resume_from", + "continuation_used", + "deduplication_applied", + "overlap_removed", + ) + + def __init__( + self, + content: str = "", + checkpoint: str = "", + token_count: int = 0, + model_retry_count: int = 0, + network_retry_count: int = 0, + fallback_index: int = 0, + violations: "list[GuardrailViolation] | None" = None, + drift_detected: bool = False, + completed: bool = False, + aborted: bool = False, + first_token_at: float | None = None, + last_token_at: float | None = None, + duration: float | None = None, + resumed: bool = False, + network_errors: "list[NetworkError] | None" = None, + data_outputs: list[DataPayload] | None = None, + last_progress: Progress | None = None, + resume_point: str | None = None, + resume_from: int | None = None, + continuation_used: bool = False, + deduplication_applied: bool = False, + overlap_removed: str | None = None, + ) -> None: + self._content = content + self._content_buffer: list[str] = [] + self.checkpoint = checkpoint + self.token_count = token_count + self.model_retry_count = model_retry_count + self.network_retry_count = network_retry_count + self.fallback_index = fallback_index + self.violations = violations if violations is not None else [] + self.drift_detected = drift_detected + self.completed = completed + self.aborted = aborted + self.first_token_at = first_token_at + self.last_token_at = last_token_at + self.duration = duration + self.resumed = resumed + self.network_errors = network_errors if network_errors is not None else [] + self.data_outputs = data_outputs if data_outputs is not None else [] + self.last_progress = last_progress + self.resume_point = resume_point + self.resume_from = resume_from + self.continuation_used = continuation_used + self.deduplication_applied = deduplication_applied + self.overlap_removed = overlap_removed @property def content(self) -> str: From 3d67fa49a3e76885d78b02bad326fc150d9ef0d1 Mon Sep 17 00:00:00 2001 From: LZL0 <12474488+LZL0@users.noreply.github.com> Date: 
Subject: [PATCH 4/4] Update BENCHMARKS.md

---
 BENCHMARKS.md | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/BENCHMARKS.md b/BENCHMARKS.md
index edc783cf..cc4aa635 100644
--- a/BENCHMARKS.md
+++ b/BENCHMARKS.md
@@ -4,7 +4,7 @@
 Performance benchmarks measuring L0 overhead on high-throughput streaming.
 
 ## Test Environment
 
-- **CPU**: AMD Ryzen 9 5950X (16 cores, 32 threads)
+- **CPU**: Apple M1 Pro
 - **Runtime**: Python 3.13 with pytest-asyncio
 - **Methodology**: Mock token streams with zero inter-token delay to measure pure L0 overhead
 
@@ -12,12 +12,12 @@ Performance benchmarks measuring L0 overhead on high-throughput streaming.
 
 | Scenario | Tokens/s | Avg Duration | TTFT | Overhead |
 |----------|----------|--------------|------|----------|
-| Baseline (raw streaming) | 1,100,911 | 1.82 ms | 0.02 ms | - |
-| L0 Core (no features) | 382,389 | 5.23 ms | 0.09 ms | 188% |
-| L0 + JSON Guardrail | 329,869 | 6.06 ms | 0.08 ms | 234% |
-| L0 + All Guardrails | 266,112 | 7.52 ms | 0.09 ms | 314% |
-| L0 + Drift Detection | 107,501 | 18.61 ms | 0.09 ms | 924% |
-| L0 Full Stack | 92,952 | 21.52 ms | 0.09 ms | 1084% |
+| Baseline (raw streaming) | 1,486,663 | 1.35 ms | 0.01 ms | - |
+| L0 Core (no features) | 643,240 | 3.11 ms | 0.06 ms | 131% |
+| L0 + JSON Guardrail | 525,381 | 3.81 ms | 0.07 ms | 183% |
+| L0 + All Guardrails | 400,643 | 4.99 ms | 0.07 ms | 271% |
+| L0 + Drift Detection | 125,256 | 15.97 ms | 0.08 ms | 1086% |
+| L0 Full Stack | 112,414 | 17.79 ms | 0.06 ms | 1221% |
 
 **Legend:**
 - **Tokens/s** = Throughput (higher is better)
 
@@ -60,12 +60,12 @@ result = await l0.run(
 
 ## Blackwell Ready
 
-Even with full guardrails, drift detection, and checkpointing enabled, L0 sustains **90K+ tokens/s** - well above current LLM inference speeds and ready for Nvidia Blackwell's 1000+ tokens/s streaming.
+Even with full guardrails, drift detection, and checkpointing enabled, L0 sustains **112K+ tokens/s** - well above current LLM inference speeds and ready for Nvidia Blackwell's 1000+ tokens/s streaming.
 
 | GPU Generation | Expected Tokens/s | L0 Headroom |
 |----------------|-------------------|-------------|
-| Current (H100) | ~100-200 | 450-900x |
-| Blackwell (B200) | ~1000+ | 90x |
+| Current (H100) | ~100-200 | 560-1120x |
+| Blackwell (B200) | ~1000+ | 112x |
 
 ## Running Benchmarks
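
A note on the pre-compiled patterns in PATCH 1/4: Python's `re` module already caches compiled patterns internally, so the saving from module-level compilation is skipping the per-call cache lookup and, in the old code, rebuilding the pattern-string lists on every check. A minimal standalone micro-benchmark sketch (illustrative only, not part of the patches):

```python
import re
import timeit

TEXT = "well, i guess that's okay, yeah " * 10

# Module-level, compiled once at import (what the patch does)
_INFORMAL = re.compile(r"\b(gonna|wanna|yeah|yep|nope|ok|okay)\b", re.IGNORECASE)


def per_call() -> int:
    # Pattern string goes through re's internal cache on every call
    return len(re.findall(r"\b(gonna|wanna|yeah|yep|nope|ok|okay)\b", TEXT, re.IGNORECASE))


def precompiled() -> int:
    return len(_INFORMAL.findall(TEXT))


print("per-call:    ", timeit.timeit(per_call, number=50_000))
print("precompiled: ", timeit.timeit(precompiled, number=50_000))
```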
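The buffered-append change in PATCH 1/4 and 3/4 is the classic accumulate-in-a-list, join-on-read pattern. A standalone sketch under assumed names (these classes are illustrative, not the l0 API): the attribute-based `+=` path copies the whole accumulated prefix on every token, which is quadratic overall, while the buffered path defers to a single join.

```python
import time


class NaiveState:
    """Mimics the old `state.content += token` hot path."""

    def __init__(self) -> None:
        self.content = ""


class BufferedState:
    """Mimics the patched State: list buffer, lazy join on read."""

    def __init__(self) -> None:
        self._content = ""
        self._buffer: list[str] = []

    def append_content(self, token: str) -> None:
        self._buffer.append(token)  # O(1) amortized

    @property
    def content(self) -> str:
        if self._buffer:
            self._content += "".join(self._buffer)  # one O(total) join
            self._buffer.clear()
        return self._content


tokens = ["tok "] * 30_000

t0 = time.perf_counter()
s = NaiveState()
for t in tokens:
    s.content += t  # copies the whole prefix each time: O(n^2) overall
naive_secs = time.perf_counter() - t0

t0 = time.perf_counter()
b = BufferedState()
for t in tokens:
    b.append_content(t)
joined = b.content  # the single join happens here
buffered_secs = time.perf_counter() - t0

assert joined == s.content
print(f"naive: {naive_secs:.3f}s  buffered: {buffered_secs:.3f}s")
```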
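On PATCH 2/4: with the `uuid6` package (assumed dependency, https://pypi.org/project/uuid6/), the first 12 hex characters of a UUIDv7 hold exactly the 48-bit millisecond timestamp of the RFC 9562 layout, which is why `_next_callback_id` keeps the full hex digest instead of a 12-character prefix. A sketch showing the collision:

```python
# Why a 12-hex-char prefix of a UUIDv7 is not unique: those characters
# encode only the unix_ts_ms field, so IDs minted in the same millisecond
# share them. Assumes the `uuid6` package: pip install uuid6
import uuid6

ids = [uuid6.uuid7().hex for _ in range(1000)]
prefixes = {i[:12] for i in ids}  # first 48 bits = millisecond timestamp

print(len(set(ids)), "distinct full ids")        # 1000: full IDs stay unique
print(len(prefixes), "distinct 12-char prefixes")  # tiny: heavy collisions
```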
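On PATCH 3/4: dropping `@dataclass` in favor of `__slots__` removes the per-instance `__dict__`, which shrinks `State` objects and speeds up attribute access on the hot token path. A standalone sketch (names are illustrative); note that `@dataclass(slots=True)`, available since Python 3.10, gives the same layout while keeping the generated `__init__`/`__repr__`, so the hand-written `__init__` above is a stylistic choice rather than a requirement:

```python
import timeit
from dataclasses import dataclass


@dataclass
class DictState:
    token_count: int = 0


class SlotState:
    __slots__ = ("token_count",)

    def __init__(self) -> None:
        self.token_count = 0


d, s = DictState(), SlotState()
print(hasattr(d, "__dict__"), hasattr(s, "__dict__"))  # True False

# Attribute reads skip the instance dict lookup on the slotted class
print("dict:  ", timeit.timeit(lambda: d.token_count, number=1_000_000))
print("slots: ", timeit.timeit(lambda: s.token_count, number=1_000_000))
```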