diff --git a/BENCHMARKS.md b/BENCHMARKS.md index ee56a0e..b2eb2a9 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -5,7 +5,7 @@ Performance benchmarks measuring L0 overhead on high-throughput streaming. ## Test Environment - **CPU**: Apple M1 Max (10 cores) -- **Runtime**: Python 3.13, pytest 9 with pytest-asyncio 1.3.0 +- **Runtime**: Python 3.13 with pytest-asyncio - **Methodology**: Mock token streams with zero inter-token delay to measure pure L0 overhead ## Results diff --git a/src/l0/drift.py b/src/l0/drift.py index f6e6345..6a8bb64 100644 --- a/src/l0/drift.py +++ b/src/l0/drift.py @@ -17,6 +17,60 @@ from dataclasses import dataclass, field from typing import Any, Literal +# ───────────────────────────────────────────────────────────────────────────── +# Pre-compiled regex patterns for performance (avoids re-compilation per check) +# ───────────────────────────────────────────────────────────────────────────── + +# Meta commentary patterns (case-insensitive, checked on last 200 chars) +_META_COMMENTARY_PATTERNS: list[re.Pattern[str]] = [ + re.compile(r"as an ai", re.IGNORECASE), + re.compile(r"i'm an ai", re.IGNORECASE), + re.compile(r"i am an ai", re.IGNORECASE), + re.compile(r"i cannot actually", re.IGNORECASE), + re.compile(r"i don't have personal", re.IGNORECASE), + re.compile(r"i apologize, but i", re.IGNORECASE), + re.compile(r"i'm sorry, but i", re.IGNORECASE), + re.compile(r"let me explain", re.IGNORECASE), + re.compile(r"to clarify", re.IGNORECASE), + re.compile(r"in other words", re.IGNORECASE), +] + +# Tone shift patterns +_FORMAL_PATTERN: re.Pattern[str] = re.compile( + r"\b(therefore|thus|hence|moreover|furthermore|consequently)\b", re.IGNORECASE +) +_INFORMAL_PATTERN: re.Pattern[str] = re.compile( + r"\b(gonna|wanna|yeah|yep|nope|ok|okay)\b", re.IGNORECASE +) + +# Sentence split pattern +_SENTENCE_SPLIT_PATTERN: re.Pattern[str] = re.compile(r"[.!?]+") + +# Format collapse patterns (checked on first 100 chars) +_FORMAT_COLLAPSE_PATTERNS: 
list[re.Pattern[str]] = [ + re.compile(r"here is the .+?:", re.IGNORECASE), + re.compile(r"here's the .+?:", re.IGNORECASE), + re.compile(r"let me .+? for you", re.IGNORECASE), + re.compile(r"i'll .+? for you", re.IGNORECASE), + re.compile(r"here you go", re.IGNORECASE), +] + +# Markdown patterns +_MARKDOWN_PATTERNS: list[re.Pattern[str]] = [ + re.compile(r"```"), + re.compile(r"^#{1,6}\s", re.MULTILINE), + re.compile(r"\*\*.*?\*\*"), + re.compile(r"\[.*?\]\(.*?\)"), +] + +# Hedging patterns (checked on first line) +_HEDGING_PATTERNS: list[re.Pattern[str]] = [ + re.compile(r"^sure!?\s*$", re.IGNORECASE | re.MULTILINE), + re.compile(r"^certainly!?\s*$", re.IGNORECASE | re.MULTILINE), + re.compile(r"^of course!?\s*$", re.IGNORECASE | re.MULTILINE), + re.compile(r"^absolutely!?\s*$", re.IGNORECASE | re.MULTILINE), +] + # Drift types that can be detected DriftType = Literal[ "tone_shift", @@ -207,42 +261,27 @@ def check(self, content: str, delta: str | None = None) -> DriftResult: ) def _detect_meta_commentary(self, content: str) -> bool: - """Detect meta commentary patterns.""" - meta_patterns = [ - r"as an ai", - r"i'm an ai", - r"i am an ai", - r"i cannot actually", - r"i don't have personal", - r"i apologize, but i", - r"i'm sorry, but i", - r"let me explain", - r"to clarify", - r"in other words", - ] - + """Detect meta commentary patterns using pre-compiled regexes.""" # Check last 200 characters for meta commentary - recent = content[-200:].lower() - return any(re.search(pattern, recent) for pattern in meta_patterns) + recent = content[-200:] + return any(p.search(recent) for p in _META_COMMENTARY_PATTERNS) def _detect_tone_shift(self, content: str, previous_content: str) -> bool: - """Detect tone shift between old and new content.""" + """Detect tone shift between old and new content using pre-compiled regexes.""" if not previous_content or len(previous_content) < 100: return False # Simple heuristic: check if formality suddenly changes - recent_chunk = 
content[-200:].lower() - previous_chunk = previous_content[-200:].lower() + recent_chunk = content[-200:] + previous_chunk = previous_content[-200:] - # Count formal markers - formal_pattern = r"\b(therefore|thus|hence|moreover|furthermore|consequently)\b" - recent_formal = len(re.findall(formal_pattern, recent_chunk)) - previous_formal = len(re.findall(formal_pattern, previous_chunk)) + # Count formal markers using pre-compiled pattern + recent_formal = len(_FORMAL_PATTERN.findall(recent_chunk)) + previous_formal = len(_FORMAL_PATTERN.findall(previous_chunk)) - # Count informal markers - informal_pattern = r"\b(gonna|wanna|yeah|yep|nope|ok|okay)\b" - recent_informal = len(re.findall(informal_pattern, recent_chunk)) - previous_informal = len(re.findall(informal_pattern, previous_chunk)) + # Count informal markers using pre-compiled pattern + recent_informal = len(_INFORMAL_PATTERN.findall(recent_chunk)) + previous_informal = len(_INFORMAL_PATTERN.findall(previous_chunk)) # Check for sudden shift formal_shift = abs(recent_formal - previous_formal) > 2 @@ -251,11 +290,11 @@ def _detect_tone_shift(self, content: str, previous_content: str) -> bool: return formal_shift or informal_shift def _detect_repetition(self, content: str) -> bool: - """Detect excessive repetition.""" - # Split into sentences + """Detect excessive repetition using pre-compiled regex.""" + # Split into sentences using pre-compiled pattern sentences = [ s.strip().lower() - for s in re.split(r"[.!?]+", content) + for s in _SENTENCE_SPLIT_PATTERN.split(content) if len(s.strip()) > 20 ] @@ -326,59 +365,34 @@ def _detect_entropy_spike(self) -> bool: return last > mean + self.config.entropy_threshold * std_dev def _detect_format_collapse(self, content: str) -> bool: - """Detect format collapse (mixing instruction with output).""" - collapse_patterns = [ - r"here is the .+?:", - r"here's the .+?:", - r"let me .+? for you", - r"i'll .+? 
for you", - r"here you go", - ] - + """Detect format collapse using pre-compiled regexes.""" # Only check beginning of content - beginning = content[:100].lower() - return any(re.search(pattern, beginning) for pattern in collapse_patterns) + beginning = content[:100] + return any(p.search(beginning) for p in _FORMAT_COLLAPSE_PATTERNS) def _detect_markdown_collapse(self, content: str, previous_content: str) -> bool: - """Detect markdown to plaintext collapse.""" + """Detect markdown to plaintext collapse using pre-compiled regexes.""" if not previous_content or len(previous_content) < 100: return False - # Count markdown elements in recent chunks - markdown_patterns = [ - r"```", - r"^#{1,6}\s", - r"\*\*.*?\*\*", - r"\[.*?\]\(.*?\)", - ] - recent = content[-200:] previous = previous_content[-200:] recent_markdown = 0 previous_markdown = 0 - for pattern in markdown_patterns: - recent_markdown += len(re.findall(pattern, recent, re.MULTILINE)) - previous_markdown += len(re.findall(pattern, previous, re.MULTILINE)) + # Count markdown elements using pre-compiled patterns + for pattern in _MARKDOWN_PATTERNS: + recent_markdown += len(pattern.findall(recent)) + previous_markdown += len(pattern.findall(previous)) # Check if markdown suddenly drops return previous_markdown > 3 and recent_markdown == 0 def _detect_excessive_hedging(self, content: str) -> bool: - """Detect excessive hedging at start.""" - hedging_patterns = [ - r"^sure!?\s*$", - r"^certainly!?\s*$", - r"^of course!?\s*$", - r"^absolutely!?\s*$", - ] - + """Detect excessive hedging at start using pre-compiled regexes.""" first_line = content.strip().split("\n")[0] if content.strip() else "" - return any( - re.search(pattern, first_line, re.IGNORECASE | re.MULTILINE) - for pattern in hedging_patterns - ) + return any(p.search(first_line) for p in _HEDGING_PATTERNS) def reset(self) -> None: """Reset detector state.""" diff --git a/src/l0/runtime.py b/src/l0/runtime.py index b6a601d..ac1fa5c 100644 --- 
a/src/l0/runtime.py +++ b/src/l0/runtime.py @@ -39,6 +39,23 @@ def __init__(self, message: str, timeout_type: str, timeout_seconds: float): self.timeout_seconds = timeout_seconds +# ───────────────────────────────────────────────────────────────────────────── +# Callback ID generation from the random tail of a UUIDv7 (see docstring below) +# ───────────────────────────────────────────────────────────────────────────── + +import uuid6 + + +def _next_callback_id() -> str: + """Generate a callback ID from the random tail of a UUIDv7. + + Per RFC 9562 the first 12 hex digits of a UUIDv7 hold only the + 48-bit millisecond timestamp (no random bits), so a prefix slice + can repeat across processes; the last 12 digits come from rand_b. + """ + return f"cb_{uuid6.uuid7().hex[-12:]}" + + if TYPE_CHECKING: from .events import ObservabilityEvent from .guardrails import GuardrailRule, GuardrailViolation @@ -676,8 +693,6 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: state.token_count % guardrail_interval == 0 and guardrails ): - import uuid - phase_start_time = time.perf_counter() event_bus.emit( ObservabilityEventType.GUARDRAIL_PHASE_START, @@ -687,7 +702,7 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: all_violations = [] for idx, rule in enumerate(guardrails): - callback_id = f"cb_{uuid.uuid4().hex[:12]}" + callback_id = _next_callback_id() rule_start_time = time.perf_counter() event_bus.emit( ObservabilityEventType.GUARDRAIL_RULE_START, @@ -857,8 +872,6 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: # Run final guardrail check (for completion-only rules) if guardrails: - import uuid - final_phase_start_time = time.perf_counter() event_bus.emit( ObservabilityEventType.GUARDRAIL_PHASE_START, @@ -868,7 +881,7 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: all_violations = [] for idx, rule in enumerate(guardrails): - callback_id = f"cb_{uuid.uuid4().hex[:12]}" + callback_id = _next_callback_id() rule_start_time = time.perf_counter() event_bus.emit( 
ObservabilityEventType.GUARDRAIL_RULE_START, diff --git a/src/l0/state.py b/src/l0/state.py index eb9562c..a8a5a4c 100644 --- a/src/l0/state.py +++ b/src/l0/state.py @@ -18,12 +18,15 @@ def update_checkpoint(state: State) -> None: def append_token(state: State, token: str) -> None: - """Append token to content and update timing.""" + """Append token to content and update timing. + + Uses O(1) amortized buffer append instead of O(n) string concatenation. + """ now = time.time() if state.first_token_at is None: state.first_token_at = now state.last_token_at = now - state.content += token + state.append_content(token) # O(1) amortized state.token_count += 1 diff --git a/src/l0/types.py b/src/l0/types.py index 7475037..01cb374 100644 --- a/src/l0/types.py +++ b/src/l0/types.py @@ -260,34 +260,118 @@ class BackoffStrategy(str, Enum): # ───────────────────────────────────────────────────────────────────────────── -@dataclass class State: - """Runtime state tracking.""" - - content: str = "" - checkpoint: str = "" # Last known good slice for continuation - token_count: int = 0 - model_retry_count: int = 0 - network_retry_count: int = 0 - fallback_index: int = 0 - violations: "list[GuardrailViolation]" = field(default_factory=list) - drift_detected: bool = False - completed: bool = False - aborted: bool = False - first_token_at: float | None = None - last_token_at: float | None = None - duration: float | None = None - resumed: bool = False # Whether stream was resumed from checkpoint - network_errors: "list[NetworkError]" = field(default_factory=list) - # Multimodal state - data_outputs: list[DataPayload] = field(default_factory=list) - last_progress: Progress | None = None - # Continuation state (for observability) - resume_point: str | None = None # The checkpoint content used for resume - resume_from: int | None = None # Character offset where resume occurred - continuation_used: bool = False # Whether continuation was actually used - deduplication_applied: bool = False 
# Whether deduplication removed overlap - overlap_removed: str | None = None # The overlapping text that was removed + """Runtime state tracking. + + Uses an internal buffer for O(1) token appends instead of O(n) string concatenation. + Access `content` property to get the accumulated string (flushes buffer automatically). + """ + + __slots__ = ( + "_content", + "_content_buffer", + "checkpoint", + "token_count", + "model_retry_count", + "network_retry_count", + "fallback_index", + "violations", + "drift_detected", + "completed", + "aborted", + "first_token_at", + "last_token_at", + "duration", + "resumed", + "network_errors", + "data_outputs", + "last_progress", + "resume_point", + "resume_from", + "continuation_used", + "deduplication_applied", + "overlap_removed", + ) + + def __init__( + self, + content: str = "", + checkpoint: str = "", + token_count: int = 0, + model_retry_count: int = 0, + network_retry_count: int = 0, + fallback_index: int = 0, + violations: "list[GuardrailViolation] | None" = None, + drift_detected: bool = False, + completed: bool = False, + aborted: bool = False, + first_token_at: float | None = None, + last_token_at: float | None = None, + duration: float | None = None, + resumed: bool = False, + network_errors: "list[NetworkError] | None" = None, + data_outputs: list[DataPayload] | None = None, + last_progress: Progress | None = None, + resume_point: str | None = None, + resume_from: int | None = None, + continuation_used: bool = False, + deduplication_applied: bool = False, + overlap_removed: str | None = None, + ) -> None: + self._content = content + self._content_buffer: list[str] = [] + self.checkpoint = checkpoint + self.token_count = token_count + self.model_retry_count = model_retry_count + self.network_retry_count = network_retry_count + self.fallback_index = fallback_index + self.violations = violations if violations is not None else [] + self.drift_detected = drift_detected + self.completed = completed + self.aborted = aborted 
+ self.first_token_at = first_token_at + self.last_token_at = last_token_at + self.duration = duration + self.resumed = resumed + self.network_errors = network_errors if network_errors is not None else [] + self.data_outputs = data_outputs if data_outputs is not None else [] + self.last_progress = last_progress + self.resume_point = resume_point + self.resume_from = resume_from + self.continuation_used = continuation_used + self.deduplication_applied = deduplication_applied + self.overlap_removed = overlap_removed + + @property + def content(self) -> str: + """Get accumulated content, flushing buffer if needed.""" + if self._content_buffer: + self._content = self._content + "".join(self._content_buffer) + self._content_buffer.clear() + return self._content + + @content.setter + def content(self, value: str) -> None: + """Set content directly, clearing any buffered tokens.""" + self._content = value + self._content_buffer.clear() + + def append_content(self, token: str) -> None: + """Append token to content buffer (O(1) amortized).""" + self._content_buffer.append(token) + + def __repr__(self) -> str: + """Return a dataclass-style repr (the @dataclass-generated one was dropped).""" + names = ("content", *State.__slots__[2:]) + pairs = ", ".join(f"{n}={getattr(self, n)!r}" for n in names) + return f"{type(self).__name__}({pairs})" + + def __eq__(self, other: object) -> bool: + """Compare field-by-field, as the former @dataclass-generated __eq__ did.""" + if other.__class__ is not self.__class__: + return NotImplemented + names = ("content", *State.__slots__[2:]) + return all(getattr(self, n) == getattr(other, n) for n in names) # ─────────────────────────────────────────────────────────────────────────────