From 130dc20f232254c3181e5851f7ca717b9b495237 Mon Sep 17 00:00:00 2001 From: max-montes <77820353+max-montes@users.noreply.github.com> Date: Fri, 6 Mar 2026 22:33:33 -0800 Subject: [PATCH 1/3] feat: add WorkflowCompletedEvent and WorkflowFailedEvent (#4063) Add two new public workflow event types to the core Workflows library: - WorkflowCompletedEvent: emitted when a workflow completes execution, providing the natural counterpart to WorkflowStartedEvent. Consumers can now observe workflow completion through the event stream instead of inferring it from stream termination. - WorkflowFailedEvent: supports string-based failure scenarios where no Exception object is available (e.g., errors from external orchestrators or deserialized error responses). Complements the existing WorkflowErrorEvent which requires an Exception. WorkflowCompletedEvent is automatically emitted in both streaming (StreamingRunEventStream) and lockstep (LockstepRunEventStream) execution modes before the event stream terminates. RequestHaltEvent remains internal as it serves as infrastructure plumbing for stream termination signaling. Closes #4063 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Execution/LockstepRunEventStream.cs | 6 + .../Execution/StreamingRunEventStream.cs | 2 + .../WorkflowCompletedEvent.cs | 9 ++ .../WorkflowEvent.cs | 2 + .../WorkflowFailedEvent.cs | 20 +++ .../WorkflowCompletedEventTests.cs | 135 ++++++++++++++++++ 6 files changed, 174 insertions(+) create mode 100644 dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs create mode 100644 dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowFailedEvent.cs create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index 506a0d1039..c9a32619db 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -126,6 +126,11 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe if (hadRequestHaltEvent || linkedSource.Token.IsCancellationRequested) { // If we had a completion event, we are done. + if (hadRequestHaltEvent) + { + yield return new WorkflowCompletedEvent(); + } + yield break; } @@ -143,6 +148,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } } while (!ShouldBreak()); + yield return new WorkflowCompletedEvent(); runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); } finally diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index a09dedd8ad..b30747de74 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -227,6 +227,7 @@ public async IAsyncEnumerable TakeEventStreamAsync( // Note: PendingRequests is handled by WatchStreamAsync's do-while loop if (completionSignal.Status is RunStatus.Idle or RunStatus.Ended) { + yield return new WorkflowCompletedEvent(); yield break; } @@ -242,6 +243,7 @@ public async IAsyncEnumerable TakeEventStreamAsync( // RequestHaltEvent signals the end of the event stream if (evt is RequestHaltEvent) { + yield return new WorkflowCompletedEvent(evt.Data); yield break; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs new file mode 100644 index 0000000000..b03b134007 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs @@ -0,0 +1,9 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Event triggered when a workflow completes execution successfully. +/// +/// The optional result produced by the workflow upon completion. +public sealed class WorkflowCompletedEvent(object? result = null) : WorkflowEvent(data: result); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowEvent.cs index 76b379a611..b65d90fe32 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowEvent.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowEvent.cs @@ -12,6 +12,8 @@ namespace Microsoft.Agents.AI.Workflows; [JsonDerivedType(typeof(WorkflowStartedEvent))] [JsonDerivedType(typeof(WorkflowErrorEvent))] [JsonDerivedType(typeof(WorkflowWarningEvent))] +[JsonDerivedType(typeof(WorkflowCompletedEvent))] +[JsonDerivedType(typeof(WorkflowFailedEvent))] [JsonDerivedType(typeof(WorkflowOutputEvent))] [JsonDerivedType(typeof(RequestInfoEvent))] public class WorkflowEvent(object? data = null) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowFailedEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowFailedEvent.cs new file mode 100644 index 0000000000..bfd43508c8 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowFailedEvent.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Event triggered when a workflow fails with an error message. +/// +/// +/// Unlike , which requires an object, +/// this event supports failure scenarios where only an error message string is available +/// (e.g., errors from external orchestrators or deserialized error responses). +/// +/// The error message describing the failure. +public sealed class WorkflowFailedEvent(string errorMessage) : WorkflowEvent(errorMessage) +{ + /// + /// Gets the error message describing the failure. + /// + public string ErrorMessage => errorMessage; +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs new file mode 100644 index 0000000000..1237cabe55 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs @@ -0,0 +1,135 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public class WorkflowCompletedEventTests +{ + [Fact] + public void WorkflowCompletedEvent_WithResult_SetsData() + { + var evt = new WorkflowCompletedEvent("done"); + + evt.Data.Should().Be("done"); + evt.Should().BeAssignableTo(); + } + + [Fact] + public void WorkflowCompletedEvent_WithoutResult_HasNullData() + { + var evt = new WorkflowCompletedEvent(); + + evt.Data.Should().BeNull(); + } + + [Fact] + public void WorkflowFailedEvent_HasErrorMessage() + { + var evt = new WorkflowFailedEvent("something broke"); + + evt.ErrorMessage.Should().Be("something broke"); + evt.Data.Should().Be("something broke"); + evt.Should().BeAssignableTo(); + evt.Should().NotBeAssignableTo(); + } + + [Fact] + public async Task WorkflowCompletedEvent_IsLastEvent_OffThreadAsync() + { + // Arrange + ForwardMessageExecutor executorA = new("A"); + ForwardMessageExecutor executorB = new("B"); + + Workflow workflow = new WorkflowBuilder(executorA) + .AddEdge(executorA, executorB) + .Build(); + + // Act + await using StreamingRun run = await InProcessExecution.OffThread + .RunStreamingAsync(workflow, "hello"); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + events.Should().NotBeEmpty(); + events.Last().Should().BeOfType(); + events.OfType().Should().NotBeEmpty( + "there should be executor events before the workflow completion event"); + } + + [Fact] + public async Task WorkflowCompletedEvent_IsLastEvent_LockstepAsync() + { + // Arrange + ForwardMessageExecutor executorA = new("A"); + ForwardMessageExecutor executorB = new("B"); + + Workflow workflow = new WorkflowBuilder(executorA) + .AddEdge(executorA, executorB) + .Build(); + + // Act + await using StreamingRun run = await InProcessExecution.Lockstep + .RunStreamingAsync(workflow, "hello"); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + events.Should().NotBeEmpty(); + events.Last().Should().BeOfType(); + events.OfType().Should().NotBeEmpty( + "there should be executor events before the workflow completion event"); + } + + [Fact] + public async Task WorkflowFailedEvent_CanBeEmittedByExecutorAsync() + { + // Arrange + FailingExecutor executor = new("Failing"); + + Workflow workflow = new WorkflowBuilder(executor).Build(); + + // Act + await using StreamingRun run = await InProcessExecution.OffThread + .RunStreamingAsync(workflow, "trigger"); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + events.OfType().Should().ContainSingle() + .Which.ErrorMessage.Should().Be("custom error from executor"); + } +} + +/// +/// An executor that emits a and then requests halt. +/// +internal sealed class FailingExecutor(string id) : Executor(id) +{ + protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) + { + protocolBuilder.RouteBuilder.AddHandler(async (message, ctx) => + { + await ctx.AddEventAsync(new WorkflowFailedEvent("custom error from executor")); + await ctx.RequestHaltAsync(); + }); + + return protocolBuilder; + } +} From 834783cbd9f078a75c99d19fc22e298c6e8c25e6 Mon Sep 17 00:00:00 2001 From: max-montes <77820353+max-montes@users.noreply.github.com> Date: Sun, 8 Mar 2026 22:06:18 -0700 Subject: [PATCH 2/3] fix: address review feedback on WorkflowCompletedEvent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Update XML doc to clarify event fires on any terminal state, not just success - Add telemetry (ActivityEvent) on the RequestHaltEvent early-exit path to match the natural-completion path - Guard post-loop WorkflowCompletedEvent emission with RunStatus check: only emit when Idle or Ended, not when paused (PendingRequests) or cancelled — matching StreamingRunEventStream behavior - Add tests verifying WorkflowCompletedEvent is not emitted when the workflow is paused with pending external requests (both lockstep and off-thread modes) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Execution/LockstepRunEventStream.cs | 11 +++- .../WorkflowCompletedEvent.cs | 5 +- .../WorkflowCompletedEventTests.cs | 50 +++++++++++++++++++ 3 files changed, 62 insertions(+), 4 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index c9a32619db..940a270469 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -129,6 +129,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe if (hadRequestHaltEvent) { yield return new WorkflowCompletedEvent(); + runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); } yield break; @@ -148,8 +149,14 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } } while (!ShouldBreak()); - yield return new WorkflowCompletedEvent(); - runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + // Only signal workflow completion when the run has actually reached a terminal state. + // When breaking due to PendingRequests (paused, not completed) or cancellation, + // the stream ends silently — matching StreamingRunEventStream behavior. + if (this.RunStatus is RunStatus.Idle or RunStatus.Ended) + { + yield return new WorkflowCompletedEvent(); + runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + } } finally { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs index b03b134007..a34aa690b1 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowCompletedEvent.cs @@ -3,7 +3,8 @@ namespace Microsoft.Agents.AI.Workflows; /// -/// Event triggered when a workflow completes execution successfully. +/// Event triggered when a workflow run reaches a terminal state and the event stream completes, +/// regardless of whether the workflow completed successfully or was halted early. /// -/// The optional result produced by the workflow upon completion. +/// The optional result produced by the workflow upon run completion. public sealed class WorkflowCompletedEvent(object? result = null) : WorkflowEvent(data: result); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs index 1237cabe55..c8936c7246 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowCompletedEventTests.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading.Tasks; using FluentAssertions; +using Microsoft.Agents.AI.Workflows.Sample; namespace Microsoft.Agents.AI.Workflows.UnitTests; @@ -115,6 +116,55 @@ public async Task WorkflowFailedEvent_CanBeEmittedByExecutorAsync() events.OfType().Should().ContainSingle() .Which.ErrorMessage.Should().Be("custom error from executor"); } + + [Fact] + public async Task WorkflowCompletedEvent_NotEmitted_WhenPendingRequests_LockstepAsync() + { + // Arrange: Use a workflow that makes external requests (guessing game from Sample 04). + // The RequestPort is the entry executor — it immediately posts an external request + // and the workflow pauses with RunStatus.PendingRequests. + Workflow workflow = Step4EntryPoint.WorkflowInstance; + + // Act: Run in lockstep mode with blockOnPendingRequest: false so the stream exits + // when the workflow pauses instead of waiting for a response. + await using StreamingRun run = await InProcessExecution.Lockstep + .RunStreamingAsync(workflow, NumberSignal.Init); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false)) + { + events.Add(evt); + } + + // Assert: The workflow is paused (not completed), so WorkflowCompletedEvent should NOT appear. + events.Should().NotContain(e => e is WorkflowCompletedEvent, + "WorkflowCompletedEvent should not be emitted when the workflow is paused with pending requests"); + events.OfType().Should().NotBeEmpty( + "workflow should have emitted at least one external request before pausing"); + } + + [Fact] + public async Task WorkflowCompletedEvent_NotEmitted_WhenPendingRequests_OffThreadAsync() + { + // Arrange: Same workflow, but verify the streaming (off-thread) path is also correct. + Workflow workflow = Step4EntryPoint.WorkflowInstance; + + // Act + await using StreamingRun run = await InProcessExecution.OffThread + .RunStreamingAsync(workflow, NumberSignal.Init); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false)) + { + events.Add(evt); + } + + // Assert + events.Should().NotContain(e => e is WorkflowCompletedEvent, + "WorkflowCompletedEvent should not be emitted when the workflow is paused with pending requests"); + events.OfType().Should().NotBeEmpty( + "workflow should have emitted at least one external request before pausing"); + } } /// From 840358e898c12ab5fd6e077b41d105b194690f85 Mon Sep 17 00:00:00 2001 From: max-montes <77820353+max-montes@users.noreply.github.com> Date: Sun, 8 Mar 2026 22:12:07 -0700 Subject: [PATCH 3/3] fix: preserve RequestHaltEvent payload in lockstep mode Capture RequestHaltEvent.Data and pass it to WorkflowCompletedEvent, matching StreamingRunEventStream behavior where halt result data is forwarded to the completion event. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Execution/LockstepRunEventStream.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index 940a270469..87ecb194be 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -104,7 +104,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe yield break; // Exit if cancellation is requested } - bool hadRequestHaltEvent = false; + RequestHaltEvent? haltEvent = null; foreach (WorkflowEvent raisedEvent in Interlocked.Exchange(ref eventSink, [])) { if (linkedSource.Token.IsCancellationRequested) @@ -113,9 +113,9 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } // TODO: Do we actually want to interpret this as a termination request? - if (raisedEvent is RequestHaltEvent) + if (raisedEvent is RequestHaltEvent halt) { - hadRequestHaltEvent = true; + haltEvent = halt; } else { @@ -123,12 +123,12 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } } - if (hadRequestHaltEvent || linkedSource.Token.IsCancellationRequested) + if (haltEvent is not null || linkedSource.Token.IsCancellationRequested) { // If we had a completion event, we are done. - if (hadRequestHaltEvent) + if (haltEvent is not null) { - yield return new WorkflowCompletedEvent(); + yield return new WorkflowCompletedEvent(haltEvent.Data); runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); }