Skip to content

Commit 86432d6

Browse files
authored
fix(tracing): use a span processor to add custom span attributes (#1104)
- Fix custom tracing span attributes for the kagent-adk, kagent-langgraph, kagent-crewai packages. - Add a custom span processor which can be used to include custom kagent span metrics for all spans created within a particular OTel trace context. - Use the custom span processor, and update the execute method of each agent framework package to set the custom span attributes in context. Signed-off-by: David Haifley <[email protected]>
1 parent 765917e commit 86432d6

File tree

5 files changed

+388
-234
lines changed

5 files changed

+388
-234
lines changed

python/packages/kagent-adk/src/kagent/adk/_agent_executor.py

Lines changed: 53 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import annotations
22

3-
import asyncio
43
import inspect
54
import logging
65
import uuid
@@ -29,6 +28,10 @@
2928
from typing_extensions import override
3029

3130
from kagent.core.a2a import TaskResultAggregator, get_kagent_metadata_key
31+
from kagent.core.tracing._span_processor import (
32+
clear_kagent_span_attributes,
33+
set_kagent_span_attributes,
34+
)
3235

3336
from .converters.event_converter import convert_event_to_a2a_events
3437
from .converters.request_converter import convert_a2a_request_to_adk_run_args
@@ -106,57 +109,72 @@ async def execute(
106109
if not context.message:
107110
raise ValueError("A2A request must have a message")
108111

109-
# for new task, create a task submitted event
110-
if not context.current_task:
111-
await event_queue.enqueue_event(
112-
TaskStatusUpdateEvent(
113-
task_id=context.task_id,
114-
status=TaskStatus(
115-
state=TaskState.submitted,
116-
message=context.message,
117-
timestamp=datetime.now(timezone.utc).isoformat(),
118-
),
119-
context_id=context.context_id,
120-
final=False,
121-
)
122-
)
112+
# Convert the a2a request to ADK run args
113+
run_args = convert_a2a_request_to_adk_run_args(context)
123114

124-
# Handle the request and publish updates to the event queue
125-
runner = await self._resolve_runner()
115+
# Prepare span attributes.
116+
span_attributes = {}
117+
if run_args.get("user_id"):
118+
span_attributes["kagent.user_id"] = run_args["user_id"]
119+
if context.task_id:
120+
span_attributes["gen_ai.task.id"] = context.task_id
121+
if run_args.get("session_id"):
122+
span_attributes["gen_ai.conversation.id"] = run_args["session_id"]
123+
124+
# Set kagent span attributes for all spans in context.
125+
context_token = set_kagent_span_attributes(span_attributes)
126126
try:
127-
await self._handle_request(context, event_queue, runner)
128-
except Exception as e:
129-
logger.error("Error handling A2A request: %s", e, exc_info=True)
130-
# Publish failure event
131-
try:
127+
# for new task, create a task submitted event
128+
if not context.current_task:
132129
await event_queue.enqueue_event(
133130
TaskStatusUpdateEvent(
134131
task_id=context.task_id,
135132
status=TaskStatus(
136-
state=TaskState.failed,
133+
state=TaskState.submitted,
134+
message=context.message,
137135
timestamp=datetime.now(timezone.utc).isoformat(),
138-
message=Message(
139-
message_id=str(uuid.uuid4()),
140-
role=Role.agent,
141-
parts=[Part(TextPart(text=str(e)))],
142-
),
143136
),
144137
context_id=context.context_id,
145-
final=True,
138+
final=False,
146139
)
147140
)
148-
except Exception as enqueue_error:
149-
logger.error("Failed to publish failure event: %s", enqueue_error, exc_info=True)
141+
142+
# Handle the request and publish updates to the event queue
143+
runner = await self._resolve_runner()
144+
try:
145+
await self._handle_request(context, event_queue, runner, run_args)
146+
except Exception as e:
147+
logger.error("Error handling A2A request: %s", e, exc_info=True)
148+
# Publish failure event
149+
try:
150+
await event_queue.enqueue_event(
151+
TaskStatusUpdateEvent(
152+
task_id=context.task_id,
153+
status=TaskStatus(
154+
state=TaskState.failed,
155+
timestamp=datetime.now(timezone.utc).isoformat(),
156+
message=Message(
157+
message_id=str(uuid.uuid4()),
158+
role=Role.agent,
159+
parts=[Part(TextPart(text=str(e)))],
160+
),
161+
),
162+
context_id=context.context_id,
163+
final=True,
164+
)
165+
)
166+
except Exception as enqueue_error:
167+
logger.error("Failed to publish failure event: %s", enqueue_error, exc_info=True)
168+
finally:
169+
clear_kagent_span_attributes(context_token)
150170

151171
async def _handle_request(
152172
self,
153173
context: RequestContext,
154174
event_queue: EventQueue,
155175
runner: Runner,
176+
run_args: dict[str, Any],
156177
):
157-
# Convert the a2a request to ADK run args
158-
run_args = convert_a2a_request_to_adk_run_args(context)
159-
160178
# ensure the session exists
161179
session = await self._prepare_session(context, run_args, runner)
162180

@@ -175,14 +193,6 @@ async def _handle_request(
175193

176194
await runner.session_service.append_event(session, system_event)
177195

178-
current_span = trace.get_current_span()
179-
if run_args["user_id"]:
180-
current_span.set_attribute("kagent.user_id", run_args["user_id"])
181-
if context.task_id:
182-
current_span.set_attribute("gen_ai.task.id", context.task_id)
183-
if run_args["session_id"]:
184-
current_span.set_attribute("gen_ai.converstation.id", run_args["session_id"])
185-
186196
# create invocation context
187197
invocation_context = runner._new_invocation_context(
188198
session=session,
@@ -236,7 +246,7 @@ async def _handle_request(
236246
),
237247
)
238248
)
239-
# public the final status update event
249+
# publish the final status update event
240250
await event_queue.enqueue_event(
241251
TaskStatusUpdateEvent(
242252
task_id=context.task_id,
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""Custom span processor to add kagent attributes to all spans in a request context."""
2+
3+
import logging
4+
from typing import Optional
5+
from contextvars import Token
6+
7+
from opentelemetry import context as otel_context
8+
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
9+
10+
logger = logging.getLogger(__name__)
11+
12+
KAGENT_ATTRIBUTES_KEY = "kagent_trace_span_attributes"
13+
14+
15+
class KagentAttributesSpanProcessor(SpanProcessor):
16+
"""A SpanProcessor that adds kagent-specific attributes to all spans."""
17+
18+
def on_start(self, span: Span, parent_context: Optional[otel_context.Context] = None) -> None:
19+
"""Called when a span is started. Adds kagent attributes if present in context."""
20+
try:
21+
ctx = parent_context if parent_context is not None else otel_context.get_current()
22+
attributes = ctx.get(KAGENT_ATTRIBUTES_KEY)
23+
24+
if attributes and isinstance(attributes, dict):
25+
for key, value in attributes.items():
26+
if value is not None:
27+
span.set_attribute(key, value)
28+
except Exception as e:
29+
logger.warning(f"Failed to add kagent attributes to span: {e}")
30+
31+
def on_end(self, span: ReadableSpan) -> None:
32+
"""Called when a span is ended. No action needed."""
33+
pass
34+
35+
def shutdown(self) -> None:
36+
"""Called when the tracer provider is shutdown."""
37+
pass
38+
39+
def force_flush(self, timeout_millis: int = 30000) -> bool:
40+
"""Force flush any buffered spans. No buffering in this processor."""
41+
return True
42+
43+
44+
def set_kagent_span_attributes(attributes: dict) -> Token[otel_context.Context]:
45+
"""Set kagent span attributes in the context.
46+
Args:
47+
attributes: Dictionary of kagent span attributes to store in context
48+
Returns:
49+
A context token that can be used to detach the context
50+
"""
51+
return otel_context.attach(otel_context.set_value(KAGENT_ATTRIBUTES_KEY, attributes))
52+
53+
54+
def clear_kagent_span_attributes(token: Token[otel_context.Context]) -> None:
55+
"""Clear kagent span attributes from the OpenTelemetry context.
56+
Args:
57+
token: The context token returned by set_kagent_span_attributes
58+
"""
59+
otel_context.detach(token)

python/packages/kagent-core/src/kagent/core/tracing/_utils.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from opentelemetry.sdk.trace import TracerProvider
1818
from opentelemetry.sdk.trace.export import BatchSpanProcessor
1919

20+
from ._span_processor import KagentAttributesSpanProcessor
21+
2022

2123
def configure(fastapi_app: FastAPI | None = None):
2224
tracing_enabled = os.getenv("OTEL_TRACING_ENABLED", "false").lower() == "true"
@@ -38,13 +40,15 @@ def configure(fastapi_app: FastAPI | None = None):
3840
# Check if a TracerProvider already exists (e.g., set by CrewAI)
3941
current_provider = trace.get_tracer_provider()
4042
if isinstance(current_provider, TracerProvider):
41-
# TracerProvider already exists, just add our processor to it
43+
# TracerProvider already exists, just add our processors to it
4244
current_provider.add_span_processor(processor)
43-
logging.info("Added OTLP processor to existing TracerProvider")
45+
current_provider.add_span_processor(KagentAttributesSpanProcessor())
46+
logging.info("Added OTLP processors to existing TracerProvider")
4447
else:
4548
# No provider set, create new one
4649
tracer_provider = TracerProvider(resource=resource)
4750
tracer_provider.add_span_processor(processor)
51+
tracer_provider.add_span_processor(KagentAttributesSpanProcessor())
4852
trace.set_tracer_provider(tracer_provider)
4953
logging.info("Created new TracerProvider")
5054

0 commit comments

Comments
 (0)