diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index 506a0d1039..87ecb194be 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -104,7 +104,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe yield break; // Exit if cancellation is requested } - bool hadRequestHaltEvent = false; + RequestHaltEvent? haltEvent = null; foreach (WorkflowEvent raisedEvent in Interlocked.Exchange(ref eventSink, [])) { if (linkedSource.Token.IsCancellationRequested) @@ -113,9 +113,9 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } // TODO: Do we actually want to interpret this as a termination request? - if (raisedEvent is RequestHaltEvent) + if (raisedEvent is RequestHaltEvent halt) { - hadRequestHaltEvent = true; + haltEvent = halt; } else { @@ -123,9 +123,15 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } } - if (hadRequestHaltEvent || linkedSource.Token.IsCancellationRequested) + if (haltEvent is not null || linkedSource.Token.IsCancellationRequested) { // If we had a completion event, we are done. + if (haltEvent is not null) + { + yield return new WorkflowCompletedEvent(haltEvent.Data); + runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + } + yield break; } @@ -143,7 +149,14 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } } while (!ShouldBreak()); - runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + // Only signal workflow completion when the run has actually reached a terminal state. + // When breaking due to PendingRequests (paused, not completed) or cancellation, + // the stream ends silently — matching StreamingRunEventStream behavior. + if (this.RunStatus is RunStatus.Idle or RunStatus.Ended) + { + yield return new WorkflowCompletedEvent(); + runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + } } finally { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index a09dedd8ad..b30747de74 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -227,6 +227,7 @@ public async IAsyncEnumerable TakeEventStreamAsync( // Note: PendingRequests is handled by WatchStreamAsync's do-while loop if (completionSignal.Status is RunStatus.Idle or RunStatus.Ended) { + yield return new WorkflowCompletedEvent(); yield break; } @@ -242,6 +243,7 @@ public async IAsyncEnumerable TakeEventStreamAsync( // RequestHaltEvent signals the end of the event stream if (evt is RequestHaltEvent) { + yield return new WorkflowCompletedEvent(evt.Data); yield break; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs new file mode 100644 index 0000000000..a34aa690b1 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs @@ -0,0 +1,10 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Event triggered when a workflow run reaches a terminal state and the event stream completes, +/// regardless of whether the workflow completed successfully or was halted early. +/// +/// The optional result produced by the workflow upon run completion. +public sealed class WorkflowCompletedEvent(object? result = null) : WorkflowEvent(data: result); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowEvent.cs index 76b379a611..b65d90fe32 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowEvent.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowEvent.cs @@ -12,6 +12,8 @@ namespace Microsoft.Agents.AI.Workflows; [JsonDerivedType(typeof(WorkflowStartedEvent))] [JsonDerivedType(typeof(WorkflowErrorEvent))] [JsonDerivedType(typeof(WorkflowWarningEvent))] +[JsonDerivedType(typeof(WorkflowCompletedEvent))] +[JsonDerivedType(typeof(WorkflowFailedEvent))] [JsonDerivedType(typeof(WorkflowOutputEvent))] [JsonDerivedType(typeof(RequestInfoEvent))] public class WorkflowEvent(object? data = null) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowFailedEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowFailedEvent.cs new file mode 100644 index 0000000000..bfd43508c8 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowFailedEvent.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Event triggered when a workflow fails with an error message. +/// +/// +/// Unlike , which requires an object, +/// this event supports failure scenarios where only an error message string is available +/// (e.g., errors from external orchestrators or deserialized error responses). +/// +/// The error message describing the failure. +public sealed class WorkflowFailedEvent(string errorMessage) : WorkflowEvent(errorMessage) +{ + /// + /// Gets the error message describing the failure. + /// + public string ErrorMessage => errorMessage; +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs new file mode 100644 index 0000000000..c8936c7246 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs @@ -0,0 +1,185 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Agents.AI.Workflows.Sample; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public class WorkflowCompletedEventTests +{ + [Fact] + public void WorkflowCompletedEvent_WithResult_SetsData() + { + var evt = new WorkflowCompletedEvent("done"); + + evt.Data.Should().Be("done"); + evt.Should().BeAssignableTo(); + } + + [Fact] + public void WorkflowCompletedEvent_WithoutResult_HasNullData() + { + var evt = new WorkflowCompletedEvent(); + + evt.Data.Should().BeNull(); + } + + [Fact] + public void WorkflowFailedEvent_HasErrorMessage() + { + var evt = new WorkflowFailedEvent("something broke"); + + evt.ErrorMessage.Should().Be("something broke"); + evt.Data.Should().Be("something broke"); + evt.Should().BeAssignableTo(); + evt.Should().NotBeAssignableTo(); + } + + [Fact] + public async Task WorkflowCompletedEvent_IsLastEvent_OffThreadAsync() + { + // Arrange + ForwardMessageExecutor executorA = new("A"); + ForwardMessageExecutor executorB = new("B"); + + Workflow workflow = new WorkflowBuilder(executorA) + .AddEdge(executorA, executorB) + .Build(); + + // Act + await using StreamingRun run = await InProcessExecution.OffThread + .RunStreamingAsync(workflow, "hello"); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + events.Should().NotBeEmpty(); + events.Last().Should().BeOfType(); + events.OfType().Should().NotBeEmpty( + "there should be executor events before the workflow completion event"); + } + + [Fact] + public async Task WorkflowCompletedEvent_IsLastEvent_LockstepAsync() + { + // Arrange + ForwardMessageExecutor executorA = new("A"); + ForwardMessageExecutor executorB = new("B"); + + Workflow workflow = new WorkflowBuilder(executorA) + .AddEdge(executorA, executorB) + .Build(); + + // Act + await using StreamingRun run = await InProcessExecution.Lockstep + .RunStreamingAsync(workflow, "hello"); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + events.Should().NotBeEmpty(); + events.Last().Should().BeOfType(); + events.OfType().Should().NotBeEmpty( + "there should be executor events before the workflow completion event"); + } + + [Fact] + public async Task WorkflowFailedEvent_CanBeEmittedByExecutorAsync() + { + // Arrange + FailingExecutor executor = new("Failing"); + + Workflow workflow = new WorkflowBuilder(executor).Build(); + + // Act + await using StreamingRun run = await InProcessExecution.OffThread + .RunStreamingAsync(workflow, "trigger"); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + events.OfType().Should().ContainSingle() + .Which.ErrorMessage.Should().Be("custom error from executor"); + } + + [Fact] + public async Task WorkflowCompletedEvent_NotEmitted_WhenPendingRequests_LockstepAsync() + { + // Arrange: Use a workflow that makes external requests (guessing game from Sample 04). + // The RequestPort is the entry executor — it immediately posts an external request + // and the workflow pauses with RunStatus.PendingRequests. + Workflow workflow = Step4EntryPoint.WorkflowInstance; + + // Act: Run in lockstep mode with blockOnPendingRequest: false so the stream exits + // when the workflow pauses instead of waiting for a response. + await using StreamingRun run = await InProcessExecution.Lockstep + .RunStreamingAsync(workflow, NumberSignal.Init); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false)) + { + events.Add(evt); + } + + // Assert: The workflow is paused (not completed), so WorkflowCompletedEvent should NOT appear. + events.Should().NotContain(e => e is WorkflowCompletedEvent, + "WorkflowCompletedEvent should not be emitted when the workflow is paused with pending requests"); + events.OfType().Should().NotBeEmpty( + "workflow should have emitted at least one external request before pausing"); + } + + [Fact] + public async Task WorkflowCompletedEvent_NotEmitted_WhenPendingRequests_OffThreadAsync() + { + // Arrange: Same workflow, but verify the streaming (off-thread) path is also correct. + Workflow workflow = Step4EntryPoint.WorkflowInstance; + + // Act + await using StreamingRun run = await InProcessExecution.OffThread + .RunStreamingAsync(workflow, NumberSignal.Init); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false)) + { + events.Add(evt); + } + + // Assert + events.Should().NotContain(e => e is WorkflowCompletedEvent, + "WorkflowCompletedEvent should not be emitted when the workflow is paused with pending requests"); + events.OfType().Should().NotBeEmpty( + "workflow should have emitted at least one external request before pausing"); + } +} + +/// +/// An executor that emits a and then requests halt. +/// +internal sealed class FailingExecutor(string id) : Executor(id) +{ + protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) + { + protocolBuilder.RouteBuilder.AddHandler(async (message, ctx) => + { + await ctx.AddEventAsync(new WorkflowFailedEvent("custom error from executor")); + await ctx.RequestHaltAsync(); + }); + + return protocolBuilder; + } +}