Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
yield break; // Exit if cancellation is requested
}

bool hadRequestHaltEvent = false;
RequestHaltEvent? haltEvent = null;
foreach (WorkflowEvent raisedEvent in Interlocked.Exchange(ref eventSink, []))
{
if (linkedSource.Token.IsCancellationRequested)
Expand All @@ -113,19 +113,25 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
}

// TODO: Do we actually want to interpret this as a termination request?
if (raisedEvent is RequestHaltEvent)
if (raisedEvent is RequestHaltEvent halt)
{
hadRequestHaltEvent = true;
haltEvent = halt;
}
else
{
yield return raisedEvent;
}
}

if (hadRequestHaltEvent || linkedSource.Token.IsCancellationRequested)
if (haltEvent is not null || linkedSource.Token.IsCancellationRequested)
{
// If we had a completion event, we are done.
if (haltEvent is not null)
{
yield return new WorkflowCompletedEvent(haltEvent.Data);
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
}

yield break;
}

Expand All @@ -143,7 +149,14 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
}
} while (!ShouldBreak());

runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
// Only signal workflow completion when the run has actually reached a terminal state.
// When breaking due to PendingRequests (paused, not completed) or cancellation,
// the stream ends silently — matching StreamingRunEventStream behavior.
if (this.RunStatus is RunStatus.Idle or RunStatus.Ended)
{
yield return new WorkflowCompletedEvent();
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
}
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(
// Note: PendingRequests is handled by WatchStreamAsync's do-while loop
if (completionSignal.Status is RunStatus.Idle or RunStatus.Ended)
{
yield return new WorkflowCompletedEvent();
yield break;
}

Expand All @@ -242,6 +243,7 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(
// RequestHaltEvent signals the end of the event stream
if (evt is RequestHaltEvent)
{
yield return new WorkflowCompletedEvent(evt.Data);
yield break;
}

Expand Down
10 changes: 10 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Agents.AI.Workflows;

/// <summary>
/// Event triggered when a workflow run reaches a terminal state and the event stream completes,
/// regardless of whether the workflow completed successfully or was halted early.
/// </summary>
/// <param name="result">The optional result produced by the workflow upon run completion.</param>
public sealed class WorkflowCompletedEvent(object? result = null) : WorkflowEvent(data: result);
2 changes: 2 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace Microsoft.Agents.AI.Workflows;
[JsonDerivedType(typeof(WorkflowStartedEvent))]
[JsonDerivedType(typeof(WorkflowErrorEvent))]
[JsonDerivedType(typeof(WorkflowWarningEvent))]
[JsonDerivedType(typeof(WorkflowCompletedEvent))]
[JsonDerivedType(typeof(WorkflowFailedEvent))]
[JsonDerivedType(typeof(WorkflowOutputEvent))]
[JsonDerivedType(typeof(RequestInfoEvent))]
public class WorkflowEvent(object? data = null)
Expand Down
20 changes: 20 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowFailedEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Agents.AI.Workflows;

/// <summary>
/// Event triggered when a workflow fails with an error message.
/// </summary>
/// <remarks>
/// Unlike <see cref="WorkflowErrorEvent"/>, which requires an <see cref="System.Exception"/> object,
/// this event supports failure scenarios where only an error message string is available
/// (e.g., errors from external orchestrators or deserialized error responses).
/// </remarks>
/// <param name="errorMessage">The error message describing the failure.</param>
public sealed class WorkflowFailedEvent(string errorMessage) : WorkflowEvent(errorMessage)
{
/// <summary>
/// Gets the error message describing the failure.
/// </summary>
public string ErrorMessage => errorMessage;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.Sample;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

public class WorkflowCompletedEventTests
{
[Fact]
public void WorkflowCompletedEvent_WithResult_SetsData()
{
var evt = new WorkflowCompletedEvent("done");

evt.Data.Should().Be("done");
evt.Should().BeAssignableTo<WorkflowEvent>();
}

[Fact]
public void WorkflowCompletedEvent_WithoutResult_HasNullData()
{
var evt = new WorkflowCompletedEvent();

evt.Data.Should().BeNull();
}

[Fact]
public void WorkflowFailedEvent_HasErrorMessage()
{
var evt = new WorkflowFailedEvent("something broke");

evt.ErrorMessage.Should().Be("something broke");
evt.Data.Should().Be("something broke");
evt.Should().BeAssignableTo<WorkflowEvent>();
evt.Should().NotBeAssignableTo<WorkflowErrorEvent>();
}

[Fact]
public async Task WorkflowCompletedEvent_IsLastEvent_OffThreadAsync()
{
// Arrange
ForwardMessageExecutor<string> executorA = new("A");
ForwardMessageExecutor<string> executorB = new("B");

Workflow workflow = new WorkflowBuilder(executorA)
.AddEdge(executorA, executorB)
.Build();

// Act
await using StreamingRun run = await InProcessExecution.OffThread
.RunStreamingAsync(workflow, "hello");

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
events.Should().NotBeEmpty();
events.Last().Should().BeOfType<WorkflowCompletedEvent>();
events.OfType<ExecutorCompletedEvent>().Should().NotBeEmpty(
"there should be executor events before the workflow completion event");
}

[Fact]
public async Task WorkflowCompletedEvent_IsLastEvent_LockstepAsync()
{
// Arrange
ForwardMessageExecutor<string> executorA = new("A");
ForwardMessageExecutor<string> executorB = new("B");

Workflow workflow = new WorkflowBuilder(executorA)
.AddEdge(executorA, executorB)
.Build();

// Act
await using StreamingRun run = await InProcessExecution.Lockstep
.RunStreamingAsync(workflow, "hello");

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
events.Should().NotBeEmpty();
events.Last().Should().BeOfType<WorkflowCompletedEvent>();
events.OfType<ExecutorCompletedEvent>().Should().NotBeEmpty(
"there should be executor events before the workflow completion event");
}

[Fact]
public async Task WorkflowFailedEvent_CanBeEmittedByExecutorAsync()
{
// Arrange
FailingExecutor executor = new("Failing");

Workflow workflow = new WorkflowBuilder(executor).Build();

// Act
await using StreamingRun run = await InProcessExecution.OffThread
.RunStreamingAsync(workflow, "trigger");

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
events.OfType<WorkflowFailedEvent>().Should().ContainSingle()
.Which.ErrorMessage.Should().Be("custom error from executor");
}

[Fact]
public async Task WorkflowCompletedEvent_NotEmitted_WhenPendingRequests_LockstepAsync()
{
// Arrange: Use a workflow that makes external requests (guessing game from Sample 04).
// The RequestPort is the entry executor — it immediately posts an external request
// and the workflow pauses with RunStatus.PendingRequests.
Workflow workflow = Step4EntryPoint.WorkflowInstance;

// Act: Run in lockstep mode with blockOnPendingRequest: false so the stream exits
// when the workflow pauses instead of waiting for a response.
await using StreamingRun run = await InProcessExecution.Lockstep
.RunStreamingAsync(workflow, NumberSignal.Init);

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false))
{
events.Add(evt);
}

// Assert: The workflow is paused (not completed), so WorkflowCompletedEvent should NOT appear.
events.Should().NotContain(e => e is WorkflowCompletedEvent,
"WorkflowCompletedEvent should not be emitted when the workflow is paused with pending requests");
events.OfType<RequestInfoEvent>().Should().NotBeEmpty(
"workflow should have emitted at least one external request before pausing");
}

[Fact]
public async Task WorkflowCompletedEvent_NotEmitted_WhenPendingRequests_OffThreadAsync()
{
// Arrange: Same workflow, but verify the streaming (off-thread) path is also correct.
Workflow workflow = Step4EntryPoint.WorkflowInstance;

// Act
await using StreamingRun run = await InProcessExecution.OffThread
.RunStreamingAsync(workflow, NumberSignal.Init);

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false))
{
events.Add(evt);
}

// Assert
events.Should().NotContain(e => e is WorkflowCompletedEvent,
"WorkflowCompletedEvent should not be emitted when the workflow is paused with pending requests");
events.OfType<RequestInfoEvent>().Should().NotBeEmpty(
"workflow should have emitted at least one external request before pausing");
}
}

/// <summary>
/// An executor that emits a <see cref="WorkflowFailedEvent"/> and then requests halt.
/// </summary>
internal sealed class FailingExecutor(string id) : Executor(id)
{
protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder)
{
protocolBuilder.RouteBuilder.AddHandler<string>(async (message, ctx) =>
{
await ctx.AddEventAsync(new WorkflowFailedEvent("custom error from executor"));
await ctx.RequestHaltAsync();
});

return protocolBuilder;
}
}
Loading