Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions examples/tutorials/00_sync/030_langgraph/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Environments
.env**
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# IDE
.idea/
.vscode/
*.swp
*.swo

# Git
.git
.gitignore

# Misc
.DS_Store
50 changes: 50 additions & 0 deletions examples/tutorials/00_sync/030_langgraph/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# syntax=docker/dockerfile:1.3
FROM python:3.12-slim
COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/

# Install system dependencies
RUN apt-get update && apt-get install -y \
htop \
vim \
curl \
tar \
python3-dev \
postgresql-client \
build-essential \
libpq-dev \
gcc \
cmake \
netcat-openbsd \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

RUN uv pip install --system --upgrade pip setuptools wheel

ENV UV_HTTP_TIMEOUT=1000

# Copy pyproject.toml and README.md to install dependencies
COPY 00_sync/030_langgraph/pyproject.toml /app/030_langgraph/pyproject.toml
COPY 00_sync/030_langgraph/README.md /app/030_langgraph/README.md

WORKDIR /app/030_langgraph

# Copy the project code
COPY 00_sync/030_langgraph/project /app/030_langgraph/project

# Copy the test files
COPY 00_sync/030_langgraph/tests /app/030_langgraph/tests

# Copy shared test utilities
COPY test_utils /app/test_utils

# Install the required Python packages with dev dependencies
RUN uv pip install --system .[dev]

# Set environment variables
ENV PYTHONPATH=/app

# Set test environment variables
ENV AGENT_NAME=s030-langgraph

# Run the agent using uvicorn
CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
48 changes: 48 additions & 0 deletions examples/tutorials/00_sync/030_langgraph/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Tutorial 030: Sync LangGraph Agent

This tutorial demonstrates how to build a **synchronous** LangGraph agent on AgentEx with:
- Tool calling (ReAct pattern)
- Streaming token output
- Multi-turn conversation memory via AgentEx checkpointer
- Tracing integration

## Graph Structure

![Graph](graph.png)

## Key Concepts

### Sync ACP
The sync ACP model uses HTTP request/response for communication. The `@acp.on_message_send` handler receives a message and yields streaming events back to the client.

### LangGraph Integration
- **StateGraph**: Defines the agent's state machine with `AgentState` (message history)
- **ToolNode**: Automatically executes tool calls from the LLM
- **tools_condition**: Routes between tool execution and final response
- **Checkpointer**: Uses AgentEx's HTTP checkpointer for cross-request memory

### Streaming
The agent streams tokens as they're generated using `convert_langgraph_to_agentex_events()`, which converts LangGraph's stream events into AgentEx `TaskMessageUpdate` events.

## Files

| File | Description |
|------|-------------|
| `project/acp.py` | ACP server and message handler |
| `project/graph.py` | LangGraph state graph definition |
| `project/tools.py` | Tool definitions (weather example) |
| `tests/test_agent.py` | Integration tests |
| `manifest.yaml` | Agent configuration |

## Running Locally

```bash
# From this directory
agentex agents run
```

## Running Tests

```bash
pytest tests/test_agent.py -v
```
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
58 changes: 58 additions & 0 deletions examples/tutorials/00_sync/030_langgraph/manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
build:
context:
root: ../../
include_paths:
- 00_sync/030_langgraph
- test_utils
dockerfile: 00_sync/030_langgraph/Dockerfile
dockerignore: 00_sync/030_langgraph/.dockerignore

local_development:
agent:
port: 8000
host_address: host.docker.internal
paths:
acp: project/acp.py

agent:
acp_type: sync
name: s030-langgraph
description: A sync LangGraph agent with tool calling and streaming

temporal:
enabled: false

credentials:
- env_var_name: OPENAI_API_KEY
secret_name: openai-api-key
secret_key: api-key
- env_var_name: REDIS_URL
secret_name: redis-url-secret
secret_key: url
- env_var_name: SGP_API_KEY
secret_name: sgp-api-key
secret_key: api-key
- env_var_name: SGP_ACCOUNT_ID
secret_name: sgp-account-id
secret_key: account-id
- env_var_name: SGP_CLIENT_BASE_URL
secret_name: sgp-client-base-url
secret_key: url

deployment:
image:
repository: ""
tag: "latest"

