Add support for LLM token usage data streaming #116
base: develop
Conversation
| Cohort / File(s) | Summary |
|---|---|
| Streaming chain / control flow<br>`src/nvidia_rag/rag_server/main.py` | Introduces a separate streaming chain (`stream_chain`) used for the streaming paths (LLM and RAG). Replaces prior chain usage with `stream_chain.astream`/`.ainvoke` as appropriate, eagerly prefetches the first chunk to surface errors, and preserves a non-streaming `base_chain` for full-string responses. Updates imports to include `RunnableGenerator`. |
| Response construction & usage parsing<br>`src/nvidia_rag/rag_server/response_generator.py` | Adds handling of `usage_metadata` from `AIMessageChunk` in the sync and async generators; parses `input_tokens`, `output_tokens`, and `total_tokens` into a `Usage` object (defaulting total = input + output), skips empty usage-only chunks, uses chunk content for messages/deltas, and attaches usage to the final `ChainResponse`. Function signatures are extended to accept optional usage. |
| LLM streaming & sentinel emission<br>`src/nvidia_rag/utils/llm.py` | Streaming yields changed to emit `AIMessageChunk` objects and propagate `usage_metadata`; logic consolidated to use `ChatOpenAI` with endpoint/base-URL handling; streaming filter parsers updated to pass through when disabled; emits a final usage-only chunk if needed. |
| Dependencies / config<br>`pyproject.toml` | Adds `langchain-openai>=0.2,<1.0` to top-level dependencies and removes it from several optional groups. |
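To make the `Usage` shape concrete: the summary above says total defaults to input + output. A minimal sketch of that behavior (the field names follow the summary; the project's real model may be a Pydantic class rather than this hypothetical dataclass):

```python
from dataclasses import dataclass

@dataclass
class Usage:
    # Hypothetical shape inferred from the summary above.
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0

    def __post_init__(self) -> None:
        # Default total to input + output when the provider omits it.
        if not self.total_tokens:
            self.total_tokens = self.input_tokens + self.output_tokens

usage = Usage(input_tokens=12, output_tokens=34)
assert usage.total_tokens == 46
```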
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant Client as Client
    participant StreamChain as stream_chain (RunnableGenerator)
    participant LLM as LLM Model
    participant RespGen as ResponseGenerator
    participant Response as ChainResponse
    Client->>StreamChain: request (streaming)
    StreamChain->>LLM: astream / invoke
    LLM-->>StreamChain: AIMessageChunk (content + usage_metadata)
    StreamChain->>RespGen: forward chunks (prefetch first chunk)
    RespGen->>RespGen: accumulate content, detect sentinel/usage_metadata
    RespGen->>RespGen: build Usage object from usage_metadata
    RespGen->>Response: finalize ChainResponse with usage
    Response->>Client: stream/final response (with usage)
```
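The "prefetch first chunk" step in the diagram means pulling one chunk from the async stream before handing it to the client, so that model errors raise immediately instead of mid-response. A minimal sketch of that pattern (the stream here is a stand-in, not the project's actual chain; requires Python 3.10+ for `anext`):

```python
import asyncio
from typing import AsyncIterator

async def prefetch_first_chunk(stream: AsyncIterator[str]) -> AsyncIterator[str]:
    # Pull the first chunk eagerly: if the model call fails, the error
    # surfaces here, before any streaming response has started.
    first = await anext(stream)

    async def replay() -> AsyncIterator[str]:
        yield first
        async for chunk in stream:
            yield chunk

    return replay()

async def demo() -> None:
    async def fake_stream() -> AsyncIterator[str]:
        for part in ["Hello", ", ", "world"]:
            yield part

    stream = await prefetch_first_chunk(fake_stream())
    async for chunk in stream:
        print(chunk, end="")

asyncio.run(demo())
```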
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~45 minutes
Poem
🐰 I hopped along the streaming line,
Collected tokens, counted fine,
In JSON whispers I did tuck,
Usage tucked into each luck,
Now responses wear their token shine 🥕
🚥 Pre-merge checks: ✅ 3 passed
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'Add support for LLM token usage data streaming' accurately summarizes the main objective of the pull request, which introduces streaming integration for token usage metadata across multiple files. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
@shubhadeepd: @coderabbitai review this

@coderabbitai: @shubhadeepd, I'll review the changes in this PR.

✅ Actions performed: Review triggered.
Actionable comments posted: 6
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)

- `src/nvidia_rag/rag_server/main.py` (6 hunks)
- `src/nvidia_rag/rag_server/response_generator.py` (6 hunks)
- `src/nvidia_rag/utils/llm.py` (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
src/nvidia_rag/rag_server/response_generator.py (1)

- tests/unit/test_rag_server/test_rag_server.py (1): `llm` (86-90)

src/nvidia_rag/rag_server/main.py (1)

- src/nvidia_rag/utils/llm.py (2): `get_streaming_filter_think_parser_async` (530-552), `stream_with_usage_sentinel` (558-575)
🪛 Ruff (0.14.8)
src/nvidia_rag/utils/llm.py
574-574: Do not catch blind exception: Exception
(BLE001)
src/nvidia_rag/rag_server/response_generator.py
465-465: Do not catch blind exception: Exception
(BLE001)
635-635: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (9)
src/nvidia_rag/utils/llm.py (1)

555-555: LGTM on the sentinel prefix design. Using a distinctive prefix like `__RAG_USAGE_SENTINEL__:` is a clean approach for marking synthetic metadata chunks in the stream.
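A rough sketch of how such a sentinel might be emitted and parsed. This is a simplified stand-in for the project's `stream_with_usage_sentinel`, with invented usage numbers:

```python
import json
from typing import Iterator

USAGE_SENTINEL_PREFIX = "__RAG_USAGE_SENTINEL__:"

def stream_with_sentinel(chunks: Iterator[str], usage: dict) -> Iterator[str]:
    # Pass content through untouched, then append one synthetic chunk
    # carrying the usage payload as JSON behind a distinctive prefix.
    yield from chunks
    yield USAGE_SENTINEL_PREFIX + json.dumps(usage)

content, usage = [], None
for chunk in stream_with_sentinel(iter(["Hi", " there"]), {"total_tokens": 7}):
    if chunk.startswith(USAGE_SENTINEL_PREFIX):
        usage = json.loads(chunk[len(USAGE_SENTINEL_PREFIX):])
    else:
        content.append(chunk)

print("".join(content), usage)  # Hi there {'total_tokens': 7}
```

The distinctive prefix makes metadata chunks cheap to detect downstream while leaving ordinary content chunks untouched.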
src/nvidia_rag/rag_server/response_generator.py (3)

28-28: LGTM on the new imports. The imports for `json` and `USAGE_SENTINEL_PREFIX` are correctly added to support parsing usage sentinel chunks. Also applies to: 43-43.
543-544: LGTM on attaching usage to the final response. The conditional attachment of usage data only when available is correct and maintains backward compatibility.
713-714: LGTM on the async usage attachment. Consistent with the sync path implementation.
src/nvidia_rag/rag_server/main.py (5)

198-200: LGTM on UsageSentinelParser initialization. The parser is correctly instantiated as a `RunnableGenerator` wrapping `stream_with_usage_sentinel`, and the comment clearly explains its purpose.
1271-1279: LGTM on the LLM chain construction. The `UsageSentinelParser` is correctly positioned between the LLM and `StreamingFilterThinkParser` to capture usage metadata before think tokens are filtered. The comment adequately explains the rationale.
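For reference, `RunnableGenerator` wraps a plain generator function so it can be composed into a chain while preserving streaming. A self-contained sketch with a toy transform (the real transform here is `stream_with_usage_sentinel` in `src/nvidia_rag/utils/llm.py`):

```python
from typing import Iterator

from langchain_core.runnables import RunnableGenerator

def upper_transform(chunks: Iterator[str]) -> Iterator[str]:
    # A RunnableGenerator transform receives an iterator of input chunks
    # and yields output chunks, so streaming is preserved end to end.
    for chunk in chunks:
        yield chunk.upper()

parser = RunnableGenerator(upper_transform)

print(parser.invoke("hello"))        # HELLO (invoke feeds a single chunk)
print(list(parser.stream("hello")))  # ['HELLO']
```

Because it is a Runnable, it can sit between the LLM and the think-token filter with the usual `|` composition.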
2154-2165: LGTM on the dual chain design for RAG. The separation of `base_chain` (for reflection/non-streaming) and `stream_chain` (for streaming with the usage sentinel) is a clean design. Usage metadata is appropriately omitted from the reflection path, where it wouldn't be surfaced anyway.
2173-2173: LGTM on using base_chain for the reflection path. Correctly uses `base_chain` without the usage sentinel for the reflection iteration path.
2201-2205: LGTM on using stream_chain for the streaming path. Correctly uses `stream_chain` with the usage sentinel for the normal streaming response path.
```python
except Exception as e:
    logger.debug("Failed to parse usage sentinel: %s", e)
```
🧹 Nitpick | 🔵 Trivial
Narrow the exception type for usage parsing.
As flagged by static analysis (BLE001), catching bare Exception is too broad. The parsing can fail due to JSON decode errors or type conversion issues.
```diff
-except Exception as e:
+except (json.JSONDecodeError, ValueError, TypeError) as e:
     logger.debug("Failed to parse usage sentinel: %s", e)
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
except (json.JSONDecodeError, ValueError, TypeError) as e:
    logger.debug("Failed to parse usage sentinel: %s", e)
```
🧰 Tools
🪛 Ruff (0.14.8)
465-465: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In src/nvidia_rag/rag_server/response_generator.py around lines 465-466 the code
currently catches a bare Exception when parsing the usage sentinel; instead,
restrict the except clause to the specific errors that can occur (e.g.,
json.JSONDecodeError for JSON parsing and ValueError/TypeError for type
conversion issues), import json if not already present, and handle/log those
specific exceptions (or add multiple except blocks) rather than using a blanket
Exception catch.
Check again if still valid
@nv-nikkulkarni, yes, the review comment is still valid. The code currently catches bare Exception when parsing usage metadata, though the implementation has changed since my original comment.

Current state: Both the sync (`generate_answer`) and async (`generate_answer_async`) versions still use broad exception handling, around lines 465 and 565 respectively.

Updated recommendation: Since the code no longer parses JSON but directly accesses `usage_metadata` attributes and converts values with `int()`, the specific exceptions to catch should be:

```python
except (ValueError, TypeError, AttributeError) as e:
    logger.debug("Failed to parse usage metadata: %s", e)
```

This will catch:

- `ValueError`/`TypeError` from `int()` conversions
- `AttributeError` if the dictionary operations fail unexpectedly

This change should be applied to both the sync version (around line 465) and the async version (around line 565).
Force-pushed from 460fd88 to 03437f2
```diff
-chain = prompt | llm | self.StreamingFilterThinkParser | StrOutputParser()
+# Base chain (no usage sentinel) used for non-streaming reflection path.
+# We keep StrOutputParser here because we want the full string response for logic.
+base_chain = prompt | llm | self.StreamingFilterThinkParser | StrOutputParser()
```
Check again if still valid
```diff
 6. get_streaming_filter_think_parser_async: Get the parser for filtering the think tokens (async).
 """

+import json
```
Check again if still valid
```diff
 from langchain_core.prompts import MessagesPlaceholder
 from langchain_core.prompts.chat import ChatPromptTemplate
-from langchain_core.runnables import RunnableAssign
+from langchain_core.runnables import RunnableAssign, RunnableGenerator
```
Check again if still valid
Force-pushed from 03437f2 to 75bdea9
@nv-nikkulkarni: @coderabbitai review this

@coderabbitai: @nv-nikkulkarni, I'll review the changes in this PR.

✅ Actions performed: Review triggered.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/nvidia_rag/rag_server/main.py (1)
2441-2458: Critical bug: undefined variable reference. Line 2458 references `chain.ainvoke()`, but the variable `chain` is not defined in this scope; only `base_chain` and `stream_chain` exist. This will cause a `NameError` at runtime when reflection is enabled.

🐛 Fix the undefined variable reference

```diff
 # Check response groundedness if we still have reflection
 # iterations available
 if (
     self.config.reflection.enable_reflection
     and reflection_counter.remaining > 0
 ):
-    initial_response = await chain.ainvoke(
+    initial_response = await base_chain.ainvoke(
         {"question": query, "context": docs}
     )
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)

- `pyproject.toml`
- `src/nvidia_rag/rag_server/main.py`
- `src/nvidia_rag/rag_server/response_generator.py`
- `src/nvidia_rag/utils/llm.py`
🧰 Additional context used
🪛 Ruff (0.14.10)
src/nvidia_rag/rag_server/response_generator.py
472-472: Do not catch blind exception: Exception
(BLE001)
659-659: Do not catch blind exception: Exception
(BLE001)
src/nvidia_rag/rag_server/main.py
2443-2443: Local variable base_chain is assigned to but never used
Remove assignment to unused variable base_chain
(F841)
🔇 Additional comments (13)
pyproject.toml (1)

28-28: LGTM! Dependency promotion aligns with code changes. Promoting `langchain-openai` to core dependencies is appropriate since `llm.py` now uses `ChatOpenAI` for all LLM interactions (both NVIDIA endpoints and API catalog), making it a required dependency rather than optional.
src/nvidia_rag/rag_server/main.py (3)

51-51: LGTM! Import addition supports usage metadata handling. Adding `RunnableGenerator` to imports is necessary for the streaming chain implementation that preserves `AIMessageChunk` objects with `usage_metadata`.
411-423: LGTM! Streaming chain preserves usage metadata. Creating a separate `stream_chain` without `StrOutputParser` is the correct approach to preserve `AIMessageChunk` objects that contain `usage_metadata`, which can then be parsed downstream in the response generator.
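A toy illustration of the dual-chain idea using langchain's fake chat model. The fake model streams per-character and carries no real `usage_metadata`, so this only demonstrates the chunk types, not actual token counts:

```python
from langchain_core.language_models.fake_chat_models import FakeListChatModel
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("Answer briefly: {question}")
llm = FakeListChatModel(responses=["42"])

# Full-string chain: StrOutputParser collapses the output to a plain str.
base_chain = prompt | llm | StrOutputParser()

# Streaming chain: no StrOutputParser, so chunks stay AIMessageChunk
# objects and keep attributes such as usage_metadata (when the real
# model provides them).
stream_chain = prompt | llm

print(base_chain.invoke({"question": "meaning of life?"}))  # 42
for chunk in stream_chain.stream({"question": "meaning of life?"}):
    print(type(chunk).__name__, repr(chunk.content))
```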
2509-2515: LGTM! Consistent streaming pattern with usage metadata support. Using `stream_chain.astream()` for the streaming path (when reflection is not active) correctly preserves `AIMessageChunk` objects with `usage_metadata` for downstream parsing.
src/nvidia_rag/rag_server/response_generator.py (5)

446-474: LGTM! Robust usage metadata parsing. The implementation properly:

- Checks for the `usage_metadata` attribute on chunks
- Extracts token counts with safe defaults
- Logs usage information for observability
- Gracefully handles parsing failures with debug logging
475-482: LGTM! Proper content extraction from chunks. Extracting content using `hasattr(chunk, "content")` handles both `AIMessageChunk` objects and plain strings safely. Skipping empty content chunks (which may only contain usage metadata) prevents spurious empty response chunks.
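A sketch of the extraction pattern described in these two comments. This is simplified: the usage dict shape follows the review notes, and the real generator also handles citations and sentinel chunks:

```python
from langchain_core.messages import AIMessageChunk

def process_chunks(chunks):
    """Toy version of the chunk loop: harvest usage, skip empty chunks."""
    parts, usage = [], None
    for chunk in chunks:
        metadata = getattr(chunk, "usage_metadata", None)
        if metadata:
            try:
                input_tokens = int(metadata.get("input_tokens", 0))
                output_tokens = int(metadata.get("output_tokens", 0))
                total = int(metadata.get("total_tokens",
                                         input_tokens + output_tokens))
                usage = {"input_tokens": input_tokens,
                         "output_tokens": output_tokens,
                         "total_tokens": total}
            except (ValueError, TypeError):
                pass  # the real code logs the failure at debug level
        # Handle both AIMessageChunk objects and plain strings.
        content = chunk.content if hasattr(chunk, "content") else chunk
        if not content:
            continue  # usage-only chunk: nothing to emit downstream
        parts.append(content)
    return "".join(parts), usage

chunks = [
    AIMessageChunk(content="Hello"),
    AIMessageChunk(content="", usage_metadata={
        "input_tokens": 3, "output_tokens": 1, "total_tokens": 4}),
]
print(process_chunks(chunks))  # ('Hello', {... 'total_tokens': 4})
```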
559-560: LGTM! Usage attached to final response. Conditionally attaching the parsed `Usage` object to the final `ChainResponse` ensures token usage data is included in the API response when available.
633-670: LGTM! Consistent async implementation. The async version mirrors the sync implementation correctly, with proper handling of `AIMessageChunk` objects, usage metadata parsing, content extraction, and empty chunk filtering.
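The async path follows the same shape with an async generator, roughly (same caveats as the sync sketch above):

```python
import asyncio
from typing import AsyncIterator

from langchain_core.messages import AIMessageChunk

async def process_chunks_async(chunks: AsyncIterator[AIMessageChunk]):
    parts, usage = [], None
    async for chunk in chunks:
        # Same harvesting logic as the sync version, just awaited.
        if getattr(chunk, "usage_metadata", None):
            usage = dict(chunk.usage_metadata)
        content = chunk.content if hasattr(chunk, "content") else chunk
        if content:
            parts.append(content)
    return "".join(parts), usage

async def demo() -> None:
    async def fake_chunks() -> AsyncIterator[AIMessageChunk]:
        yield AIMessageChunk(content="Hi")
        yield AIMessageChunk(content="", usage_metadata={
            "input_tokens": 2, "output_tokens": 1, "total_tokens": 3})

    print(await process_chunks_async(fake_chunks()))

asyncio.run(demo())
```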
746-747: LGTM! Async usage attachment matches sync pattern. Consistent with the sync version, usage data is properly attached to the final response in the async path.

src/nvidia_rag/utils/llm.py (4)
25-37: LGTM! Imports support consolidated LLM implementation. The new imports (`json`, `AIMessageChunk`, `ChatOpenAI`) are necessary for the unified LLM creation approach and usage metadata propagation in streaming.
215-277: LGTM! Excellent consolidation of LLM creation logic. This unified approach using `ChatOpenAI` for all NVIDIA endpoints (both API catalog and custom endpoints) is cleaner and solves token usage reporting issues. Key strengths:

- Unified code path reduces maintenance burden
- `_is_nvidia_endpoint()` for proper endpoint detection
- `stream_options: {"include_usage": True}` enables usage metadata
- NVIDIA-specific parameters properly placed in `extra_body`
- Thinking token parameters handled correctly
- Clean separation of standard vs. NVIDIA-specific parameters
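A sketch of what such a consolidated constructor might look like. The `stream_options` and `extra_body` names come from the review notes above; the helper signature and the specific body parameter shown are placeholders, not confirmed project values:

```python
from langchain_openai import ChatOpenAI

def get_llm(model: str, base_url: str, api_key: str) -> ChatOpenAI:
    # One code path for both API catalog and self-hosted endpoints.
    return ChatOpenAI(
        model=model,
        base_url=base_url,
        api_key=api_key,
        temperature=0.2,
        # Ask OpenAI-compatible servers to append a final usage-only
        # chunk to streamed responses (per the review notes above).
        model_kwargs={"stream_options": {"include_usage": True}},
        # Non-standard (e.g. NVIDIA-specific) parameters ride along in
        # the request body instead of as top-level client kwargs.
        extra_body={"min_tokens": 1},  # placeholder parameter
    )
```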
481-490: LGTM! Usage metadata capture in streaming filter. Properly capturing `usage_metadata` from chunks during filtering ensures usage information is preserved and propagated through the streaming pipeline.
583-616: LGTM! Robust usage propagation in filtered output. The implementation correctly:

- Yields `AIMessageChunk` objects instead of plain strings
- Attaches `usage_metadata` to chunks when available
- Handles partial matches at EOF
- Emits an empty chunk with usage if no content was emitted (lines 611-616)

This ensures usage data reaches the response generator even when all content is filtered out.
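In outline, the behavior described above might look like the following. This is a simplified stand-in: the think-token filter here is a crude single-chunk regex, whereas the real implementation also handles partial matches across chunk boundaries:

```python
import re
from typing import Iterator

from langchain_core.messages import AIMessageChunk

def filter_stream(chunks: Iterator[AIMessageChunk]) -> Iterator[AIMessageChunk]:
    last_usage = None
    for chunk in chunks:
        if getattr(chunk, "usage_metadata", None):
            last_usage = chunk.usage_metadata
        # Crude per-chunk think-token filter (illustrative only).
        text = re.sub(r"<think>.*?</think>", "", chunk.content, flags=re.S)
        if text:
            # Attach any pending usage to the content chunk we emit.
            yield AIMessageChunk(content=text, usage_metadata=last_usage)
            last_usage = None
    if last_usage is not None:
        # Usage never got attached (e.g. all content was filtered out):
        # emit a final empty, usage-only chunk so it still reaches the
        # response generator downstream.
        yield AIMessageChunk(content="", usage_metadata=last_usage)

chunks = [
    AIMessageChunk(content="<think>hidden</think>"),
    AIMessageChunk(content="", usage_metadata={
        "input_tokens": 5, "output_tokens": 9, "total_tokens": 14}),
]
for out in filter_stream(iter(chunks)):
    print(repr(out.content), out.usage_metadata)
```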
Replaces ChatNVIDIA with ChatOpenAI to resolve token usage reporting issues. Moves non-standard parameters to extra_body and enables stream usage reporting.
Force-pushed from 75bdea9 to 9442c17
Summary by CodeRabbit

- New Features
- Improvements