Conversation
|
|
| for attempt in range(conn_options.max_retry + 1): | ||
| try: | ||
| async with self._tts._client.stream( | ||
| "POST", | ||
| self._tts._tts_url, | ||
| files=form_data, | ||
| headers=headers, | ||
| ) as response: | ||
| if response.status_code >= 500: | ||
| error_text = (await response.aread()).decode(errors="replace") | ||
| if attempt < conn_options.max_retry: | ||
| delay = conn_options.retry_interval * (2 ** attempt) | ||
| jitter = delay * 0.1 * random.random() | ||
| logger.warning( | ||
| "[%s] TTS attempt %d/%d failed (%d). " | ||
| "Retrying in %.1fs…", | ||
| request_id, attempt + 1, | ||
| conn_options.max_retry + 1, | ||
| response.status_code, delay, | ||
| ) | ||
| await asyncio.sleep(delay + jitter) | ||
| continue | ||
| raise TTSError( | ||
| f"TTS service error: {response.status_code}", | ||
| status_code=response.status_code, | ||
| ) | ||
|
|
||
| if response.status_code != 200: | ||
| error_text = (await response.aread()).decode(errors="replace") | ||
| logger.error( | ||
| "[%s] TTS error %d: %s", | ||
| request_id, response.status_code, error_text, | ||
| ) | ||
| raise TTSError( | ||
| f"TTS service error: {response.status_code}", | ||
| status_code=response.status_code, | ||
| ) | ||
|
|
||
| # Stream audio chunks | ||
| async for chunk in response.aiter_bytes( | ||
| chunk_size=self._tts._chunk_size | ||
| ): | ||
| if chunk: | ||
| output_emitter.push(chunk) | ||
|
|
||
| break # Success — exit retry loop | ||
|
|
||
| except (httpx.TimeoutException, httpx.NetworkError) as e: | ||
| if attempt < conn_options.max_retry: | ||
| delay = conn_options.retry_interval * (2 ** attempt) | ||
| jitter = delay * 0.1 * random.random() | ||
| logger.warning( | ||
| "[%s] TTS network error (attempt %d/%d): %s. " | ||
| "Retrying in %.1fs…", | ||
| request_id, attempt + 1, | ||
| conn_options.max_retry + 1, e, delay, | ||
| ) | ||
| await asyncio.sleep(delay + jitter) | ||
| else: | ||
| raise TTSError(f"TTS network error: {e}") from e |
There was a problem hiding this comment.
🔴 TTS internal retry loop causes duplicate/garbled audio when mid-stream errors occur
The _run() method implements its own retry loop (lines 239-298), but output_emitter is initialized once before the loop at line 213. If an httpx.TimeoutException or httpx.NetworkError occurs after some audio chunks have already been pushed to the emitter via output_emitter.push(chunk) at line 282, the retry continues to the next attempt without discarding the partial audio. The next successful attempt pushes the full audio again to the same emitter, resulting in duplicated/garbled audio output.
The base class ChunkedStream._main_task (livekit-agents/livekit/agents/tts/tts.py:274-275) properly handles retries by creating a new output_emitter per attempt. But since Blaze implements its own retry inside _run(), this mechanism is completely bypassed. The fix is to remove the internal retry loop and instead raise APIError (or a subclass) from _run(), letting the base class handle retries with fresh emitters per attempt.
Prompt for agents
In livekit-plugins/livekit-plugins-blaze/livekit/plugins/blaze/tts.py, the _run() method (lines 191-316) should NOT implement its own retry loop. Remove the for loop at line 239 and the except clause at lines 286-298. Instead, catch httpx errors and re-raise them as APIError (from livekit.agents._exceptions) or APIConnectionError/APITimeoutError so the base class ChunkedStream._main_task can handle retries properly with a fresh AudioEmitter per attempt. The base class creates a new output_emitter for each retry iteration, which avoids the audio duplication issue. Specifically:
1. Remove the 'for attempt in range(conn_options.max_retry + 1):' loop and associated retry/sleep logic.
2. Convert httpx.TimeoutException to APITimeoutError and httpx.NetworkError to APIConnectionError.
3. Convert HTTP 500+ errors to APIStatusError with retryable=True.
4. Convert HTTP 4xx errors to APIStatusError with retryable=False.
5. Remove the output_emitter.end_input() call at line 301 since the base class calls it after _run() returns.
Was this helpful? React with 👍 or 👎 to provide feedback.
| for attempt in range(conn_options.max_retry + 1): | ||
| full_response = "" # Reset on each attempt | ||
| try: | ||
| async with self._llm._client.stream( | ||
| "POST", | ||
| url, | ||
| json=messages, | ||
| headers=headers, | ||
| ) as response: | ||
| if response.status_code >= 500: | ||
| error_text = (await response.aread()).decode(errors="replace") | ||
| if attempt < conn_options.max_retry: | ||
| delay = conn_options.retry_interval * (2 ** attempt) | ||
| jitter = delay * 0.1 * random.random() | ||
| logger.warning( | ||
| "[%s] LLM attempt %d/%d failed (%d). " | ||
| "Retrying in %.1fs…", | ||
| request_id, attempt + 1, | ||
| conn_options.max_retry + 1, | ||
| response.status_code, delay, | ||
| ) | ||
| await asyncio.sleep(delay + jitter) | ||
| continue | ||
| raise LLMError( | ||
| f"Chatbot service error: {response.status_code}", | ||
| status_code=response.status_code, | ||
| ) | ||
|
|
||
| if response.status_code != 200: | ||
| error_text = (await response.aread()).decode(errors="replace") | ||
| logger.error( | ||
| "[%s] LLM error %d: %s", | ||
| request_id, response.status_code, error_text, | ||
| ) | ||
| raise LLMError( | ||
| f"Chatbot service error: {response.status_code}", | ||
| status_code=response.status_code, | ||
| ) | ||
|
|
||
| async for line in response.aiter_lines(): | ||
| if not line.strip(): | ||
| continue | ||
|
|
||
| # Handle SSE format | ||
| if line.startswith("data: "): | ||
| data_str = line[6:] | ||
|
|
||
| if data_str.strip() == "[DONE]": | ||
| logger.debug( | ||
| "[%s] Stream completed with [DONE]", | ||
| request_id, | ||
| ) | ||
| break | ||
|
|
||
| try: | ||
| data = json.loads(data_str) | ||
| content = self._extract_content(data) | ||
| if content: | ||
| full_response += content | ||
| chunk = llm.ChatChunk( | ||
| id=request_id, | ||
| delta=llm.ChoiceDelta( | ||
| role="assistant", | ||
| content=content, | ||
| ), | ||
| ) | ||
| self._event_ch.send_nowait(chunk) | ||
| except json.JSONDecodeError: | ||
| logger.warning( | ||
| "[%s] Failed to parse SSE data: %s", | ||
| request_id, data_str[:100], | ||
| ) | ||
| continue | ||
|
|
||
| # Handle raw JSON lines format | ||
| else: | ||
| try: | ||
| data = json.loads(line) | ||
| content = self._extract_content(data) | ||
| if content: | ||
| full_response += content | ||
| chunk = llm.ChatChunk( | ||
| id=request_id, | ||
| delta=llm.ChoiceDelta( | ||
| role="assistant", | ||
| content=content, | ||
| ), | ||
| ) | ||
| self._event_ch.send_nowait(chunk) | ||
| except json.JSONDecodeError: | ||
| logger.warning( | ||
| "[%s] Failed to parse JSON line: %s", | ||
| request_id, line[:100], | ||
| ) | ||
| continue | ||
|
|
||
| break # Success — exit retry loop | ||
|
|
||
| except (httpx.TimeoutException, httpx.NetworkError) as e: | ||
| if attempt < conn_options.max_retry: | ||
| delay = conn_options.retry_interval * (2 ** attempt) | ||
| jitter = delay * 0.1 * random.random() | ||
| logger.warning( | ||
| "[%s] LLM network error (attempt %d/%d): %s. " | ||
| "Retrying in %.1fs…", | ||
| request_id, attempt + 1, | ||
| conn_options.max_retry + 1, e, delay, | ||
| ) | ||
| await asyncio.sleep(delay + jitter) | ||
| else: | ||
| raise LLMError(f"LLM network error: {e}") from e |
There was a problem hiding this comment.
🔴 LLM internal retry loop causes duplicate text chunks when mid-stream errors occur
The LLMStream._run() method implements its own retry loop (lines 288-398). Chat chunks are sent to self._event_ch via send_nowait at line 354 as they stream in. If an httpx.TimeoutException or httpx.NetworkError occurs mid-stream—after some chunks have already been dispatched to the channel and potentially consumed—the retry resets full_response (line 289) but cannot retract chunks already sent to _event_ch. The next attempt generates the full response again and sends all new chunks to the same channel. The consumer sees partial text from the failed attempt concatenated with the complete text from the successful retry (e.g., "Hello, how can I help youHello, how can I help you?").
The base class LLMStream._main_task (livekit-agents/livekit/agents/llm/llm.py:208-247) has its own retry mechanism that catches APIError. The fix is to remove the internal retry loop and raise APIError subclasses instead, letting the base class handle retries.
Prompt for agents
In livekit-plugins/livekit-plugins-blaze/livekit/plugins/blaze/llm.py, the LLMStream._run() method (lines 240-413) should NOT implement its own retry loop. Remove the 'for attempt in range(conn_options.max_retry + 1):' loop at line 288 and the associated retry/sleep logic in lines 286-398. Instead:
1. Make a single attempt to stream the response.
2. Convert httpx.TimeoutException to APITimeoutError (from livekit.agents._exceptions).
3. Convert httpx.NetworkError to APIConnectionError.
4. Convert HTTP 500+ errors to APIStatusError with retryable=True.
5. Convert HTTP 4xx errors to APIStatusError with retryable=False.
This lets the base class LLMStream._main_task handle retries properly. While the base class also shares the same _event_ch across retries, this is the established pattern and ensures consistency with how other plugins (e.g. anthropic, openai) work.
Was this helpful? React with 👍 or 👎 to provide feedback.
No description provided.