From c19be05654a9d98574947c129ec1b48404a9352d Mon Sep 17 00:00:00 2001 From: Shyju Krishnankutty Date: Thu, 25 Jun 2026 10:51:51 -0700 Subject: [PATCH 1/6] Add support for AddSwitch in the durable workflow runner. --- dotnet/agent-framework-dotnet.slnx | 1 + .../09_SwitchRouting/09_SwitchRouting.csproj | 28 +++++ .../ConsoleApps/09_SwitchRouting/Executors.cs | 36 ++++++ .../ConsoleApps/09_SwitchRouting/Program.cs | 103 ++++++++++++++++ .../ConsoleApps/09_SwitchRouting/README.md | 83 +++++++++++++ .../Microsoft.Agents.AI.DurableTask/Logs.cs | 19 +++ .../Workflows/DurableSerialization.cs | 24 ++++ .../EdgeRouters/DurableDirectEdgeRouter.cs | 38 +----- .../Workflows/EdgeRouters/DurableEdgeMap.cs | 17 +++ .../EdgeRouters/DurableFanOutEdgeRouter.cs | 57 ++++++++- .../Workflows/WorkflowAnalyzer.cs | 22 ++++ .../Workflows/WorkflowGraphInfo.cs | 8 ++ .../WorkflowConsoleAppSamplesValidation.cs | 62 ++++++++++ .../Workflows/DurableEdgeMapSwitchTests.cs | 111 ++++++++++++++++++ 14 files changed, 567 insertions(+), 42 deletions(-) create mode 100644 dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/09_SwitchRouting.csproj create mode 100644 dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs create mode 100644 dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs create mode 100644 dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/README.md create mode 100644 dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 3be35c1cda0..9e47424cd98 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -79,6 +79,7 @@ + diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/09_SwitchRouting.csproj b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/09_SwitchRouting.csproj new file mode 100644 index 00000000000..5db719ec651 --- /dev/null +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/09_SwitchRouting.csproj @@ -0,0 +1,28 @@ + + + net10.0 + Exe + enable + enable + SwitchRouting + SwitchRouting + + + + + + + + + + + + + + + diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs new file mode 100644 index 00000000000..4fe9062d925 --- /dev/null +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows; + +namespace SwitchRouting; + +internal sealed record Expense(string Id, decimal Amount); + +internal sealed class ExpenseParser() : Executor("ExpenseParser") +{ + public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) + { + // The input is the expense amount (e.g., "50" or "4500"). A real workflow would + // look the expense up from a store; here we just attach a generated id. + decimal amount = decimal.TryParse(message, out decimal parsed) ? parsed : 0m; + return new Expense($"EXP-{Guid.NewGuid().ToString()[..4]}", amount); + } +} + +internal sealed class AutoApprove() : Executor("AutoApprove") +{ + public override async ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) + => $"Expense {message.Id} for {message.Amount:C} was auto-approved."; +} + +internal sealed class ManagerApproval() : Executor("ManagerApproval") +{ + public override async ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) + => $"Expense {message.Id} for {message.Amount:C} was routed to a manager for approval."; +} + +internal sealed class DirectorApproval() : Executor("DirectorApproval") +{ + public override async ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) + => $"Expense {message.Id} for {message.Amount:C} was routed to a director for approval."; +} diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs new file mode 100644 index 00000000000..7eebd4c1262 --- /dev/null +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs @@ -0,0 +1,103 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates multi-way routing in a workflow using AddSwitch. +// An expense is routed to a different approval path based on its amount: +// - Amount < 100 -> AutoApprove +// - Amount < 1000 -> ManagerApproval +// - Otherwise -> DirectorApproval (default) +// +// Unlike AddEdge(..., condition:), which adds an independent boolean condition per edge, +// AddSwitch evaluates its cases in order and routes to the FIRST matching branch (or the +// default when none match), making it a natural fit for multi-way routing. + +using Microsoft.Agents.AI.DurableTask; +using Microsoft.Agents.AI.DurableTask.Workflows; +using Microsoft.Agents.AI.Workflows; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using SwitchRouting; + +string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING") + ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"; + +// Create executor instances +ExpenseParser expenseParser = new(); +AutoApprove autoApprove = new(); +ManagerApproval managerApproval = new(); +DirectorApproval directorApproval = new(); + +// Build a workflow that switches on the parsed expense amount. Cases are evaluated in order; +// the first matching case wins, and WithDefault handles everything else. +WorkflowBuilder builder = new(expenseParser); +builder.AddSwitch(expenseParser, switchBuilder => + switchBuilder + .AddCase(expense => expense!.Amount < 100m, autoApprove) + .AddCase(expense => expense!.Amount < 1000m, managerApproval) + .WithDefault(directorApproval)); + +Workflow approveExpense = builder.WithName("ApproveExpense").Build(); + +IHost host = Host.CreateDefaultBuilder(args) +.ConfigureLogging(logging => logging.SetMinimumLevel(LogLevel.Warning)) +.ConfigureServices(services => +{ + services.ConfigureDurableWorkflows( + workflowOptions => workflowOptions.AddWorkflow(approveExpense), + workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString), + clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString)); +}) +.Build(); + +await host.StartAsync(); + +IWorkflowClient workflowClient = host.Services.GetRequiredService(); + +Console.WriteLine("Enter an expense amount (or 'exit'):"); +Console.WriteLine("Tip: try 50, 450, and 5000 to see each branch.\n"); + +while (true) +{ + Console.Write("> "); + string? input = Console.ReadLine(); + if (string.IsNullOrWhiteSpace(input) || input.Equals("exit", StringComparison.OrdinalIgnoreCase)) + { + break; + } + + try + { + await StartNewWorkflowAsync(input, approveExpense, workflowClient); + } + catch (Exception ex) + { + Console.WriteLine($"Error: {ex.Message}"); + } + + Console.WriteLine(); +} + +await host.StopAsync(); + +// Start a new workflow and wait for completion +static async Task StartNewWorkflowAsync(string amount, Workflow workflow, IWorkflowClient client) +{ + Console.WriteLine($"Starting workflow for expense amount '{amount}'..."); + + // Cast to IAwaitableWorkflowRun to access WaitForCompletionAsync + IAwaitableWorkflowRun run = (IAwaitableWorkflowRun)await client.RunAsync(workflow, amount); + Console.WriteLine($"Run ID: {run.RunId}"); + + try + { + Console.WriteLine("Waiting for workflow to complete..."); + string? result = await run.WaitForCompletionAsync(); + Console.WriteLine($"Workflow completed. {result}"); + } + catch (InvalidOperationException ex) + { + Console.WriteLine($"Failed: {ex.Message}"); + } +} diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/README.md b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/README.md new file mode 100644 index 00000000000..b3de9fbc686 --- /dev/null +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/README.md @@ -0,0 +1,83 @@ +# Switch Routing Workflow Sample + +This sample demonstrates how to build a workflow with **multi-way routing** using `AddSwitch`. A switch evaluates a set of ordered cases against the output of an executor and routes execution to the **first matching** branch — or to a **default** branch when no case matches. + +> **Related sample:** [`03_ConditionalEdges`](../03_ConditionalEdges/README.md) solves a similar branching problem with a different API — `AddEdge(..., condition:)` for per-edge boolean conditions. See [`AddSwitch` vs. conditional edges](#addswitch-vs-conditional-edges) below for when to use which. + +## Key Concepts Demonstrated + +- Building workflows with **multi-way routing** using `AddSwitch` and `AddCase` / `WithDefault` +- Ordered, first-match-wins case evaluation +- Falling back to a default branch when no case matches +- Using `ConfigureDurableWorkflows` to register workflows with dependency injection + +## Overview + +The sample implements an expense approval workflow that routes each expense to a different approval path based on its amount: + +``` +ExpenseParser --[amount < 100]---> AutoApprove + |--[amount < 1000]--> ManagerApproval + +--[default]--------> DirectorApproval +``` + +| Executor | Description | +|----------|-------------| +| ExpenseParser | Parses the entered amount into an `Expense` | +| AutoApprove | Auto-approves small expenses (`< 100`) | +| ManagerApproval | Routes mid-range expenses (`< 1000`) to a manager | +| DirectorApproval | Default branch for everything else (`>= 1000`) | + +## How `AddSwitch` Works + +`AddSwitch` configures a switch on the output of a source executor. Cases are evaluated **in order**, and the **first** matching case wins; `WithDefault` handles anything that matches no case: + +```csharp +builder.AddSwitch(expenseParser, switchBuilder => + switchBuilder + .AddCase(expense => expense!.Amount < 100m, autoApprove) + .AddCase(expense => expense!.Amount < 1000m, managerApproval) + .WithDefault(directorApproval)); +``` + +Each case predicate receives the output of the source executor and returns a boolean. + +### `AddSwitch` vs. conditional edges + +The [`03_ConditionalEdges`](../03_ConditionalEdges/README.md) sample uses `AddEdge(..., condition:)`, where each edge carries its own **independent** boolean condition (an edge is traversed whenever its condition is true). `AddSwitch` instead models **mutually exclusive, first-match-wins** routing with a single default — a better fit when exactly one of several branches should run. + +## Environment Setup + +See the [README.md](../../README.md) file in the parent directory for information on configuring the environment, including how to install and run the Durable Task Scheduler. + +## Running the Sample + +```bash +cd dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting +dotnet run --framework net10.0 +``` + +### Sample Output + +```text +Enter an expense amount (or 'exit'): +Tip: try 50, 450, and 5000 to see each branch. + +> 50 +Starting workflow for expense amount '50'... +Run ID: abc123... +Waiting for workflow to complete... +Workflow completed. Expense EXP-1a2b for $50.00 was auto-approved. + +> 450 +Starting workflow for expense amount '450'... +Run ID: def456... +Waiting for workflow to complete... +Workflow completed. Expense EXP-3c4d for $450.00 was routed to a manager for approval. + +> 5000 +Starting workflow for expense amount '5000'... +Run ID: ghi789... +Waiting for workflow to complete... +Workflow completed. Expense EXP-5e6f for $5,000.00 was routed to a director for approval. +``` diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs index 57ef010a2f9..76918d7a5e5 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs @@ -227,4 +227,23 @@ public static partial void LogWaitingForExternalEvent( public static partial void LogReceivedExternalEvent( this ILogger logger, string requestPortId); + + [LoggerMessage( + EventId = 114, + Level = LogLevel.Debug, + Message = "Fan-out from {Source}: selector matched {SelectedCount} of {TotalCount} target(s)")] + public static partial void LogFanOutSelectorMatched( + this ILogger logger, + string source, + int selectedCount, + int totalCount); + + [LoggerMessage( + EventId = 115, + Level = LogLevel.Warning, + Message = "Failed to evaluate fan-out selector for source {Source}, skipping")] + public static partial void LogFanOutSelectorEvaluationFailed( + this ILogger logger, + Exception ex, + string source); } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs index 245ec36fb83..11fbc894562 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs @@ -1,5 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. +using System.Diagnostics.CodeAnalysis; using System.Text.Json; namespace Microsoft.Agents.AI.DurableTask.Workflows; @@ -19,4 +20,27 @@ internal static class DurableSerialization PropertyNamingPolicy = JsonNamingPolicy.CamelCase, PropertyNameCaseInsensitive = true }; + + /// + /// Deserializes a workflow message's JSON to the source executor's output type so an edge condition or + /// fan-out selector can evaluate it as a strongly-typed value. Falls back to a generic object when the + /// type is unknown, and returns null for empty input. + /// + /// The serialized message. + /// The source executor's output type, or null if unknown. + /// The deserialized object, or null if the JSON is empty. + /// Thrown when the JSON is invalid or cannot be deserialized to the target type. + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")] + internal static object? DeserializeMessage(string json, Type? targetType) + { + if (string.IsNullOrEmpty(json)) + { + return null; + } + + return targetType is null + ? JsonSerializer.Deserialize(json, Options) + : JsonSerializer.Deserialize(json, targetType, Options); + } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs index 3f780931839..54a11a15a3e 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs @@ -29,8 +29,6 @@ // Enqueue to // D's queue -using System.Diagnostics.CodeAnalysis; -using System.Text.Json; using Microsoft.Extensions.Logging; namespace Microsoft.Agents.AI.DurableTask.Workflows.EdgeRouters; @@ -88,7 +86,7 @@ public void RouteMessage( { try { - object? messageObj = DeserializeForCondition(envelope.Message, this._sourceOutputType); + object? messageObj = DurableSerialization.DeserializeMessage(envelope.Message, this._sourceOutputType); if (!this._condition(messageObj)) { logger.LogEdgeConditionFalse(this._sourceId, this._sinkId); @@ -106,40 +104,6 @@ public void RouteMessage( EnqueueMessage(messageQueues, this._sinkId, envelope); } - /// - /// Deserializes a JSON message to an object for condition evaluation. - /// - /// - /// Messages travel through the durable workflow as serialized JSON strings, but condition - /// delegates need typed objects to evaluate (e.g., order => order.Status == "Approved"). - /// This method converts the JSON back to an object the condition delegate can evaluate. - /// - /// The JSON string representation of the message. - /// - /// The expected type of the message. When provided, enables strongly-typed deserialization - /// so the condition function receives the correct type to evaluate against. - /// - /// - /// The deserialized object, or null if the JSON is empty. - /// - /// Thrown when the JSON is invalid or cannot be deserialized to the target type. - [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")] - [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")] - private static object? DeserializeForCondition(string json, Type? targetType) - { - if (string.IsNullOrEmpty(json)) - { - return null; - } - - // If we know the source executor's output type, deserialize to that specific type - // so the condition function can access strongly-typed properties. - // Otherwise, deserialize as a generic object for basic inspection. - return targetType is null - ? JsonSerializer.Deserialize(json, DurableSerialization.Options) - : JsonSerializer.Deserialize(json, targetType, DurableSerialization.Options); - } - private static void EnqueueMessage( Dictionary> queues, string executorId, diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs index 69b8b7cc1c8..6ff03b94e90 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs @@ -101,6 +101,23 @@ internal DurableEdgeMap(WorkflowGraphInfo graphInfo) graphInfo.ExecutorOutputTypes.TryGetValue(sourceId, out Type? sourceOutputType); + // A switch (AddSwitch) or target-selecting fan-out edge is represented as a single + // fan-out edge with an assigner. Build a fan-out router that evaluates the assigner so + // only the selected targets receive the message, mirroring the in-process FanOutEdgeRunner. + if (graphInfo.FanOutRoutings.TryGetValue(sourceId, out (List SinkIds, Func> Assigner) fanOutRouting)) + { + List orderedRouters = []; + foreach (string sinkId in fanOutRouting.SinkIds) + { + orderedRouters.Add(new DurableDirectEdgeRouter(sourceId, sinkId, condition: null, sourceOutputType)); + } + + this._routersBySource[sourceId] = + [new DurableFanOutEdgeRouter(sourceId, orderedRouters, fanOutRouting.Assigner, sourceOutputType)]; + + continue; + } + List routers = []; foreach (string sinkId in successorIds) { diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs index f13a0def923..d067d7a3074 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs @@ -36,16 +36,33 @@ internal sealed class DurableFanOutEdgeRouter : IDurableEdgeRouter { private readonly string _sourceId; private readonly List _targetRouters; + private readonly Func>? _edgeAssigner; + private readonly Type? _sourceOutputType; /// /// Initializes a new instance of . /// /// The source executor ID. /// The routers for each target executor. - internal DurableFanOutEdgeRouter(string sourceId, List targetRouters) + /// + /// Optional target selector. When provided (e.g., for a switch built via AddSwitch), it maps the + /// incoming message to the indices of that should receive it, so only the + /// selected targets run. When null, the message is forwarded to all targets. + /// + /// + /// The output type of the source executor, used to deserialize the JSON message before evaluating the + /// . Ignored when is null. + /// + internal DurableFanOutEdgeRouter( + string sourceId, + List targetRouters, + Func>? edgeAssigner = null, + Type? sourceOutputType = null) { this._sourceId = sourceId; this._targetRouters = targetRouters; + this._edgeAssigner = edgeAssigner; + this._sourceOutputType = sourceOutputType; } /// @@ -54,14 +71,44 @@ public void RouteMessage( Dictionary> messageQueues, ILogger logger) { - if (logger.IsEnabled(LogLevel.Debug)) + // No assigner: plain fan-out, forward the message to every target. + if (this._edgeAssigner is null) { - logger.LogDebug("Fan-Out from {Source}: routing to {Count} targets", this._sourceId, this._targetRouters.Count); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("Fan-Out from {Source}: routing to {Count} targets", this._sourceId, this._targetRouters.Count); + } + + foreach (IDurableEdgeRouter targetRouter in this._targetRouters) + { + targetRouter.RouteMessage(envelope, messageQueues, logger); + } + + return; + } + + // Assigner present (e.g., a switch): select only the matching targets, mirroring the + // in-process FanOutEdgeRunner. The assigner returns indices into the ordered target list. + List selectedIndices; + try + { + object? messageObj = DurableSerialization.DeserializeMessage(envelope.Message, this._sourceOutputType); + selectedIndices = this._edgeAssigner(messageObj, this._targetRouters.Count) + .Where(i => i >= 0 && i < this._targetRouters.Count) + .Distinct() + .ToList(); + } + catch (Exception ex) + { + logger.LogFanOutSelectorEvaluationFailed(ex, this._sourceId); + return; } - foreach (IDurableEdgeRouter targetRouter in this._targetRouters) + logger.LogFanOutSelectorMatched(this._sourceId, selectedIndices.Count, this._targetRouters.Count); + + foreach (int index in selectedIndices) { - targetRouter.RouteMessage(envelope, messageQueues, logger); + this._targetRouters[index].RouteMessage(envelope, messageQueues, logger); } } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs index bb4d2956166..6061bd7b6b3 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs @@ -123,6 +123,7 @@ private static void PopulateGraphFromEdges(WorkflowGraphInfo graphInfo, Dictiona { AddSuccessorsFromEdge(graphInfo, sourceId, edge, successors); TryAddEdgeCondition(graphInfo, edge); + TryAddFanOutRouting(graphInfo, edge); } } } @@ -167,6 +168,27 @@ private static void TryAddEdgeCondition(WorkflowGraphInfo graphInfo, Edge edge) } } + /// + /// Captures the target-selecting assigner from a fan-out edge if present. + /// + /// + /// A switch (AddSwitch) or a target-selecting fan-out edge (AddFanOutEdge with a + /// target selector) is represented as a single carrying an + /// EdgeAssigner. The assigner maps an incoming message to the subset of targets that should + /// receive it. Capturing it here lets the durable runtime route to only the selected target(s). + /// + /// The graph info to update. + /// The edge that may be a fan-out edge with an assigner. + private static void TryAddFanOutRouting(WorkflowGraphInfo graphInfo, Edge edge) + { + FanOutEdgeData? fanOutEdge = edge.FanOutEdgeData; + + if (fanOutEdge?.EdgeAssigner is not null) + { + graphInfo.FanOutRoutings[fanOutEdge.SourceId] = (fanOutEdge.SinkIds, fanOutEdge.EdgeAssigner); + } + } + /// /// Extracts the output type from an executor type by walking the inheritance chain. /// diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs index a504a07b13a..ed23b239a58 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs @@ -95,4 +95,12 @@ internal sealed class WorkflowGraphInfo /// Maps executor IDs to their output types (for proper deserialization during condition evaluation). /// public Dictionary ExecutorOutputTypes { get; } = []; + + /// + /// Maps a source executor ID to a fan-out target selector produced by a switch (AddSwitch) or a + /// target-selecting fan-out edge. The Assigner maps an incoming message to the indices of the + /// ordered SinkIds that should receive it, so only the selected targets run (mirroring the + /// in-process FanOutEdgeData.EdgeAssigner). + /// + public Dictionary SinkIds, Func> Assigner)> FanOutRoutings { get; } = []; } diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs index 4624b6fb792..ee83d06879a 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs @@ -173,6 +173,68 @@ await this.RunSampleTestAsync(samplePath, async (process, logs) => }); } + [RetryFact(2, 5000)] + public async Task SwitchRoutingWorkflowSampleValidationAsync() + { + using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts(s_testTimeout); + string samplePath = Path.Combine(s_samplesPath, "09_SwitchRouting"); + + await this.RunSampleTestAsync(samplePath, async (process, logs) => + { + bool autoApproveSent = false; + bool managerApprovalSent = false; + bool directorApprovalSent = false; + bool autoApproveCompleted = false; + bool managerApprovalCompleted = false; + bool directorApprovalCompleted = false; + + string? line; + while ((line = this.ReadLogLine(logs, testTimeoutCts.Token)) != null) + { + // Amount < 100 routes to the first switch case (AutoApprove). + if (!autoApproveSent && line.Contains("Enter an expense amount", StringComparison.OrdinalIgnoreCase)) + { + await this.WriteInputAsync(process, "50", testTimeoutCts.Token); + autoApproveSent = true; + } + + if (autoApproveSent && !autoApproveCompleted && + line.Contains("was auto-approved", StringComparison.OrdinalIgnoreCase)) + { + autoApproveCompleted = true; + + // Amount < 1000 routes to the second switch case (ManagerApproval). + await this.WriteInputAsync(process, "450", testTimeoutCts.Token); + managerApprovalSent = true; + } + + if (managerApprovalSent && !managerApprovalCompleted && + line.Contains("routed to a manager", StringComparison.OrdinalIgnoreCase)) + { + managerApprovalCompleted = true; + + // Everything else routes to the default branch (DirectorApproval). + await this.WriteInputAsync(process, "5000", testTimeoutCts.Token); + directorApprovalSent = true; + } + + if (directorApprovalSent && line.Contains("routed to a director", StringComparison.OrdinalIgnoreCase)) + { + directorApprovalCompleted = true; + break; + } + + this.AssertNoError(line); + } + + Assert.True(autoApproveCompleted, "Expense < 100 did not route to AutoApprove."); + Assert.True(managerApprovalCompleted, "Expense < 1000 did not route to ManagerApproval."); + Assert.True(directorApprovalCompleted, "Expense >= 1000 did not route to the default DirectorApproval branch."); + + await this.WriteInputAsync(process, "exit", testTimeoutCts.Token); + }); + } + private void AssertNoError(string line) { if (line.Contains("Failed:", StringComparison.OrdinalIgnoreCase) || diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs new file mode 100644 index 00000000000..34dd21f2764 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs @@ -0,0 +1,111 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json; +using Microsoft.Agents.AI.DurableTask.Workflows; +using Microsoft.Agents.AI.DurableTask.Workflows.EdgeRouters; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Microsoft.Agents.AI.DurableTask.UnitTests.Workflows; + +/// +/// Tests for AddSwitch support in durable workflows. +/// A switch (AddSwitch) reduces to a single fan-out edge carrying an +/// EdgeAssigner that selects the matching case's target(s). The durable +/// routing layer must honor that selection so only the matching executor runs. +/// +public sealed class DurableEdgeMapSwitchTests +{ + private const string RouterId = "router"; + private const string EvenId = "evenSink"; + private const string OddId = "oddSink"; + + [Theory] + [InlineData(4, EvenId, OddId)] // even -> first case matches + [InlineData(7, OddId, EvenId)] // odd -> second case matches + public void RouteMessage_Switch_RoutesToMatchingCaseOnly(int number, string expected, string notExpected) + { + // Arrange: a switch with two mutually exclusive cases. + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, Sink(EvenId)); + sb.AddCase(n => n % 2 != 0, Sink(OddId)); + }); + + // Act + Dictionary> queues = Route(builder.Build(), number); + + // Assert: only the matching branch receives the message. + Assert.Equal(1, QueuedCount(queues, expected)); + Assert.Equal(0, QueuedCount(queues, notExpected)); + } + + [Fact] + public void RouteMessage_NoCaseMatches_RoutesToDefaultExecutorOnly() + { + // Arrange: a switch whose only case never matches, plus a default branch. + const string MatchId = "matchSink"; + const string DefaultId = "defaultSink"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n > 1000, Sink(MatchId)); + sb.WithDefault(Sink(DefaultId)); + }); + + // Act: 5 does not match the case, so it must fall through to the default. + Dictionary> queues = Route(builder.Build(), 5); + + // Assert: only the default branch receives the message. + Assert.Equal(1, QueuedCount(queues, DefaultId)); + Assert.Equal(0, QueuedCount(queues, MatchId)); + } + + [Fact] + public void RouteMessage_FanOutWithoutSelector_RoutesToAllTargets() + { + // Arrange: a plain fan-out edge (no target selector) must still reach every target. + const string TargetAId = "targetA"; + const string TargetBId = "targetB"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddFanOutEdge(router, [Sink(TargetAId), Sink(TargetBId)]); + + // Act + Dictionary> queues = Route(builder.Build(), 42); + + // Assert: both targets receive the message. + Assert.Equal(1, QueuedCount(queues, TargetAId)); + Assert.Equal(1, QueuedCount(queues, TargetBId)); + } + + private static FunctionExecutor Router() + => new(RouterId, (input, _, _) => input, outputTypes: [typeof(int)]); + + private static FunctionExecutor Sink(string id) + => new(id, (_, _, _) => default); + + private static Dictionary> Route(Workflow workflow, int number) + { + WorkflowGraphInfo graphInfo = WorkflowAnalyzer.BuildGraphInfo(workflow); + DurableEdgeMap edgeMap = new(graphInfo); + Dictionary> queues = []; + + edgeMap.RouteMessage( + RouterId, + JsonSerializer.Serialize(number, DurableSerialization.Options), + typeof(int).AssemblyQualifiedName, + queues, + NullLogger.Instance); + + return queues; + } + + private static int QueuedCount(Dictionary> queues, string executorId) + => queues.TryGetValue(executorId, out Queue? queue) ? queue.Count : 0; +} From f9447f0bc4ec2b6a28649c7d22aaff841c8edcdb Mon Sep 17 00:00:00 2001 From: Shyju Krishnankutty Date: Thu, 25 Jun 2026 11:34:14 -0700 Subject: [PATCH 2/6] Fixed to address PR feedback, --- .../CHANGELOG.md | 1 + .../Workflows/EdgeRouters/DurableEdgeMap.cs | 45 +++++++-- .../EdgeRouters/DurableFanOutEdgeRouter.cs | 19 ++-- .../Workflows/WorkflowAnalyzer.cs | 13 ++- .../Workflows/WorkflowGraphInfo.cs | 20 +++- .../Workflows/DurableEdgeMapSwitchTests.cs | 91 +++++++++++++++++++ 6 files changed, 163 insertions(+), 26 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md b/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md index 1fa375e8a91..8cce304eba6 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md @@ -4,6 +4,7 @@ - Bind MCP threadId to the current agent and guard cross-agent session dispatch ([#6531](https://github.com/microsoft/agent-framework/pull/6531)) - Added support for durable workflows ([#4436](https://github.com/microsoft/agent-framework/pull/4436)) +- Added support for `AddSwitch` and target-selecting fan-out edges in the durable workflow runner ([#6749](https://github.com/microsoft/agent-framework/pull/6749)) ## v1.0.0-preview.260219.1 diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs index 6ff03b94e90..e569052d04d 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs @@ -101,19 +101,46 @@ internal DurableEdgeMap(WorkflowGraphInfo graphInfo) graphInfo.ExecutorOutputTypes.TryGetValue(sourceId, out Type? sourceOutputType); - // A switch (AddSwitch) or target-selecting fan-out edge is represented as a single - // fan-out edge with an assigner. Build a fan-out router that evaluates the assigner so - // only the selected targets receive the message, mirroring the in-process FanOutEdgeRunner. - if (graphInfo.FanOutRoutings.TryGetValue(sourceId, out (List SinkIds, Func> Assigner) fanOutRouting)) + // A switch (AddSwitch) or target-selecting fan-out edge is represented as a single fan-out edge + // with an assigner. A source can also declare such selector edges alongside ordinary direct or + // plain fan-out edges. Because the graph flattens every edge's targets into a single successor + // list, we build one selector-aware fan-out router per selector routing (so only the chosen + // targets receive the message) and ordinary direct routers for the remaining sibling edges, so + // none of them are dropped. This mirrors the in-process runner, which creates one runner per edge. + if (graphInfo.SelectiveFanOuts.TryGetValue(sourceId, out List? selectiveFanOuts) + && selectiveFanOuts.Count > 0) { - List orderedRouters = []; - foreach (string sinkId in fanOutRouting.SinkIds) + List sourceRouters = []; + + // Targets reached through a selector are chosen by its assigner; track them so we don't also + // wire an unconditional direct router to them below. + HashSet selectorSinks = []; + foreach (SelectiveFanOut selectiveFanOut in selectiveFanOuts) { - orderedRouters.Add(new DurableDirectEdgeRouter(sourceId, sinkId, condition: null, sourceOutputType)); + List orderedRouters = []; + foreach (string sinkId in selectiveFanOut.SinkIds) + { + orderedRouters.Add(new DurableDirectEdgeRouter(sourceId, sinkId, condition: null, sourceOutputType)); + selectorSinks.Add(sinkId); + } + + sourceRouters.Add(new DurableFanOutEdgeRouter(sourceId, orderedRouters, selectiveFanOut.Assigner, sourceOutputType)); + } + + // Sibling edges from the same source (direct or plain fan-out, possibly conditional) that are + // not part of any selector still need to deliver. + foreach (string sinkId in successorIds) + { + if (selectorSinks.Contains(sinkId)) + { + continue; + } + + graphInfo.EdgeConditions.TryGetValue((sourceId, sinkId), out Func? siblingCondition); + sourceRouters.Add(new DurableDirectEdgeRouter(sourceId, sinkId, siblingCondition, sourceOutputType)); } - this._routersBySource[sourceId] = - [new DurableFanOutEdgeRouter(sourceId, orderedRouters, fanOutRouting.Assigner, sourceOutputType)]; + this._routersBySource[sourceId] = sourceRouters; continue; } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs index d067d7a3074..03d39b6067d 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs @@ -87,15 +87,16 @@ public void RouteMessage( return; } - // Assigner present (e.g., a switch): select only the matching targets, mirroring the - // in-process FanOutEdgeRunner. The assigner returns indices into the ordered target list. - List selectedIndices; + // Assigner present (e.g., a switch): select only the matching targets, mirroring the in-process + // FanOutEdgeRunner. The assigner returns indices into the ordered target list. Indices map directly + // to targets with no range filtering and no de-duplication, so an out-of-range index surfaces as an + // error (logged below) instead of being silently dropped, and duplicate indices deliver more than once. + List selectedRouters; try { object? messageObj = DurableSerialization.DeserializeMessage(envelope.Message, this._sourceOutputType); - selectedIndices = this._edgeAssigner(messageObj, this._targetRouters.Count) - .Where(i => i >= 0 && i < this._targetRouters.Count) - .Distinct() + selectedRouters = this._edgeAssigner(messageObj, this._targetRouters.Count) + .Select(index => this._targetRouters[index]) .ToList(); } catch (Exception ex) @@ -104,11 +105,11 @@ public void RouteMessage( return; } - logger.LogFanOutSelectorMatched(this._sourceId, selectedIndices.Count, this._targetRouters.Count); + logger.LogFanOutSelectorMatched(this._sourceId, selectedRouters.Count, this._targetRouters.Count); - foreach (int index in selectedIndices) + foreach (IDurableEdgeRouter targetRouter in selectedRouters) { - this._targetRouters[index].RouteMessage(envelope, messageQueues, logger); + targetRouter.RouteMessage(envelope, messageQueues, logger); } } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs index 6061bd7b6b3..f48b8248249 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs @@ -123,7 +123,7 @@ private static void PopulateGraphFromEdges(WorkflowGraphInfo graphInfo, Dictiona { AddSuccessorsFromEdge(graphInfo, sourceId, edge, successors); TryAddEdgeCondition(graphInfo, edge); - TryAddFanOutRouting(graphInfo, edge); + TryAddSelectiveFanOut(graphInfo, edge); } } } @@ -176,16 +176,23 @@ private static void TryAddEdgeCondition(WorkflowGraphInfo graphInfo, Edge edge) /// target selector) is represented as a single carrying an /// EdgeAssigner. The assigner maps an incoming message to the subset of targets that should /// receive it. Capturing it here lets the durable runtime route to only the selected target(s). + /// A source can declare several such edges, so each one is appended to the source's list. /// /// The graph info to update. /// The edge that may be a fan-out edge with an assigner. - private static void TryAddFanOutRouting(WorkflowGraphInfo graphInfo, Edge edge) + private static void TryAddSelectiveFanOut(WorkflowGraphInfo graphInfo, Edge edge) { FanOutEdgeData? fanOutEdge = edge.FanOutEdgeData; if (fanOutEdge?.EdgeAssigner is not null) { - graphInfo.FanOutRoutings[fanOutEdge.SourceId] = (fanOutEdge.SinkIds, fanOutEdge.EdgeAssigner); + if (!graphInfo.SelectiveFanOuts.TryGetValue(fanOutEdge.SourceId, out List? selectiveFanOuts)) + { + selectiveFanOuts = []; + graphInfo.SelectiveFanOuts[fanOutEdge.SourceId] = selectiveFanOuts; + } + + selectiveFanOuts.Add(new SelectiveFanOut(fanOutEdge.SinkIds, fanOutEdge.EdgeAssigner)); } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs index ed23b239a58..fc8b04d8f15 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs @@ -97,10 +97,20 @@ internal sealed class WorkflowGraphInfo public Dictionary ExecutorOutputTypes { get; } = []; /// - /// Maps a source executor ID to a fan-out target selector produced by a switch (AddSwitch) or a - /// target-selecting fan-out edge. The Assigner maps an incoming message to the indices of the - /// ordered SinkIds that should receive it, so only the selected targets run (mirroring the - /// in-process FanOutEdgeData.EdgeAssigner). + /// Maps a source executor ID to the selective fan-outs originating from it. A source can have more than one + /// (for example, multiple switches), so the value is a list rather than a single entry. /// - public Dictionary SinkIds, Func> Assigner)> FanOutRoutings { get; } = []; + public Dictionary> SelectiveFanOuts { get; } = []; } + +/// +/// Represents a fan-out that delivers to a selected subset of its targets, produced by a switch +/// (AddSwitch) or a target-selecting fan-out edge. Only the targets chosen by +/// receive the message, mirroring the in-process FanOutEdgeData.EdgeAssigner. +/// +/// The ordered target executor IDs of the fan-out edge. +/// +/// Maps an incoming message and the target count to the indices of that should +/// receive it. +/// +internal sealed record SelectiveFanOut(List SinkIds, Func> Assigner); diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs index 34dd21f2764..7b7aa4e9ad1 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs @@ -84,6 +84,97 @@ public void RouteMessage_FanOutWithoutSelector_RoutesToAllTargets() Assert.Equal(1, QueuedCount(queues, TargetBId)); } + [Fact] + public void RouteMessage_SwitchWithSiblingDirectEdge_DeliversToSelectedCaseAndSibling() + { + // Arrange: a switch plus an ordinary direct edge from the same source. The switch selects one case, + // while the sibling edge must always deliver (the switch must not suppress unrelated edges). + const string AuditId = "audit"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, Sink(EvenId)); + sb.AddCase(n => n % 2 != 0, Sink(OddId)); + }); + builder.AddEdge(router, Sink(AuditId)); + + // Act: 4 is even, so the even case matches; the audit sibling always receives the message. + Dictionary> queues = Route(builder.Build(), 4); + + // Assert + Assert.Equal(1, QueuedCount(queues, EvenId)); + Assert.Equal(0, QueuedCount(queues, OddId)); + Assert.Equal(1, QueuedCount(queues, AuditId)); + } + + [Fact] + public void RouteMessage_MultipleSwitchesFromSameSource_HonorsEachIndependently() + { + // Arrange: two switches from the same source. Both must be evaluated; neither should overwrite the other. + const string PositiveId = "positive"; + const string NonPositiveId = "nonPositive"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n > 0, Sink(PositiveId)); + sb.WithDefault(Sink(NonPositiveId)); + }); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, Sink(EvenId)); + sb.WithDefault(Sink(OddId)); + }); + + // Act: 4 is both positive and even. + Dictionary> queues = Route(builder.Build(), 4); + + // Assert: the matching branch of each switch receives the message. + Assert.Equal(1, QueuedCount(queues, PositiveId)); + Assert.Equal(0, QueuedCount(queues, NonPositiveId)); + Assert.Equal(1, QueuedCount(queues, EvenId)); + Assert.Equal(0, QueuedCount(queues, OddId)); + } + + [Fact] + public void RouteMessage_SelectorReturnsDuplicateIndex_DeliversMessageOncePerIndex() + { + // Arrange: a selector that returns the same index twice must deliver twice (no de-duplication), + // mirroring the in-process FanOutEdgeRunner. + const string TargetId = "dupTarget"; + const string OtherId = "other"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddFanOutEdge(router, [Sink(TargetId), Sink(OtherId)], (_, _) => [0, 0]); + + // Act + Dictionary> queues = Route(builder.Build(), 7); + + // Assert: index 0 was selected twice, so the target receives two messages. + Assert.Equal(2, QueuedCount(queues, TargetId)); + Assert.Equal(0, QueuedCount(queues, OtherId)); + } + + [Fact] + public void RouteMessage_SelectorReturnsOutOfRangeIndex_DoesNotThrow() + { + // Arrange: an out-of-range index must be surfaced (logged) rather than crash the routing layer. + const string TargetId = "target"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddFanOutEdge(router, [Sink(TargetId)], (_, _) => [5]); + + // Act + Assert: routing swallows the bad index, nothing is delivered, and no exception escapes. + Dictionary> queues = Route(builder.Build(), 7); + + Assert.Equal(0, QueuedCount(queues, TargetId)); + } + private static FunctionExecutor Router() => new(RouterId, (input, _, _) => input, outputTypes: [typeof(int)]); From cdcdc01ad648bd195b3d7a0edc4b205c3f6a733a Mon Sep 17 00:00:00 2001 From: Shyju Krishnankutty Date: Thu, 25 Jun 2026 11:59:48 -0700 Subject: [PATCH 3/6] PR feedback fixes. --- .../ConsoleApps/09_SwitchRouting/Executors.cs | 16 +++---- .../Microsoft.Agents.AI.DurableTask/Logs.cs | 19 ++++++++ .../Workflows/EdgeRouters/DurableEdgeMap.cs | 14 +++--- .../EdgeRouters/DurableFanOutEdgeRouter.cs | 30 +++++++------ .../Workflows/DurableEdgeMapSwitchTests.cs | 44 +++++++++++++++++++ 5 files changed, 96 insertions(+), 27 deletions(-) diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs index 4fe9062d925..08efc3fcfd3 100644 --- a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs @@ -8,29 +8,29 @@ internal sealed record Expense(string Id, decimal Amount); internal sealed class ExpenseParser() : Executor("ExpenseParser") { - public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) + public override ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) { // The input is the expense amount (e.g., "50" or "4500"). A real workflow would // look the expense up from a store; here we just attach a generated id. decimal amount = decimal.TryParse(message, out decimal parsed) ? parsed : 0m; - return new Expense($"EXP-{Guid.NewGuid().ToString()[..4]}", amount); + return ValueTask.FromResult(new Expense($"EXP-{Guid.NewGuid().ToString()[..4]}", amount)); } } internal sealed class AutoApprove() : Executor("AutoApprove") { - public override async ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) - => $"Expense {message.Id} for {message.Amount:C} was auto-approved."; + public override ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) + => ValueTask.FromResult($"Expense {message.Id} for {message.Amount:C} was auto-approved."); } internal sealed class ManagerApproval() : Executor("ManagerApproval") { - public override async ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) - => $"Expense {message.Id} for {message.Amount:C} was routed to a manager for approval."; + public override ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) + => ValueTask.FromResult($"Expense {message.Id} for {message.Amount:C} was routed to a manager for approval."); } internal sealed class DirectorApproval() : Executor("DirectorApproval") { - public override async ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) - => $"Expense {message.Id} for {message.Amount:C} was routed to a director for approval."; + public override ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) + => ValueTask.FromResult($"Expense {message.Id} for {message.Amount:C} was routed to a director for approval."); } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs index 76918d7a5e5..2dd1e2add92 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs @@ -246,4 +246,23 @@ public static partial void LogFanOutSelectorEvaluationFailed( this ILogger logger, Exception ex, string source); + + [LoggerMessage( + EventId = 116, + Level = LogLevel.Debug, + Message = "Fan-out from {Source}: routing to {Count} target(s)")] + public static partial void LogFanOutRouting( + this ILogger logger, + string source, + int count); + + [LoggerMessage( + EventId = 117, + Level = LogLevel.Warning, + Message = "Fan-out from {Source}: selector returned out-of-range index {Index} (target count {Count}), skipping")] + public static partial void LogFanOutSelectorIndexOutOfRange( + this ILogger logger, + string source, + int index, + int count); } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs index e569052d04d..ddf6e022d7c 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs @@ -112,16 +112,18 @@ internal DurableEdgeMap(WorkflowGraphInfo graphInfo) { List sourceRouters = []; - // Targets reached through a selector are chosen by its assigner; track them so we don't also - // wire an unconditional direct router to them below. - HashSet selectorSinks = []; + // Count how many times each target is claimed by a selector. The flattened successor list can + // contain the same target more than once (a selector case and a sibling edge can point at the + // same executor), so we consume one occurrence per selector claim rather than skipping the + // target entirely, ensuring sibling edges to a selector's target are still wired. + Dictionary selectorSinkCounts = []; foreach (SelectiveFanOut selectiveFanOut in selectiveFanOuts) { List orderedRouters = []; foreach (string sinkId in selectiveFanOut.SinkIds) { orderedRouters.Add(new DurableDirectEdgeRouter(sourceId, sinkId, condition: null, sourceOutputType)); - selectorSinks.Add(sinkId); + selectorSinkCounts[sinkId] = selectorSinkCounts.GetValueOrDefault(sinkId) + 1; } sourceRouters.Add(new DurableFanOutEdgeRouter(sourceId, orderedRouters, selectiveFanOut.Assigner, sourceOutputType)); @@ -131,8 +133,10 @@ internal DurableEdgeMap(WorkflowGraphInfo graphInfo) // not part of any selector still need to deliver. foreach (string sinkId in successorIds) { - if (selectorSinks.Contains(sinkId)) + // Consume one selector occurrence of this target; any remaining occurrence is a sibling edge. + if (selectorSinkCounts.TryGetValue(sinkId, out int remaining) && remaining > 0) { + selectorSinkCounts[sinkId] = remaining - 1; continue; } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs index 03d39b6067d..999c33ca721 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs @@ -74,10 +74,7 @@ public void RouteMessage( // No assigner: plain fan-out, forward the message to every target. if (this._edgeAssigner is null) { - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug("Fan-Out from {Source}: routing to {Count} targets", this._sourceId, this._targetRouters.Count); - } + logger.LogFanOutRouting(this._sourceId, this._targetRouters.Count); foreach (IDurableEdgeRouter targetRouter in this._targetRouters) { @@ -88,16 +85,13 @@ public void RouteMessage( } // Assigner present (e.g., a switch): select only the matching targets, mirroring the in-process - // FanOutEdgeRunner. The assigner returns indices into the ordered target list. Indices map directly - // to targets with no range filtering and no de-duplication, so an out-of-range index surfaces as an - // error (logged below) instead of being silently dropped, and duplicate indices deliver more than once. - List selectedRouters; + // FanOutEdgeRunner. The assigner returns indices into the ordered target list, with no de-duplication, + // so duplicate indices deliver the message more than once. + List selectedIndices; try { object? messageObj = DurableSerialization.DeserializeMessage(envelope.Message, this._sourceOutputType); - selectedRouters = this._edgeAssigner(messageObj, this._targetRouters.Count) - .Select(index => this._targetRouters[index]) - .ToList(); + selectedIndices = this._edgeAssigner(messageObj, this._targetRouters.Count).ToList(); } catch (Exception ex) { @@ -105,11 +99,19 @@ public void RouteMessage( return; } - logger.LogFanOutSelectorMatched(this._sourceId, selectedRouters.Count, this._targetRouters.Count); + logger.LogFanOutSelectorMatched(this._sourceId, selectedIndices.Count, this._targetRouters.Count); - foreach (IDurableEdgeRouter targetRouter in selectedRouters) + foreach (int index in selectedIndices) { - targetRouter.RouteMessage(envelope, messageQueues, logger); + // Range-check each index individually so an out-of-range value is surfaced (logged) and skipped on + // its own, without dropping deliveries to the other valid targets selected for this message. + if (index < 0 || index >= this._targetRouters.Count) + { + logger.LogFanOutSelectorIndexOutOfRange(this._sourceId, index, this._targetRouters.Count); + continue; + } + + this._targetRouters[index].RouteMessage(envelope, messageQueues, logger); } } } diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs index 7b7aa4e9ad1..c2d6d69ca8d 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs @@ -175,6 +175,50 @@ public void RouteMessage_SelectorReturnsOutOfRangeIndex_DoesNotThrow() Assert.Equal(0, QueuedCount(queues, TargetId)); } + [Fact] + public void RouteMessage_SelectorReturnsMixedValidAndInvalidIndex_DeliversValidTargetsOnly() + { + // Arrange: a selector returning [0, 5] for two targets. The out-of-range index 5 must be skipped + // without dropping the valid delivery to index 0. + const string ValidId = "valid"; + const string OtherId = "other"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddFanOutEdge(router, [Sink(ValidId), Sink(OtherId)], (_, _) => [0, 5]); + + // Act + Dictionary> queues = Route(builder.Build(), 7); + + // Assert: index 0 still receives the message; the invalid index is skipped. + Assert.Equal(1, QueuedCount(queues, ValidId)); + Assert.Equal(0, QueuedCount(queues, OtherId)); + } + + [Fact] + public void RouteMessage_SwitchCaseAndSiblingEdgeToSameTarget_BothDeliver() + { + // Arrange: a switch case and a sibling direct edge both target the same executor. The sibling edge must + // still be wired even though the switch already routes to that target. + FunctionExecutor evenSink = Sink(EvenId); + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, evenSink); + sb.AddCase(n => n % 2 != 0, Sink(OddId)); + }); + builder.AddEdge(router, evenSink); + + // Act: 4 is even, so the switch routes to evenSink; the sibling direct edge also delivers to it. + Dictionary> queues = Route(builder.Build(), 4); + + // Assert: evenSink receives the message twice (once via the switch case, once via the sibling edge). + Assert.Equal(2, QueuedCount(queues, EvenId)); + Assert.Equal(0, QueuedCount(queues, OddId)); + } + private static FunctionExecutor Router() => new(RouterId, (input, _, _) => input, outputTypes: [typeof(int)]); From 209cfe63e318b2ecd7865ad5d842ee8391c208a5 Mon Sep 17 00:00:00 2001 From: Shyju Krishnankutty Date: Thu, 25 Jun 2026 13:12:39 -0700 Subject: [PATCH 4/6] Fix file encoding to UTF-8 BOM for switch routing sample Add the UTF-8 BOM required by dotnet/.editorconfig (charset = utf-8-bom) to Executors.cs and Program.cs, which was lost during EOL normalization and caused the dotnet format CHARSET check to fail. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs | 2 +- .../DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs index 08efc3fcfd3..484a397a512 100644 --- a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using Microsoft.Agents.AI.Workflows; diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs index 7eebd4c1262..a115a1d93de 100644 --- a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. // This sample demonstrates multi-way routing in a workflow using AddSwitch. // An expense is routed to a different approval path based on its amount: From 97244026371cd1e42bca518047cbe57ae55e5e37 Mon Sep 17 00:00:00 2001 From: Shyju Krishnankutty Date: Thu, 25 Jun 2026 13:26:32 -0700 Subject: [PATCH 5/6] Fix file encoding to UTF-8 BOM for DurableEdgeMapSwitchTests Add the UTF-8 BOM required by dotnet/.editorconfig (charset = utf-8-bom) to the new test file, which caused the dotnet format CHARSET check to fail. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Workflows/DurableEdgeMapSwitchTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs index c2d6d69ca8d..5c027ce47c9 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System.Text.Json; using Microsoft.Agents.AI.DurableTask.Workflows; From efe227c290965f9cd9acb775ee7bd516ac253e9a Mon Sep 17 00:00:00 2001 From: Shyju Krishnankutty Date: Thu, 25 Jun 2026 14:24:06 -0700 Subject: [PATCH 6/6] Count distinct predecessors for fan-in detection A single source can reach the same target through more than one edge (for example a switch case plus a sibling direct edge to the same executor). Counting those repeated deliveries as multiple predecessors made the target look like a fan-in point, so the durable runner aggregated the deliveries into one invocation instead of running the target once per delivery as in-process does. Count distinct predecessor sources so only genuine fan-in (multiple distinct sources) is aggregated. Adds regression tests for the selector-throws path, a conditional sibling edge alongside a switch, and fan-in detection. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Workflows/EdgeRouters/DurableEdgeMap.cs | 8 +- .../Workflows/DurableEdgeMapSwitchTests.cs | 96 ++++++++++++++++++- 2 files changed, 100 insertions(+), 4 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs index ddf6e022d7c..2d31a11e3e9 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs @@ -168,10 +168,14 @@ internal DurableEdgeMap(WorkflowGraphInfo graphInfo) } } - // Store predecessor counts for fan-in detection + // Store predecessor counts for fan-in detection. Count distinct source executors: a single source can + // reach the same target through more than one edge (for example a switch case plus a sibling direct + // edge to the same executor), and those repeated deliveries must not be mistaken for a fan-in. True + // fan-in aggregates deliveries from multiple distinct sources, mirroring the in-process FanInEdgeData + // contract, so a target fed twice by the same source still runs once per delivery. foreach (KeyValuePair> entry in graphInfo.Predecessors) { - this._predecessorCounts[entry.Key] = entry.Value.Count; + this._predecessorCounts[entry.Key] = entry.Value.Distinct().Count(); } } diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs index 5c027ce47c9..8a4b4afaa9f 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs @@ -219,16 +219,108 @@ public void RouteMessage_SwitchCaseAndSiblingEdgeToSameTarget_BothDeliver() Assert.Equal(0, QueuedCount(queues, OddId)); } + [Theory] + [InlineData(4, 1)] // 4 > 3, sibling condition holds, audit receives the message + [InlineData(2, 0)] // 2 > 3 is false, sibling condition fails, audit is skipped + public void RouteMessage_SwitchWithConditionalSiblingEdge_HonorsSiblingCondition(int number, int expectedAuditCount) + { + // Arrange: a switch plus a conditional sibling direct edge from the same source. The sibling edge's + // condition must be honored even though it shares a source with the selector. + const string AuditId = "audit"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, Sink(EvenId)); + sb.AddCase(n => n % 2 != 0, Sink(OddId)); + }); + builder.AddEdge(router, Sink(AuditId), n => n > 3); + + // Act: both inputs are even, so the switch always routes to the even branch. + Dictionary> queues = Route(builder.Build(), number); + + // Assert: the switch delivers regardless; the audit sibling only delivers when its condition holds. + Assert.Equal(1, QueuedCount(queues, EvenId)); + Assert.Equal(expectedAuditCount, QueuedCount(queues, AuditId)); + } + + [Fact] + public void RouteMessage_SelectorThrows_DoesNotDeliverOrThrow() + { + // Arrange: a selector that throws when evaluated. Routing must swallow and log the failure rather + // than crash the orchestration, and nothing should be delivered. + const string TargetId = "target"; + const string OtherId = "other"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddFanOutEdge(router, [Sink(TargetId), Sink(OtherId)], (_, _) => throw new InvalidOperationException("boom")); + + // Act + Assert: the selector exception is swallowed, no message is delivered, and nothing escapes. + Dictionary> queues = Route(builder.Build(), 7); + + Assert.Equal(0, QueuedCount(queues, TargetId)); + Assert.Equal(0, QueuedCount(queues, OtherId)); + } + + [Fact] + public void IsFanInExecutor_SwitchCaseAndSiblingEdgeToSameTarget_NotTreatedAsFanIn() + { + // Arrange: a switch case and a sibling direct edge both target the same executor from the same source. + // Those repeated deliveries originate from a single source, so the target must not be treated as a + // fan-in (which would aggregate them into one invocation); it should run once per delivery, matching + // the in-process contract where aggregation is reserved for explicit fan-in edges. + FunctionExecutor evenSink = Sink(EvenId); + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, evenSink); + sb.AddCase(n => n % 2 != 0, Sink(OddId)); + }); + builder.AddEdge(router, evenSink); + + DurableEdgeMap edgeMap = BuildEdgeMap(builder.Build()); + + // Assert: a target fed twice by the same source is not a fan-in point. + Assert.False(edgeMap.IsFanInExecutor(EvenId)); + } + + [Fact] + public void IsFanInExecutor_MultipleDistinctSources_TreatedAsFanIn() + { + // Arrange: a diamond where two distinct sources converge on one target — a genuine fan-in. + FunctionExecutor start = new("start", (input, _, _) => input, outputTypes: [typeof(int)]); + FunctionExecutor left = new("left", (input, _, _) => input, outputTypes: [typeof(int)]); + FunctionExecutor right = new("right", (input, _, _) => input, outputTypes: [typeof(int)]); + FunctionExecutor target = Sink("target"); + + WorkflowBuilder builder = new(start); + builder.AddEdge(start, left); + builder.AddEdge(start, right); + builder.AddEdge(left, target); + builder.AddEdge(right, target); + + DurableEdgeMap edgeMap = BuildEdgeMap(builder.Build()); + + // Assert: two distinct predecessors still register as a fan-in point. + Assert.True(edgeMap.IsFanInExecutor("target")); + } + private static FunctionExecutor Router() => new(RouterId, (input, _, _) => input, outputTypes: [typeof(int)]); private static FunctionExecutor Sink(string id) => new(id, (_, _, _) => default); + private static DurableEdgeMap BuildEdgeMap(Workflow workflow) + => new(WorkflowAnalyzer.BuildGraphInfo(workflow)); + private static Dictionary> Route(Workflow workflow, int number) { - WorkflowGraphInfo graphInfo = WorkflowAnalyzer.BuildGraphInfo(workflow); - DurableEdgeMap edgeMap = new(graphInfo); + DurableEdgeMap edgeMap = BuildEdgeMap(workflow); Dictionary> queues = []; edgeMap.RouteMessage(