2 changes: 1 addition & 1 deletion BENCHMARKS.md
@@ -5,7 +5,7 @@ Performance benchmarks measuring L0 overhead on high-throughput streaming.
## Test Environment

- **CPU**: Apple M1 Max (10 cores)
- **Runtime**: Python 3.13, pytest 9 with pytest-asyncio 1.3.0
- **Runtime**: Python 3.13 with pytest-asyncio
- **Methodology**: Mock token streams with zero inter-token delay to measure pure L0 overhead

## Results
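The methodology above isolates L0 overhead by driving it with a mock stream that yields tokens with no inter-token delay. A minimal sketch of such a stream and a baseline timing pass, assuming an async-iterator interface (`mock_stream` and `measure_baseline` are illustrative names, not part of the benchmark suite):

```python
import asyncio
import time
from collections.abc import AsyncIterator


async def mock_stream(tokens: list[str]) -> AsyncIterator[str]:
    """Yield tokens back-to-back with zero inter-token delay."""
    for token in tokens:
        yield token


async def measure_baseline(n_tokens: int = 100_000) -> float:
    """Time a bare pass over the mock stream; wrapping the same stream
    with L0 and re-timing isolates L0's per-token overhead."""
    tokens = ["tok"] * n_tokens
    start = time.perf_counter()
    async for _ in mock_stream(tokens):
        pass
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"baseline pass: {asyncio.run(measure_baseline()):.4f}s")
```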
144 changes: 79 additions & 65 deletions src/l0/drift.py
@@ -17,6 +17,60 @@
from dataclasses import dataclass, field
from typing import Any, Literal

# ─────────────────────────────────────────────────────────────────────────────
# Pre-compiled regex patterns for performance (avoids re-compilation per check)
# ─────────────────────────────────────────────────────────────────────────────

# Meta commentary patterns (case-insensitive, checked on last 200 chars)
_META_COMMENTARY_PATTERNS: list[re.Pattern[str]] = [
re.compile(r"as an ai", re.IGNORECASE),
re.compile(r"i'm an ai", re.IGNORECASE),
re.compile(r"i am an ai", re.IGNORECASE),
re.compile(r"i cannot actually", re.IGNORECASE),
re.compile(r"i don't have personal", re.IGNORECASE),
re.compile(r"i apologize, but i", re.IGNORECASE),
re.compile(r"i'm sorry, but i", re.IGNORECASE),
re.compile(r"let me explain", re.IGNORECASE),
re.compile(r"to clarify", re.IGNORECASE),
re.compile(r"in other words", re.IGNORECASE),
]

# Tone shift patterns
_FORMAL_PATTERN: re.Pattern[str] = re.compile(
r"\b(therefore|thus|hence|moreover|furthermore|consequently)\b", re.IGNORECASE
)
_INFORMAL_PATTERN: re.Pattern[str] = re.compile(
r"\b(gonna|wanna|yeah|yep|nope|ok|okay)\b", re.IGNORECASE
)

# Sentence split pattern
_SENTENCE_SPLIT_PATTERN: re.Pattern[str] = re.compile(r"[.!?]+")

# Format collapse patterns (checked on first 100 chars)
_FORMAT_COLLAPSE_PATTERNS: list[re.Pattern[str]] = [
re.compile(r"here is the .+?:", re.IGNORECASE),
re.compile(r"here's the .+?:", re.IGNORECASE),
re.compile(r"let me .+? for you", re.IGNORECASE),
re.compile(r"i'll .+? for you", re.IGNORECASE),
re.compile(r"here you go", re.IGNORECASE),
]

# Markdown patterns
_MARKDOWN_PATTERNS: list[re.Pattern[str]] = [
re.compile(r"```"),
re.compile(r"^#{1,6}\s", re.MULTILINE),
re.compile(r"\*\*.*?\*\*"),
re.compile(r"\[.*?\]\(.*?\)"),
]

# Hedging patterns (checked on first line)
_HEDGING_PATTERNS: list[re.Pattern[str]] = [
re.compile(r"^sure!?\s*$", re.IGNORECASE | re.MULTILINE),
re.compile(r"^certainly!?\s*$", re.IGNORECASE | re.MULTILINE),
re.compile(r"^of course!?\s*$", re.IGNORECASE | re.MULTILINE),
re.compile(r"^absolutely!?\s*$", re.IGNORECASE | re.MULTILINE),
]

# Drift types that can be detected
DriftType = Literal[
"tone_shift",
@@ -207,42 +261,27 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:
)

def _detect_meta_commentary(self, content: str) -> bool:
"""Detect meta commentary patterns."""
meta_patterns = [
r"as an ai",
r"i'm an ai",
r"i am an ai",
r"i cannot actually",
r"i don't have personal",
r"i apologize, but i",
r"i'm sorry, but i",
r"let me explain",
r"to clarify",
r"in other words",
]

"""Detect meta commentary patterns using pre-compiled regexes."""
# Check last 200 characters for meta commentary
recent = content[-200:].lower()
return any(re.search(pattern, recent) for pattern in meta_patterns)
recent = content[-200:]
return any(p.search(recent) for p in _META_COMMENTARY_PATTERNS)

def _detect_tone_shift(self, content: str, previous_content: str) -> bool:
"""Detect tone shift between old and new content."""
"""Detect tone shift between old and new content using pre-compiled regexes."""
if not previous_content or len(previous_content) < 100:
return False

# Simple heuristic: check if formality suddenly changes
recent_chunk = content[-200:].lower()
previous_chunk = previous_content[-200:].lower()
recent_chunk = content[-200:]
previous_chunk = previous_content[-200:]

# Count formal markers
formal_pattern = r"\b(therefore|thus|hence|moreover|furthermore|consequently)\b"
recent_formal = len(re.findall(formal_pattern, recent_chunk))
previous_formal = len(re.findall(formal_pattern, previous_chunk))
# Count formal markers using pre-compiled pattern
recent_formal = len(_FORMAL_PATTERN.findall(recent_chunk))
previous_formal = len(_FORMAL_PATTERN.findall(previous_chunk))

# Count informal markers
informal_pattern = r"\b(gonna|wanna|yeah|yep|nope|ok|okay)\b"
recent_informal = len(re.findall(informal_pattern, recent_chunk))
previous_informal = len(re.findall(informal_pattern, previous_chunk))
# Count informal markers using pre-compiled pattern
recent_informal = len(_INFORMAL_PATTERN.findall(recent_chunk))
previous_informal = len(_INFORMAL_PATTERN.findall(previous_chunk))

# Check for sudden shift
formal_shift = abs(recent_formal - previous_formal) > 2
@@ -251,11 +290,11 @@ def _detect_tone_shift(self, content: str, previous_content: str) -> bool:
return formal_shift or informal_shift

def _detect_repetition(self, content: str) -> bool:
"""Detect excessive repetition."""
# Split into sentences
"""Detect excessive repetition using pre-compiled regex."""
# Split into sentences using pre-compiled pattern
sentences = [
s.strip().lower()
for s in re.split(r"[.!?]+", content)
for s in _SENTENCE_SPLIT_PATTERN.split(content)
if len(s.strip()) > 20
]

@@ -326,59 +365,34 @@ def _detect_entropy_spike(self) -> bool:
return last > mean + self.config.entropy_threshold * std_dev

def _detect_format_collapse(self, content: str) -> bool:
"""Detect format collapse (mixing instruction with output)."""
collapse_patterns = [
r"here is the .+?:",
r"here's the .+?:",
r"let me .+? for you",
r"i'll .+? for you",
r"here you go",
]

"""Detect format collapse using pre-compiled regexes."""
# Only check beginning of content
beginning = content[:100].lower()
return any(re.search(pattern, beginning) for pattern in collapse_patterns)
beginning = content[:100]
return any(p.search(beginning) for p in _FORMAT_COLLAPSE_PATTERNS)

def _detect_markdown_collapse(self, content: str, previous_content: str) -> bool:
"""Detect markdown to plaintext collapse."""
"""Detect markdown to plaintext collapse using pre-compiled regexes."""
if not previous_content or len(previous_content) < 100:
return False

# Count markdown elements in recent chunks
markdown_patterns = [
r"```",
r"^#{1,6}\s",
r"\*\*.*?\*\*",
r"\[.*?\]\(.*?\)",
]

recent = content[-200:]
previous = previous_content[-200:]

recent_markdown = 0
previous_markdown = 0

for pattern in markdown_patterns:
recent_markdown += len(re.findall(pattern, recent, re.MULTILINE))
previous_markdown += len(re.findall(pattern, previous, re.MULTILINE))
# Count markdown elements using pre-compiled patterns
for pattern in _MARKDOWN_PATTERNS:
recent_markdown += len(pattern.findall(recent))
previous_markdown += len(pattern.findall(previous))

# Check if markdown suddenly drops
return previous_markdown > 3 and recent_markdown == 0

def _detect_excessive_hedging(self, content: str) -> bool:
"""Detect excessive hedging at start."""
hedging_patterns = [
r"^sure!?\s*$",
r"^certainly!?\s*$",
r"^of course!?\s*$",
r"^absolutely!?\s*$",
]

"""Detect excessive hedging at start using pre-compiled regexes."""
first_line = content.strip().split("\n")[0] if content.strip() else ""
return any(
re.search(pattern, first_line, re.IGNORECASE | re.MULTILINE)
for pattern in hedging_patterns
)
return any(p.search(first_line) for p in _HEDGING_PATTERNS)

def reset(self) -> None:
"""Reset detector state."""
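The change above hoists every regex to a module-level `re.compile`, so each drift check reuses compiled pattern objects instead of resolving pattern strings on every call. A rough micro-benchmark sketch of the difference; the pattern and input text are illustrative, and the absolute gap is modest because `re` also keeps an internal compile cache:

```python
import re
import timeit

# Illustrative pattern/input, not the exact strings used by the detector.
PATTERN_STR = r"\b(therefore|thus|hence|moreover|furthermore|consequently)\b"
PATTERN = re.compile(PATTERN_STR, re.IGNORECASE)
TEXT = "Thus the reply stayed formal; moreover it repeated itself at length. " * 4


def module_level_findall() -> int:
    # Goes through re's pattern cache and flag handling on every call.
    return len(re.findall(PATTERN_STR, TEXT, re.IGNORECASE))


def precompiled_findall() -> int:
    # Calls the compiled pattern object directly.
    return len(PATTERN.findall(TEXT))


if __name__ == "__main__":
    print("re.findall :", timeit.timeit(module_level_findall, number=50_000))
    print("precompiled:", timeit.timeit(precompiled_findall, number=50_000))
```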
25 changes: 19 additions & 6 deletions src/l0/runtime.py
@@ -39,6 +39,23 @@ def __init__(self, message: str, timeout_type: str, timeout_seconds: float):
self.timeout_seconds = timeout_seconds


# ─────────────────────────────────────────────────────────────────────────────
# Fast callback ID generation using UUIDv7 (time-ordered, faster than UUIDv4)
# ─────────────────────────────────────────────────────────────────────────────

import uuid6


def _next_callback_id() -> str:
"""Generate a callback ID using UUIDv7.

UUIDv7 is time-ordered and faster than UUIDv4 since it uses
timestamp + random bits instead of pure crypto-random.
Provides global uniqueness for distributed tracing.
"""
return f"cb_{uuid6.uuid7().hex[:12]}"


if TYPE_CHECKING:
from .events import ObservabilityEvent
from .guardrails import GuardrailRule, GuardrailViolation
@@ -676,8 +693,6 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]:
state.token_count % guardrail_interval == 0
and guardrails
):
import uuid

phase_start_time = time.perf_counter()
event_bus.emit(
ObservabilityEventType.GUARDRAIL_PHASE_START,
@@ -687,7 +702,7 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]:

all_violations = []
for idx, rule in enumerate(guardrails):
callback_id = f"cb_{uuid.uuid4().hex[:12]}"
callback_id = _next_callback_id()
rule_start_time = time.perf_counter()
event_bus.emit(
ObservabilityEventType.GUARDRAIL_RULE_START,
@@ -857,8 +872,6 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]:

# Run final guardrail check (for completion-only rules)
if guardrails:
import uuid

final_phase_start_time = time.perf_counter()
event_bus.emit(
ObservabilityEventType.GUARDRAIL_PHASE_START,
@@ -868,7 +881,7 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]:

all_violations = []
for idx, rule in enumerate(guardrails):
callback_id = f"cb_{uuid.uuid4().hex[:12]}"
callback_id = _next_callback_id()
rule_start_time = time.perf_counter()
event_bus.emit(
ObservabilityEventType.GUARDRAIL_RULE_START,
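A small comparison sketch of the callback-ID change, assuming the third-party `uuid6` package is installed; the relative speed of the two generators varies by platform and Python version:

```python
import timeit
import uuid

import uuid6  # third-party UUIDv7 implementation (RFC 9562)


def v4_callback_id() -> str:
    return f"cb_{uuid.uuid4().hex[:12]}"


def v7_callback_id() -> str:
    # UUIDv7 is time-ordered: the leading 48 bits are a millisecond timestamp,
    # so IDs sort roughly by creation time when correlating trace events.
    return f"cb_{uuid6.uuid7().hex[:12]}"


if __name__ == "__main__":
    print("uuid4:", timeit.timeit(v4_callback_id, number=100_000))
    print("uuid7:", timeit.timeit(v7_callback_id, number=100_000))
```

One design note: the leading 48 bits of a UUIDv7 are the millisecond timestamp, so `hex[:12]` keeps only those bits and IDs issued within the same millisecond can coincide; whether that matters depends on how callback IDs are consumed downstream.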
7 changes: 5 additions & 2 deletions src/l0/state.py
@@ -18,12 +18,15 @@ def update_checkpoint(state: State) -> None:


def append_token(state: State, token: str) -> None:
"""Append token to content and update timing."""
"""Append token to content and update timing.

Uses O(1) amortized buffer append instead of O(n) string concatenation.
"""
now = time.time()
if state.first_token_at is None:
state.first_token_at = now
state.last_token_at = now
state.content += token
state.append_content(token) # O(1) amortized
state.token_count += 1


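The `state.append_content(token)` call relies on a buffer whose implementation is not part of this diff. A minimal sketch of the usual list-buffer pattern the docstring describes (class and attribute names are assumptions, not the actual `State` internals):

```python
class _ContentBuffer:
    """Accumulate chunks in a list and join lazily when the text is read."""

    def __init__(self) -> None:
        self._parts: list[str] = []
        self._cached: str | None = ""

    def append(self, chunk: str) -> None:
        # list.append is O(1) amortized; invalidate the cached join.
        self._parts.append(chunk)
        self._cached = None

    @property
    def text(self) -> str:
        # One join per read after a batch of appends, instead of copying the
        # whole accumulated string on every appended token.
        if self._cached is None:
            self._cached = "".join(self._parts)
        return self._cached
```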