
feat: add blaze voicebot plugin#5050

Open
HoangPN711 wants to merge 5 commits into livekit:main from Actable-AI:feat/blaze-voicebot-plugin

Conversation

@HoangPN711

No description provided.

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Contributor

@devin-ai-integration bot left a comment


Devin Review found 2 new potential issues.

View 11 additional findings in Devin Review.


Comment on lines +239 to +298
for attempt in range(conn_options.max_retry + 1):
try:
async with self._tts._client.stream(
"POST",
self._tts._tts_url,
files=form_data,
headers=headers,
) as response:
if response.status_code >= 500:
error_text = (await response.aread()).decode(errors="replace")
if attempt < conn_options.max_retry:
delay = conn_options.retry_interval * (2 ** attempt)
jitter = delay * 0.1 * random.random()
logger.warning(
"[%s] TTS attempt %d/%d failed (%d). "
"Retrying in %.1fs…",
request_id, attempt + 1,
conn_options.max_retry + 1,
response.status_code, delay,
)
await asyncio.sleep(delay + jitter)
continue
raise TTSError(
f"TTS service error: {response.status_code}",
status_code=response.status_code,
)

if response.status_code != 200:
error_text = (await response.aread()).decode(errors="replace")
logger.error(
"[%s] TTS error %d: %s",
request_id, response.status_code, error_text,
)
raise TTSError(
f"TTS service error: {response.status_code}",
status_code=response.status_code,
)

# Stream audio chunks
async for chunk in response.aiter_bytes(
chunk_size=self._tts._chunk_size
):
if chunk:
output_emitter.push(chunk)

break # Success — exit retry loop

except (httpx.TimeoutException, httpx.NetworkError) as e:
if attempt < conn_options.max_retry:
delay = conn_options.retry_interval * (2 ** attempt)
jitter = delay * 0.1 * random.random()
logger.warning(
"[%s] TTS network error (attempt %d/%d): %s. "
"Retrying in %.1fs…",
request_id, attempt + 1,
conn_options.max_retry + 1, e, delay,
)
await asyncio.sleep(delay + jitter)
else:
raise TTSError(f"TTS network error: {e}") from e

🔴 TTS internal retry loop causes duplicate/garbled audio when mid-stream errors occur

The _run() method implements its own retry loop (lines 239-298), but output_emitter is initialized once before the loop at line 213. If an httpx.TimeoutException or httpx.NetworkError occurs after some audio chunks have already been pushed to the emitter via output_emitter.push(chunk) at line 282, the retry continues to the next attempt without discarding the partial audio. The next successful attempt pushes the full audio again to the same emitter, resulting in duplicated/garbled audio output.

The base class ChunkedStream._main_task (livekit-agents/livekit/agents/tts/tts.py:274-275) properly handles retries by creating a new output_emitter per attempt. But since Blaze implements its own retry inside _run(), this mechanism is completely bypassed. The fix is to remove the internal retry loop and instead raise APIError (or a subclass) from _run(), letting the base class handle retries with fresh emitters per attempt.

Prompt for agents
In livekit-plugins/livekit-plugins-blaze/livekit/plugins/blaze/tts.py, the _run() method (lines 191-316) should NOT implement its own retry loop. Remove the for loop at line 239 and the except clause at lines 286-298. Instead, catch httpx errors and re-raise them as APIError (from livekit.agents._exceptions) or APIConnectionError/APITimeoutError so the base class ChunkedStream._main_task can handle retries properly with a fresh AudioEmitter per attempt. The base class creates a new output_emitter for each retry iteration, which avoids the audio duplication issue. Specifically:
1. Remove the 'for attempt in range(conn_options.max_retry + 1):' loop and associated retry/sleep logic.
2. Convert httpx.TimeoutException to APITimeoutError and httpx.NetworkError to APIConnectionError.
3. Convert HTTP 500+ errors to APIStatusError with retryable=True.
4. Convert HTTP 4xx errors to APIStatusError with retryable=False.
5. Remove the output_emitter.end_input() call at line 301 since the base class calls it after _run() returns.
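A minimal sketch of the status-to-error mapping this suggestion describes. The exception classes below are simplified stand-ins for those in livekit.agents._exceptions (the real ones carry more context), and classify_http_status is a hypothetical helper, not part of the plugin:

```python
from __future__ import annotations

# Simplified stand-ins for the livekit.agents._exceptions hierarchy.
class APIError(Exception):
    def __init__(self, message: str, *, retryable: bool = True) -> None:
        super().__init__(message)
        self.retryable = retryable

class APIStatusError(APIError):
    def __init__(self, message: str, *, status_code: int, retryable: bool) -> None:
        super().__init__(message, retryable=retryable)
        self.status_code = status_code

def classify_http_status(status_code: int) -> APIStatusError | None:
    """Map an HTTP status to the error the base class retry loop expects.

    5xx -> retryable=True (transient server error),
    4xx -> retryable=False (client error, retrying won't help),
    2xx -> no error.
    """
    if status_code >= 500:
        return APIStatusError(
            f"TTS service error: {status_code}",
            status_code=status_code,
            retryable=True,
        )
    if status_code >= 400:
        return APIStatusError(
            f"TTS service error: {status_code}",
            status_code=status_code,
            retryable=False,
        )
    return None
```

With this shape, _run() makes a single attempt, raises the classified error, and ChunkedStream._main_task decides whether to retry with a fresh AudioEmitter.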

Comment on lines +288 to +398
for attempt in range(conn_options.max_retry + 1):
full_response = "" # Reset on each attempt
try:
async with self._llm._client.stream(
"POST",
url,
json=messages,
headers=headers,
) as response:
if response.status_code >= 500:
error_text = (await response.aread()).decode(errors="replace")
if attempt < conn_options.max_retry:
delay = conn_options.retry_interval * (2 ** attempt)
jitter = delay * 0.1 * random.random()
logger.warning(
"[%s] LLM attempt %d/%d failed (%d). "
"Retrying in %.1fs…",
request_id, attempt + 1,
conn_options.max_retry + 1,
response.status_code, delay,
)
await asyncio.sleep(delay + jitter)
continue
raise LLMError(
f"Chatbot service error: {response.status_code}",
status_code=response.status_code,
)

if response.status_code != 200:
error_text = (await response.aread()).decode(errors="replace")
logger.error(
"[%s] LLM error %d: %s",
request_id, response.status_code, error_text,
)
raise LLMError(
f"Chatbot service error: {response.status_code}",
status_code=response.status_code,
)

async for line in response.aiter_lines():
if not line.strip():
continue

# Handle SSE format
if line.startswith("data: "):
data_str = line[6:]

if data_str.strip() == "[DONE]":
logger.debug(
"[%s] Stream completed with [DONE]",
request_id,
)
break

try:
data = json.loads(data_str)
content = self._extract_content(data)
if content:
full_response += content
chunk = llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(
role="assistant",
content=content,
),
)
self._event_ch.send_nowait(chunk)
except json.JSONDecodeError:
logger.warning(
"[%s] Failed to parse SSE data: %s",
request_id, data_str[:100],
)
continue

# Handle raw JSON lines format
else:
try:
data = json.loads(line)
content = self._extract_content(data)
if content:
full_response += content
chunk = llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(
role="assistant",
content=content,
),
)
self._event_ch.send_nowait(chunk)
except json.JSONDecodeError:
logger.warning(
"[%s] Failed to parse JSON line: %s",
request_id, line[:100],
)
continue

break # Success — exit retry loop

except (httpx.TimeoutException, httpx.NetworkError) as e:
if attempt < conn_options.max_retry:
delay = conn_options.retry_interval * (2 ** attempt)
jitter = delay * 0.1 * random.random()
logger.warning(
"[%s] LLM network error (attempt %d/%d): %s. "
"Retrying in %.1fs…",
request_id, attempt + 1,
conn_options.max_retry + 1, e, delay,
)
await asyncio.sleep(delay + jitter)
else:
raise LLMError(f"LLM network error: {e}") from e

🔴 LLM internal retry loop causes duplicate text chunks when mid-stream errors occur

The LLMStream._run() method implements its own retry loop (lines 288-398). Chat chunks are sent to self._event_ch via send_nowait at line 354 as they stream in. If an httpx.TimeoutException or httpx.NetworkError occurs mid-stream—after some chunks have already been dispatched to the channel and potentially consumed—the retry resets full_response (line 289) but cannot retract chunks already sent to _event_ch. The next attempt generates the full response again and sends all new chunks to the same channel. The consumer sees partial text from the failed attempt concatenated with the complete text from the successful retry (e.g., "Hello, how can I help youHello, how can I help you?").

The base class LLMStream._main_task (livekit-agents/livekit/agents/llm/llm.py:208-247) has its own retry mechanism that catches APIError. The fix is to remove the internal retry loop and raise APIError subclasses instead, letting the base class handle retries.

Prompt for agents
In livekit-plugins/livekit-plugins-blaze/livekit/plugins/blaze/llm.py, the LLMStream._run() method (lines 240-413) should NOT implement its own retry loop. Remove the 'for attempt in range(conn_options.max_retry + 1):' loop at line 288 and the associated retry/sleep logic in lines 286-398. Instead:
1. Make a single attempt to stream the response.
2. Convert httpx.TimeoutException to APITimeoutError (from livekit.agents._exceptions).
3. Convert httpx.NetworkError to APIConnectionError.
4. Convert HTTP 500+ errors to APIStatusError with retryable=True.
5. Convert HTTP 4xx errors to APIStatusError with retryable=False.
This lets the base class LLMStream._main_task handle retries properly. While the base class also shares the same _event_ch across retries, this is the established pattern and ensures consistency with how other plugins (e.g. anthropic, openai) work.
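Following the same pattern for the LLM stream, _run() would make a single attempt and translate transport errors, leaving retry policy to the base class. A self-contained sketch; TimeoutException/NetworkError stand in for the httpx exceptions, the APIError classes for livekit.agents._exceptions, and run_once is a hypothetical wrapper for illustration:

```python
# Stand-ins for httpx transport errors; in the plugin these would be
# httpx.TimeoutException and httpx.NetworkError.
class TimeoutException(Exception): ...
class NetworkError(Exception): ...

# Stand-ins for the livekit.agents._exceptions hierarchy.
class APIError(Exception): ...
class APITimeoutError(APIError): ...
class APIConnectionError(APIError): ...

def run_once(stream_attempt):
    """Make a single streaming attempt, translating transport errors.

    No internal retry loop: raising an APIError subclass lets the base
    class (LLMStream._main_task) decide whether and when to retry.
    """
    try:
        return stream_attempt()
    except TimeoutException as e:
        raise APITimeoutError(f"LLM request timed out: {e}") from e
    except NetworkError as e:
        raise APIConnectionError(f"LLM network error: {e}") from e
```

This mirrors how other plugins hand errors up to _main_task rather than sleeping and looping inside _run().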


3 participants