Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion agent_sdks/python/src/a2ui/a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# limitations under the License.

import logging
from typing import Any, Optional, List
from typing import Any, Optional, List, AsyncIterable, TYPE_CHECKING

if TYPE_CHECKING:
from a2ui.core.parser.streaming import A2uiStreamParser
from a2a.server.agent_execution import RequestContext
from a2a.types import (
AgentExtension,
Expand Down Expand Up @@ -247,3 +249,39 @@ def try_activate_a2ui_extension(
return selected_uri.replace(f"{A2UI_EXTENSION_BASE_URI}/v", "")

return None


async def stream_response_to_parts(
parser: "A2uiStreamParser",
token_stream: AsyncIterable[str],
) -> AsyncIterable[Part]:
"""Helper to parse a stream of LLM tokens into A2A Parts incrementally.
Args:
parser: A2uiStreamParser instance to process the stream.
token_stream: An async iterable of strings (tokens).
Yields:
A2A Part objects as they are discovered in the stream.
"""
async for token in token_stream:
logger.info("-----------------------------")
logger.info(f"--- AGENT: Received token:\n{token}")
response_parts = parser.process_chunk(token)
logger.info(
f"--- AGENT: Response parts:\n{[part.a2ui_json for part in response_parts]}\n"
)
logger.info("-----------------------------")

for part in response_parts:
if part.text:
yield Part(root=TextPart(text=part.text))

if part.a2ui_json:
json_data = part.a2ui_json

if isinstance(json_data, list):
for message in json_data:
yield create_a2ui_part(message)
else:
yield create_a2ui_part(json_data)
12 changes: 8 additions & 4 deletions agent_sdks/python/src/a2ui/core/schema/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@

DEFAULT_WORKFLOW_RULES = f"""
The generated response MUST follow these rules:
1. The response can contain one or more A2UI JSON blocks.
2. Each A2UI JSON block MUST be wrapped in `{A2UI_OPEN_TAG}` and `{A2UI_CLOSE_TAG}` tags.
3. Between or around these blocks, you can provide conversational text.
4. The JSON part MUST be a single, raw JSON object (usually a list of A2UI messages) and MUST validate against the provided A2UI JSON SCHEMA.
- The response can contain one or more A2UI JSON blocks.
- Each A2UI JSON block MUST be wrapped in `{A2UI_OPEN_TAG}` and `{A2UI_CLOSE_TAG}` tags.
- Between or around these blocks, you can provide conversational text.
- The JSON part MUST be a single, raw JSON object (usually a list of A2UI messages) and MUST validate against the provided A2UI JSON SCHEMA.
- Top-Down Component Ordering: Within the `components` list of a message:
- The 'root' component MUST be the FIRST element.
- Parent components MUST appear before their child components.
This specific ordering allows the streaming parser to yield and render the UI incrementally as it arrives.
"""
63 changes: 45 additions & 18 deletions samples/agent/adk/contact_lookup/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import jsonschema

from google.adk.agents import run_config
from google.adk.agents.llm_agent import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
Expand All @@ -43,7 +44,11 @@
from a2ui.core.schema.manager import A2uiSchemaManager
from a2ui.core.parser.parser import parse_response, ResponsePart
from a2ui.basic_catalog.provider import BasicCatalog
from a2ui.a2a import create_a2ui_part, get_a2ui_agent_extension, parse_response_to_parts
from a2ui.a2a import (
get_a2ui_agent_extension,
parse_response_to_parts,
stream_response_to_parts,
)

logger = logging.getLogger(__name__)

Expand All @@ -61,6 +66,7 @@ def __init__(self, base_url: str):

self._schema_managers: Dict[str, A2uiSchemaManager] = {}
self._ui_runners: Dict[str, Runner] = {}
self._parsers: Dict[str, A2uiStreamParser] = {}

for version in [VERSION_0_8, VERSION_0_9]:
schema_manager = self._build_schema_manager(version)
Expand Down Expand Up @@ -233,27 +239,46 @@ async def stream(
current_message = types.Content(
role="user", parts=[types.Part.from_text(text=current_query_text)]
)
final_response_content = None

async for event in runner.run_async(
user_id=self._user_id,
session_id=session.id,
new_message=current_message,
):
logger.info(f"Event from runner: {event}")
if event.is_final_response():
if event.content and event.content.parts and event.content.parts[0].text:
final_response_content = "\n".join(
[p.text for p in event.content.parts if p.text]
)
break # Got the final response, stop consuming events
else:
logger.info(f"Intermediate event: {event}")
# Yield intermediate updates on every attempt
full_content_list = []

async def token_stream():
async for event in runner.run_async(
user_id=self._user_id,
session_id=session.id,
run_config=run_config.RunConfig(
streaming_mode=run_config.StreamingMode.SSE
),
new_message=current_message,
):
if event.content and event.content.parts:
for p in event.content.parts:
if p.text:
full_content_list.append(p.text)
yield p.text

if selected_catalog:
from a2ui.core.parser.streaming import A2uiStreamParser

if session_id not in self._parsers:
self._parsers[session_id] = A2uiStreamParser(catalog=selected_catalog)

async for part in stream_response_to_parts(
self._parsers[session_id],
token_stream(),
):
yield {
"is_task_complete": False,
"updates": self.get_processing_message(),
"parts": [part],
}
else:
async for token in token_stream():
yield {
"is_task_complete": False,
"updates": token,
}

final_response_content = "".join(full_content_list)

if final_response_content is None:
logger.warning(
Expand All @@ -265,8 +290,10 @@ async def stream(
"I received no response. Please try again."
f"Please retry the original request: '{query}'"
)
logger.info(f"Retrying with query: {current_query_text}")
continue # Go to next retry
else:
logger.info("Retries exhausted on no-response")
# Retries exhausted on no-response
final_response_content = (
"I'm sorry, I encountered an error and couldn't process your request."
Expand Down
33 changes: 21 additions & 12 deletions samples/agent/adk/contact_lookup/agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from a2a.utils.errors import ServerError
from agent import ContactAgent
from a2ui.a2a import try_activate_a2ui_extension
from a2ui.a2a import create_a2ui_part

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -127,26 +128,24 @@ async def execute(
async for item in self._agent.stream(query, task.context_id, active_ui_version):
is_task_complete = item["is_task_complete"]
if not is_task_complete:
await updater.update_status(
TaskState.working,
new_agent_text_message(item["updates"], task.context_id, task.id),
)
message = None
if "parts" in item:
message = new_agent_parts_message(item["parts"], task.context_id, task.id)
elif "updates" in item:
message = new_agent_text_message(item["updates"], task.context_id, task.id)

if message:
await updater.update_status(TaskState.working, message)
continue

final_state = TaskState.input_required # Default
final_state = TaskState.input_required
if action in ["send_email", "send_message", "view_full_profile"]:
final_state = TaskState.completed

final_parts = item["parts"]

logger.info("--- FINAL PARTS TO BE SENT ---")
for i, part in enumerate(final_parts):
logger.info(f" - Part {i}: Type = {type(part.root)}")
if isinstance(part.root, TextPart):
logger.info(f" - Text: {part.root.text[:200]}...")
elif isinstance(part.root, DataPart):
logger.info(f" - Data: {str(part.root.data)[:200]}...")
logger.info("-----------------------------")
self._log_parts(final_parts)

await updater.update_status(
final_state,
Expand All @@ -159,3 +158,13 @@ async def cancel(
self, request: RequestContext, event_queue: EventQueue
) -> Task | None:
raise ServerError(error=UnsupportedOperationError())

def _log_parts(self, parts: list[Part]):
logger.info("--- PARTS TO BE SENT ---")
for i, part in enumerate(parts):
logger.info(f" - Part {i}: Type = {type(part.root)}")
if isinstance(part.root, TextPart):
logger.info(f" - Text: {part.root.text[:200]}...")
elif isinstance(part.root, DataPart):
logger.info(f" - Data: {str(part.root.data)[:200]}...")
logger.info("-----------------------------")
Loading
Loading