Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// Represents an event triggered when an agent produces a response.
/// </summary>
public class AgentResponseEvent : ExecutorEvent
public class AgentResponseEvent : WorkflowOutputEvent
{
/// <summary>
/// Initializes a new instance of the <see cref="AgentResponseEvent"/> class.
/// </summary>
/// <param name="executorId">The identifier of the executor that generated this event.</param>
/// <param name="response">The agent response.</param>
public AgentResponseEvent(string executorId, AgentResponse response) : base(executorId, data: response)
public AgentResponseEvent(string executorId, AgentResponse response) : base(response, executorId)
{
this.Response = Throw.IfNull(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// Represents an event triggered when an agent run produces an update.
/// </summary>
public class AgentResponseUpdateEvent : ExecutorEvent
public class AgentResponseUpdateEvent : WorkflowOutputEvent
{
/// <summary>
/// Initializes a new instance of the <see cref="AgentResponseUpdateEvent"/> class.
/// </summary>
/// <param name="executorId">The identifier of the executor that generated this event.</param>
/// <param name="update">The agent run response update.</param>
public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update) : base(executorId, data: update)
public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update) : base(update, executorId)
{
this.Update = Throw.IfNull(update);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,20 @@ private async ValueTask YieldOutputAsync(string sourceId, object output, Cancell
this.CheckEnded();
Throw.IfNull(output);

// Special-case AgentResponse and AgentResponseUpdate to create their specific event types
// and bypass the output filter (for backwards compatibility - these events were previously
// emitted directly via AddEventAsync without filtering)
if (output is AgentResponseUpdate update)
{
await this.AddEventAsync(new AgentResponseUpdateEvent(sourceId, update), cancellationToken).ConfigureAwait(false);
return;
}
else if (output is AgentResponse response)
{
await this.AddEventAsync(new AgentResponseEvent(sourceId, response), cancellationToken).ConfigureAwait(false);
return;
}

Executor sourceExecutor = await this.EnsureExecutorAsync(sourceId, tracer: null, cancellationToken).ConfigureAwait(false);
if (!sourceExecutor.CanOutput(output.GetType()))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ await this.EnsureSessionAsync(context, cancellationToken).ConfigureAwait(false),
List<AgentResponseUpdate> updates = [];
await foreach (AgentResponseUpdate update in agentStream.ConfigureAwait(false))
{
await context.AddEventAsync(new AgentResponseUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false);
await context.YieldOutputAsync(update, cancellationToken).ConfigureAwait(false);
ExtractUnservicedRequests(update.Contents);
updates.Add(update);
}
Expand All @@ -200,7 +200,7 @@ await this.EnsureSessionAsync(context, cancellationToken).ConfigureAwait(false),

if (this._options.EmitAgentResponseEvents == true)
{
await context.AddEventAsync(new AgentResponseEvent(this.Id, response), cancellationToken).ConfigureAwait(false);
await context.YieldOutputAsync(response, cancellationToken).ConfigureAwait(false);
}

if (userInputRequests.Count > 0 || functionCalls.Count > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async Task AddUpdateAsync(AgentResponseUpdate update, CancellationToken cancella
updates.Add(update);
if (handoffState.TurnToken.EmitEvents is true)
{
await context.AddEventAsync(new AgentResponseUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false);
await context.YieldOutputAsync(update, cancellationToken).ConfigureAwait(false);
}
}
});
Expand Down
22 changes: 18 additions & 4 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowOutputEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,37 @@

using System;
using System.Diagnostics.CodeAnalysis;
using System.Text.Json.Serialization;

namespace Microsoft.Agents.AI.Workflows;

/// <summary>
/// Event triggered when a workflow executor yields output.
/// </summary>
public sealed class WorkflowOutputEvent : WorkflowEvent
[JsonDerivedType(typeof(AgentResponseEvent))]
[JsonDerivedType(typeof(AgentResponseUpdateEvent))]
public class WorkflowOutputEvent : WorkflowEvent
{
internal WorkflowOutputEvent(object data, string sourceId) : base(data)
/// <summary>
/// Initializes a new instance of the <see cref="WorkflowOutputEvent"/> class.
/// </summary>
/// <param name="data">The output data.</param>
/// <param name="executorId">The identifier of the executor that yielded this output.</param>
public WorkflowOutputEvent(object data, string executorId) : base(data)
{
this.SourceId = sourceId;
this.ExecutorId = executorId;
}

/// <summary>
/// The unique identifier of the executor that yielded this output.
/// </summary>
public string SourceId { get; }
public string ExecutorId { get; }

/// <summary>
/// The unique identifier of the executor that yielded this output.
/// </summary>
[Obsolete("Use ExecutorId instead.")]
public string SourceId => this.ExecutorId;

/// <summary>
/// Determines whether the underlying data is of the specified type or a derived type.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.AI;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

public class AgentEventsTests
{
/// <summary>
/// Regression test for https://github.com/microsoft/agent-framework/issues/2938
/// Verifies that WorkflowOutputEvent is triggered for agent workflows built with
/// WorkflowBuilder directly (without using AgentWorkflowBuilder helpers).
/// </summary>
[Fact]
public async Task WorkflowBuilder_WithAgents_EmitsWorkflowOutputEventAsync()
{
// Arrange - Build workflow using WorkflowBuilder directly (not AgentWorkflowBuilder.BuildSequential)
AIAgent agent1 = new TestEchoAgent("agent1");
AIAgent agent2 = new TestEchoAgent("agent2");

Workflow workflow = new WorkflowBuilder(agent1)
.AddEdge(agent1, agent2)
.Build();

// Act
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, new List<ChatMessage> { new(ChatRole.User, "Hello") });
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<WorkflowOutputEvent> outputEvents = new();
List<AgentResponseUpdateEvent> updateEvents = new();

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentResponseUpdateEvent updateEvt)
{
updateEvents.Add(updateEvt);
}

if (evt is WorkflowOutputEvent outputEvt)
{
outputEvents.Add(outputEvt);
}
}

// Assert - AgentResponseUpdateEvent should now be a WorkflowOutputEvent
Assert.NotEmpty(updateEvents);
Assert.NotEmpty(outputEvents);
// All update events should also be output events (since AgentResponseUpdateEvent now inherits from WorkflowOutputEvent)
Assert.All(updateEvents, updateEvt => Assert.Contains(updateEvt, outputEvents));
}

/// <summary>
/// Verifies that AgentResponseUpdateEvent inherits from WorkflowOutputEvent.
/// </summary>
[Fact]
public void AgentResponseUpdateEvent_IsWorkflowOutputEvent()
{
// Arrange
AgentResponseUpdate update = new(ChatRole.Assistant, "test");

// Act
AgentResponseUpdateEvent evt = new("executor1", update);

// Assert
Assert.IsAssignableFrom<WorkflowOutputEvent>(evt);
Assert.Equal("executor1", evt.ExecutorId);
Assert.Same(update, evt.Update);
Assert.Same(update, evt.Data);
}

/// <summary>
/// Verifies that AgentResponseEvent inherits from WorkflowOutputEvent.
/// </summary>
[Fact]
public void AgentResponseEvent_IsWorkflowOutputEvent()
{
// Arrange
AgentResponse response = new(new List<ChatMessage> { new(ChatRole.Assistant, "test") });

// Act
AgentResponseEvent evt = new("executor1", response);

// Assert
Assert.IsAssignableFrom<WorkflowOutputEvent>(evt);
Assert.Equal("executor1", evt.ExecutorId);
Assert.Same(response, evt.Response);
Assert.Same(response, evt.Data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static async ValueTask<string> RunAsync(TextWriter writer, Func<string, i
switch (evt)
{
case WorkflowOutputEvent outputEvent:
switch (outputEvent.SourceId)
switch (outputEvent.ExecutorId)
{
case JudgeId:
if (outputEvent.Is(out NumberSignal newSignal))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ await environment.StreamAsync(workflow, NumberSignal.Init, checkpointManager)
switch (evt)
{
case WorkflowOutputEvent outputEvent:
switch (outputEvent.SourceId)
switch (outputEvent.ExecutorId)
{
case Step4EntryPoint.JudgeId:
if (outputEvent.Is(out NumberSignal newSignal))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static async ValueTask<CheckpointInfo> RunAsync(TextWriter writer, string
{
foreach (ChatMessage message in messages)
{
writer.WriteLine($"{output.SourceId}: {message.Text}");
writer.WriteLine($"{output.ExecutorId}: {message.Text}");
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,20 @@ public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken ca
=> runnerContext.AddEventAsync(workflowEvent, cancellationToken);

public ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default)
=> this.AddEventAsync(new WorkflowOutputEvent(output, executorId), cancellationToken);
{
// Special-case AgentResponse and AgentResponseUpdate to create their specific event types
// (consistent with InProcessRunnerContext.YieldOutputAsync)
if (output is AgentResponseUpdate update)
{
return this.AddEventAsync(new AgentResponseUpdateEvent(executorId, update), cancellationToken);
}
else if (output is AgentResponse response)
{
return this.AddEventAsync(new AgentResponseEvent(executorId, response), cancellationToken);
}

return this.AddEventAsync(new WorkflowOutputEvent(output, executorId), cancellationToken);
}

public ValueTask RequestHaltAsync()
=> this.AddEventAsync(new RequestHaltEvent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken ca
public ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default)
{
this.YieldedOutputs.Enqueue(output);

// Special-case AgentResponse and AgentResponseUpdate to create their specific event types
// (consistent with InProcessRunnerContext.YieldOutputAsync)
if (output is AgentResponseUpdate update)
{
return this.AddEventAsync(new AgentResponseUpdateEvent(this._executorId, update), cancellationToken);
}
else if (output is AgentResponse response)
{
return this.AddEventAsync(new AgentResponseEvent(this._executorId, response), cancellationToken);
}

return this.AddEventAsync(new WorkflowOutputEvent(output, this._executorId), cancellationToken);
}

Expand Down
Loading