global:
agent:
name: "s030-langgraph"
description: "A sync LangGraph agent with tool calling and streaming"
replicaCount: 1
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "1000m"
memory: "2Gi"
Empty file.
94 changes: 94 additions & 0 deletions examples/tutorials/00_sync/030_langgraph/project/acp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""
ACP (Agent Communication Protocol) handler for Agentex.

This is the API layer — it manages the graph lifecycle and streams
tokens and tool calls from the LangGraph graph to the Agentex frontend.
"""

from __future__ import annotations

import os
from typing import AsyncGenerator

from dotenv import load_dotenv

load_dotenv()

import agentex.lib.adk as adk
from project.graph import create_graph
from agentex.lib.adk import create_langgraph_tracing_handler, convert_langgraph_to_agentex_events
from agentex.lib.types.acp import SendMessageParams
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.types.task_message_delta import TextDelta
from agentex.types.task_message_update import TaskMessageUpdate
from agentex.types.task_message_content import TaskMessageContent
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config

logger = make_logger(__name__)

# Register the Agentex tracing processor so spans are shipped to the backend
add_tracing_processor_config(
SGPTracingProcessorConfig(
sgp_api_key=os.environ.get("SGP_API_KEY", ""),
sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""),
sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""),
))
# Create ACP server
acp = FastACP.create(acp_type="sync")

# Compiled graph (lazy-initialized on first request)
_graph = None


async def get_graph():
"""Get or create the compiled graph instance."""
global _graph
if _graph is None:
_graph = await create_graph()
return _graph


@acp.on_message_send
async def handle_message_send(
params: SendMessageParams,
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
"""Handle incoming messages from Agentex, streaming tokens and tool calls."""
graph = await get_graph()

thread_id = params.task.id
user_message = params.content.content

logger.info(f"Processing message for thread {thread_id}")

async with adk.tracing.span(
trace_id=thread_id,
name="message",
input={"message": user_message},
data={"__span_type__": "AGENT_WORKFLOW"},
) as turn_span:
callback = create_langgraph_tracing_handler(
trace_id=thread_id,
parent_span_id=turn_span.id if turn_span else None,
)

stream = graph.astream(
{"messages": [{"role": "user", "content": user_message}]},
config={
"configurable": {"thread_id": thread_id},
"callbacks": [callback],
},
stream_mode=["messages", "updates"],
)

final_text = ""
async for event in convert_langgraph_to_agentex_events(stream):
# Accumulate text deltas for span output
delta = getattr(event, "delta", None)
if isinstance(delta, TextDelta) and delta.text_delta:
final_text += delta.text_delta
yield event

if turn_span:
turn_span.output = {"final_output": final_text}
73 changes: 73 additions & 0 deletions examples/tutorials/00_sync/030_langgraph/project/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
LangGraph graph definition.

Defines the state, nodes, edges, and compiles the graph.
The compiled graph is the boundary between this module and the API layer.
"""

from __future__ import annotations

from typing import Any, Annotated
from datetime import datetime
from typing_extensions import TypedDict

from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import SystemMessage
from langgraph.graph.message import add_messages

from project.tools import TOOLS
from agentex.lib.adk import create_checkpointer

MODEL_NAME = "gpt-5"
SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools.

Current date and time: {timestamp}

Guidelines:
- Be concise and helpful
- Use tools when they would help answer the user's question
- If you're unsure, ask clarifying questions
- Always provide accurate information
"""


class AgentState(TypedDict):
"""State schema for the agent graph."""
messages: Annotated[list[Any], add_messages]


async def create_graph():
"""Create and compile the agent graph with checkpointer.

Returns:
A compiled LangGraph StateGraph ready for invocation.
"""
llm = ChatOpenAI(
model=MODEL_NAME,
reasoning={"effort": "high", "summary": "auto"},
)
llm_with_tools = llm.bind_tools(TOOLS)

checkpointer = await create_checkpointer()

def agent_node(state: AgentState) -> dict[str, Any]:
"""Process the current state and generate a response."""
messages = state["messages"]
if not messages or not isinstance(messages[0], SystemMessage):
system_content = SYSTEM_PROMPT.format(
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
messages = [SystemMessage(content=system_content)] + messages
response = llm_with_tools.invoke(messages)
return {"messages": [response]}

builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_node("tools", ToolNode(tools=TOOLS))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition, "tools")
builder.add_edge("tools", "agent")

return builder.compile(checkpointer=checkpointer)
Loading