diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 3be35c1cda0..9e47424cd98 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -79,6 +79,7 @@ + diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/09_SwitchRouting.csproj b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/09_SwitchRouting.csproj new file mode 100644 index 00000000000..5db719ec651 --- /dev/null +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/09_SwitchRouting.csproj @@ -0,0 +1,28 @@ + + + net10.0 + Exe + enable + enable + SwitchRouting + SwitchRouting + + + + + + + + + + + + + + + diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs new file mode 100644 index 00000000000..484a397a512 --- /dev/null +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Executors.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows; + +namespace SwitchRouting; + +internal sealed record Expense(string Id, decimal Amount); + +internal sealed class ExpenseParser() : Executor("ExpenseParser") +{ + public override ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) + { + // The input is the expense amount (e.g., "50" or "4500"). A real workflow would + // look the expense up from a store; here we just attach a generated id. + decimal amount = decimal.TryParse(message, out decimal parsed) ? parsed : 0m; + return ValueTask.FromResult(new Expense($"EXP-{Guid.NewGuid().ToString()[..4]}", amount)); + } +} + +internal sealed class AutoApprove() : Executor("AutoApprove") +{ + public override ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) + => ValueTask.FromResult($"Expense {message.Id} for {message.Amount:C} was auto-approved."); +} + +internal sealed class ManagerApproval() : Executor("ManagerApproval") +{ + public override ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) + => ValueTask.FromResult($"Expense {message.Id} for {message.Amount:C} was routed to a manager for approval."); +} + +internal sealed class DirectorApproval() : Executor("DirectorApproval") +{ + public override ValueTask HandleAsync(Expense message, IWorkflowContext context, CancellationToken cancellationToken = default) + => ValueTask.FromResult($"Expense {message.Id} for {message.Amount:C} was routed to a director for approval."); +} diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs new file mode 100644 index 00000000000..a115a1d93de --- /dev/null +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/Program.cs @@ -0,0 +1,103 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates multi-way routing in a workflow using AddSwitch. +// An expense is routed to a different approval path based on its amount: +// - Amount < 100 -> AutoApprove +// - Amount < 1000 -> ManagerApproval +// - Otherwise -> DirectorApproval (default) +// +// Unlike AddEdge(..., condition:), which adds an independent boolean condition per edge, +// AddSwitch evaluates its cases in order and routes to the FIRST matching branch (or the +// default when none match), making it a natural fit for multi-way routing. + +using Microsoft.Agents.AI.DurableTask; +using Microsoft.Agents.AI.DurableTask.Workflows; +using Microsoft.Agents.AI.Workflows; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using SwitchRouting; + +string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING") + ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"; + +// Create executor instances +ExpenseParser expenseParser = new(); +AutoApprove autoApprove = new(); +ManagerApproval managerApproval = new(); +DirectorApproval directorApproval = new(); + +// Build a workflow that switches on the parsed expense amount. Cases are evaluated in order; +// the first matching case wins, and WithDefault handles everything else. +WorkflowBuilder builder = new(expenseParser); +builder.AddSwitch(expenseParser, switchBuilder => + switchBuilder + .AddCase(expense => expense!.Amount < 100m, autoApprove) + .AddCase(expense => expense!.Amount < 1000m, managerApproval) + .WithDefault(directorApproval)); + +Workflow approveExpense = builder.WithName("ApproveExpense").Build(); + +IHost host = Host.CreateDefaultBuilder(args) +.ConfigureLogging(logging => logging.SetMinimumLevel(LogLevel.Warning)) +.ConfigureServices(services => +{ + services.ConfigureDurableWorkflows( + workflowOptions => workflowOptions.AddWorkflow(approveExpense), + workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString), + clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString)); +}) +.Build(); + +await host.StartAsync(); + +IWorkflowClient workflowClient = host.Services.GetRequiredService(); + +Console.WriteLine("Enter an expense amount (or 'exit'):"); +Console.WriteLine("Tip: try 50, 450, and 5000 to see each branch.\n"); + +while (true) +{ + Console.Write("> "); + string? input = Console.ReadLine(); + if (string.IsNullOrWhiteSpace(input) || input.Equals("exit", StringComparison.OrdinalIgnoreCase)) + { + break; + } + + try + { + await StartNewWorkflowAsync(input, approveExpense, workflowClient); + } + catch (Exception ex) + { + Console.WriteLine($"Error: {ex.Message}"); + } + + Console.WriteLine(); +} + +await host.StopAsync(); + +// Start a new workflow and wait for completion +static async Task StartNewWorkflowAsync(string amount, Workflow workflow, IWorkflowClient client) +{ + Console.WriteLine($"Starting workflow for expense amount '{amount}'..."); + + // Cast to IAwaitableWorkflowRun to access WaitForCompletionAsync + IAwaitableWorkflowRun run = (IAwaitableWorkflowRun)await client.RunAsync(workflow, amount); + Console.WriteLine($"Run ID: {run.RunId}"); + + try + { + Console.WriteLine("Waiting for workflow to complete..."); + string? result = await run.WaitForCompletionAsync(); + Console.WriteLine($"Workflow completed. {result}"); + } + catch (InvalidOperationException ex) + { + Console.WriteLine($"Failed: {ex.Message}"); + } +} diff --git a/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/README.md b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/README.md new file mode 100644 index 00000000000..b3de9fbc686 --- /dev/null +++ b/dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting/README.md @@ -0,0 +1,83 @@ +# Switch Routing Workflow Sample + +This sample demonstrates how to build a workflow with **multi-way routing** using `AddSwitch`. A switch evaluates a set of ordered cases against the output of an executor and routes execution to the **first matching** branch — or to a **default** branch when no case matches. + +> **Related sample:** [`03_ConditionalEdges`](../03_ConditionalEdges/README.md) solves a similar branching problem with a different API — `AddEdge(..., condition:)` for per-edge boolean conditions. See [`AddSwitch` vs. conditional edges](#addswitch-vs-conditional-edges) below for when to use which. + +## Key Concepts Demonstrated + +- Building workflows with **multi-way routing** using `AddSwitch` and `AddCase` / `WithDefault` +- Ordered, first-match-wins case evaluation +- Falling back to a default branch when no case matches +- Using `ConfigureDurableWorkflows` to register workflows with dependency injection + +## Overview + +The sample implements an expense approval workflow that routes each expense to a different approval path based on its amount: + +``` +ExpenseParser --[amount < 100]---> AutoApprove + |--[amount < 1000]--> ManagerApproval + +--[default]--------> DirectorApproval +``` + +| Executor | Description | +|----------|-------------| +| ExpenseParser | Parses the entered amount into an `Expense` | +| AutoApprove | Auto-approves small expenses (`< 100`) | +| ManagerApproval | Routes mid-range expenses (`< 1000`) to a manager | +| DirectorApproval | Default branch for everything else (`>= 1000`) | + +## How `AddSwitch` Works + +`AddSwitch` configures a switch on the output of a source executor. Cases are evaluated **in order**, and the **first** matching case wins; `WithDefault` handles anything that matches no case: + +```csharp +builder.AddSwitch(expenseParser, switchBuilder => + switchBuilder + .AddCase(expense => expense!.Amount < 100m, autoApprove) + .AddCase(expense => expense!.Amount < 1000m, managerApproval) + .WithDefault(directorApproval)); +``` + +Each case predicate receives the output of the source executor and returns a boolean. + +### `AddSwitch` vs. conditional edges + +The [`03_ConditionalEdges`](../03_ConditionalEdges/README.md) sample uses `AddEdge(..., condition:)`, where each edge carries its own **independent** boolean condition (an edge is traversed whenever its condition is true). `AddSwitch` instead models **mutually exclusive, first-match-wins** routing with a single default — a better fit when exactly one of several branches should run. + +## Environment Setup + +See the [README.md](../../README.md) file in the parent directory for information on configuring the environment, including how to install and run the Durable Task Scheduler. + +## Running the Sample + +```bash +cd dotnet/samples/04-hosting/DurableWorkflows/ConsoleApps/09_SwitchRouting +dotnet run --framework net10.0 +``` + +### Sample Output + +```text +Enter an expense amount (or 'exit'): +Tip: try 50, 450, and 5000 to see each branch. + +> 50 +Starting workflow for expense amount '50'... +Run ID: abc123... +Waiting for workflow to complete... +Workflow completed. Expense EXP-1a2b for $50.00 was auto-approved. + +> 450 +Starting workflow for expense amount '450'... +Run ID: def456... +Waiting for workflow to complete... +Workflow completed. Expense EXP-3c4d for $450.00 was routed to a manager for approval. + +> 5000 +Starting workflow for expense amount '5000'... +Run ID: ghi789... +Waiting for workflow to complete... +Workflow completed. Expense EXP-5e6f for $5,000.00 was routed to a director for approval. +``` diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md b/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md index 2264d994280..2a79e882b1e 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md @@ -5,6 +5,7 @@ - Fix issue with resuming checkpoint after package version upgrade ([#6670](https://github.com/microsoft/agent-framework/pull/6670)) - Bind MCP threadId to the current agent and guard cross-agent session dispatch ([#6531](https://github.com/microsoft/agent-framework/pull/6531)) - Added support for durable workflows ([#4436](https://github.com/microsoft/agent-framework/pull/4436)) +- Added support for `AddSwitch` and target-selecting fan-out edges in the durable workflow runner ([#6749](https://github.com/microsoft/agent-framework/pull/6749)) ## v1.0.0-preview.260219.1 diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs index 57ef010a2f9..2dd1e2add92 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs @@ -227,4 +227,42 @@ public static partial void LogWaitingForExternalEvent( public static partial void LogReceivedExternalEvent( this ILogger logger, string requestPortId); + + [LoggerMessage( + EventId = 114, + Level = LogLevel.Debug, + Message = "Fan-out from {Source}: selector matched {SelectedCount} of {TotalCount} target(s)")] + public static partial void LogFanOutSelectorMatched( + this ILogger logger, + string source, + int selectedCount, + int totalCount); + + [LoggerMessage( + EventId = 115, + Level = LogLevel.Warning, + Message = "Failed to evaluate fan-out selector for source {Source}, skipping")] + public static partial void LogFanOutSelectorEvaluationFailed( + this ILogger logger, + Exception ex, + string source); + + [LoggerMessage( + EventId = 116, + Level = LogLevel.Debug, + Message = "Fan-out from {Source}: routing to {Count} target(s)")] + public static partial void LogFanOutRouting( + this ILogger logger, + string source, + int count); + + [LoggerMessage( + EventId = 117, + Level = LogLevel.Warning, + Message = "Fan-out from {Source}: selector returned out-of-range index {Index} (target count {Count}), skipping")] + public static partial void LogFanOutSelectorIndexOutOfRange( + this ILogger logger, + string source, + int index, + int count); } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs index 245ec36fb83..11fbc894562 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs @@ -1,5 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. +using System.Diagnostics.CodeAnalysis; using System.Text.Json; namespace Microsoft.Agents.AI.DurableTask.Workflows; @@ -19,4 +20,27 @@ internal static class DurableSerialization PropertyNamingPolicy = JsonNamingPolicy.CamelCase, PropertyNameCaseInsensitive = true }; + + /// + /// Deserializes a workflow message's JSON to the source executor's output type so an edge condition or + /// fan-out selector can evaluate it as a strongly-typed value. Falls back to a generic object when the + /// type is unknown, and returns null for empty input. + /// + /// The serialized message. + /// The source executor's output type, or null if unknown. + /// The deserialized object, or null if the JSON is empty. + /// Thrown when the JSON is invalid or cannot be deserialized to the target type. + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")] + internal static object? DeserializeMessage(string json, Type? targetType) + { + if (string.IsNullOrEmpty(json)) + { + return null; + } + + return targetType is null + ? JsonSerializer.Deserialize(json, Options) + : JsonSerializer.Deserialize(json, targetType, Options); + } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs index 3f780931839..54a11a15a3e 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs @@ -29,8 +29,6 @@ // Enqueue to // D's queue -using System.Diagnostics.CodeAnalysis; -using System.Text.Json; using Microsoft.Extensions.Logging; namespace Microsoft.Agents.AI.DurableTask.Workflows.EdgeRouters; @@ -88,7 +86,7 @@ public void RouteMessage( { try { - object? messageObj = DeserializeForCondition(envelope.Message, this._sourceOutputType); + object? messageObj = DurableSerialization.DeserializeMessage(envelope.Message, this._sourceOutputType); if (!this._condition(messageObj)) { logger.LogEdgeConditionFalse(this._sourceId, this._sinkId); @@ -106,40 +104,6 @@ public void RouteMessage( EnqueueMessage(messageQueues, this._sinkId, envelope); } - /// - /// Deserializes a JSON message to an object for condition evaluation. - /// - /// - /// Messages travel through the durable workflow as serialized JSON strings, but condition - /// delegates need typed objects to evaluate (e.g., order => order.Status == "Approved"). - /// This method converts the JSON back to an object the condition delegate can evaluate. - /// - /// The JSON string representation of the message. - /// - /// The expected type of the message. When provided, enables strongly-typed deserialization - /// so the condition function receives the correct type to evaluate against. - /// - /// - /// The deserialized object, or null if the JSON is empty. - /// - /// Thrown when the JSON is invalid or cannot be deserialized to the target type. - [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")] - [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")] - private static object? DeserializeForCondition(string json, Type? targetType) - { - if (string.IsNullOrEmpty(json)) - { - return null; - } - - // If we know the source executor's output type, deserialize to that specific type - // so the condition function can access strongly-typed properties. - // Otherwise, deserialize as a generic object for basic inspection. - return targetType is null - ? JsonSerializer.Deserialize(json, DurableSerialization.Options) - : JsonSerializer.Deserialize(json, targetType, DurableSerialization.Options); - } - private static void EnqueueMessage( Dictionary> queues, string executorId, diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs index 69b8b7cc1c8..2d31a11e3e9 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableEdgeMap.cs @@ -101,6 +101,54 @@ internal DurableEdgeMap(WorkflowGraphInfo graphInfo) graphInfo.ExecutorOutputTypes.TryGetValue(sourceId, out Type? sourceOutputType); + // A switch (AddSwitch) or target-selecting fan-out edge is represented as a single fan-out edge + // with an assigner. A source can also declare such selector edges alongside ordinary direct or + // plain fan-out edges. Because the graph flattens every edge's targets into a single successor + // list, we build one selector-aware fan-out router per selector routing (so only the chosen + // targets receive the message) and ordinary direct routers for the remaining sibling edges, so + // none of them are dropped. This mirrors the in-process runner, which creates one runner per edge. + if (graphInfo.SelectiveFanOuts.TryGetValue(sourceId, out List? selectiveFanOuts) + && selectiveFanOuts.Count > 0) + { + List sourceRouters = []; + + // Count how many times each target is claimed by a selector. The flattened successor list can + // contain the same target more than once (a selector case and a sibling edge can point at the + // same executor), so we consume one occurrence per selector claim rather than skipping the + // target entirely, ensuring sibling edges to a selector's target are still wired. + Dictionary selectorSinkCounts = []; + foreach (SelectiveFanOut selectiveFanOut in selectiveFanOuts) + { + List orderedRouters = []; + foreach (string sinkId in selectiveFanOut.SinkIds) + { + orderedRouters.Add(new DurableDirectEdgeRouter(sourceId, sinkId, condition: null, sourceOutputType)); + selectorSinkCounts[sinkId] = selectorSinkCounts.GetValueOrDefault(sinkId) + 1; + } + + sourceRouters.Add(new DurableFanOutEdgeRouter(sourceId, orderedRouters, selectiveFanOut.Assigner, sourceOutputType)); + } + + // Sibling edges from the same source (direct or plain fan-out, possibly conditional) that are + // not part of any selector still need to deliver. + foreach (string sinkId in successorIds) + { + // Consume one selector occurrence of this target; any remaining occurrence is a sibling edge. + if (selectorSinkCounts.TryGetValue(sinkId, out int remaining) && remaining > 0) + { + selectorSinkCounts[sinkId] = remaining - 1; + continue; + } + + graphInfo.EdgeConditions.TryGetValue((sourceId, sinkId), out Func? siblingCondition); + sourceRouters.Add(new DurableDirectEdgeRouter(sourceId, sinkId, siblingCondition, sourceOutputType)); + } + + this._routersBySource[sourceId] = sourceRouters; + + continue; + } + List routers = []; foreach (string sinkId in successorIds) { @@ -120,10 +168,14 @@ internal DurableEdgeMap(WorkflowGraphInfo graphInfo) } } - // Store predecessor counts for fan-in detection + // Store predecessor counts for fan-in detection. Count distinct source executors: a single source can + // reach the same target through more than one edge (for example a switch case plus a sibling direct + // edge to the same executor), and those repeated deliveries must not be mistaken for a fan-in. True + // fan-in aggregates deliveries from multiple distinct sources, mirroring the in-process FanInEdgeData + // contract, so a target fed twice by the same source still runs once per delivery. foreach (KeyValuePair> entry in graphInfo.Predecessors) { - this._predecessorCounts[entry.Key] = entry.Value.Count; + this._predecessorCounts[entry.Key] = entry.Value.Distinct().Count(); } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs index f13a0def923..999c33ca721 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableFanOutEdgeRouter.cs @@ -36,16 +36,33 @@ internal sealed class DurableFanOutEdgeRouter : IDurableEdgeRouter { private readonly string _sourceId; private readonly List _targetRouters; + private readonly Func>? _edgeAssigner; + private readonly Type? _sourceOutputType; /// /// Initializes a new instance of . /// /// The source executor ID. /// The routers for each target executor. - internal DurableFanOutEdgeRouter(string sourceId, List targetRouters) + /// + /// Optional target selector. When provided (e.g., for a switch built via AddSwitch), it maps the + /// incoming message to the indices of that should receive it, so only the + /// selected targets run. When null, the message is forwarded to all targets. + /// + /// + /// The output type of the source executor, used to deserialize the JSON message before evaluating the + /// . Ignored when is null. + /// + internal DurableFanOutEdgeRouter( + string sourceId, + List targetRouters, + Func>? edgeAssigner = null, + Type? sourceOutputType = null) { this._sourceId = sourceId; this._targetRouters = targetRouters; + this._edgeAssigner = edgeAssigner; + this._sourceOutputType = sourceOutputType; } /// @@ -54,14 +71,47 @@ public void RouteMessage( Dictionary> messageQueues, ILogger logger) { - if (logger.IsEnabled(LogLevel.Debug)) + // No assigner: plain fan-out, forward the message to every target. + if (this._edgeAssigner is null) { - logger.LogDebug("Fan-Out from {Source}: routing to {Count} targets", this._sourceId, this._targetRouters.Count); + logger.LogFanOutRouting(this._sourceId, this._targetRouters.Count); + + foreach (IDurableEdgeRouter targetRouter in this._targetRouters) + { + targetRouter.RouteMessage(envelope, messageQueues, logger); + } + + return; } - foreach (IDurableEdgeRouter targetRouter in this._targetRouters) + // Assigner present (e.g., a switch): select only the matching targets, mirroring the in-process + // FanOutEdgeRunner. The assigner returns indices into the ordered target list, with no de-duplication, + // so duplicate indices deliver the message more than once. + List selectedIndices; + try { - targetRouter.RouteMessage(envelope, messageQueues, logger); + object? messageObj = DurableSerialization.DeserializeMessage(envelope.Message, this._sourceOutputType); + selectedIndices = this._edgeAssigner(messageObj, this._targetRouters.Count).ToList(); + } + catch (Exception ex) + { + logger.LogFanOutSelectorEvaluationFailed(ex, this._sourceId); + return; + } + + logger.LogFanOutSelectorMatched(this._sourceId, selectedIndices.Count, this._targetRouters.Count); + + foreach (int index in selectedIndices) + { + // Range-check each index individually so an out-of-range value is surfaced (logged) and skipped on + // its own, without dropping deliveries to the other valid targets selected for this message. + if (index < 0 || index >= this._targetRouters.Count) + { + logger.LogFanOutSelectorIndexOutOfRange(this._sourceId, index, this._targetRouters.Count); + continue; + } + + this._targetRouters[index].RouteMessage(envelope, messageQueues, logger); } } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs index bb4d2956166..f48b8248249 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowAnalyzer.cs @@ -123,6 +123,7 @@ private static void PopulateGraphFromEdges(WorkflowGraphInfo graphInfo, Dictiona { AddSuccessorsFromEdge(graphInfo, sourceId, edge, successors); TryAddEdgeCondition(graphInfo, edge); + TryAddSelectiveFanOut(graphInfo, edge); } } } @@ -167,6 +168,34 @@ private static void TryAddEdgeCondition(WorkflowGraphInfo graphInfo, Edge edge) } } + /// + /// Captures the target-selecting assigner from a fan-out edge if present. + /// + /// + /// A switch (AddSwitch) or a target-selecting fan-out edge (AddFanOutEdge with a + /// target selector) is represented as a single carrying an + /// EdgeAssigner. The assigner maps an incoming message to the subset of targets that should + /// receive it. Capturing it here lets the durable runtime route to only the selected target(s). + /// A source can declare several such edges, so each one is appended to the source's list. + /// + /// The graph info to update. + /// The edge that may be a fan-out edge with an assigner. + private static void TryAddSelectiveFanOut(WorkflowGraphInfo graphInfo, Edge edge) + { + FanOutEdgeData? fanOutEdge = edge.FanOutEdgeData; + + if (fanOutEdge?.EdgeAssigner is not null) + { + if (!graphInfo.SelectiveFanOuts.TryGetValue(fanOutEdge.SourceId, out List? selectiveFanOuts)) + { + selectiveFanOuts = []; + graphInfo.SelectiveFanOuts[fanOutEdge.SourceId] = selectiveFanOuts; + } + + selectiveFanOuts.Add(new SelectiveFanOut(fanOutEdge.SinkIds, fanOutEdge.EdgeAssigner)); + } + } + /// /// Extracts the output type from an executor type by walking the inheritance chain. /// diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs index a504a07b13a..fc8b04d8f15 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/WorkflowGraphInfo.cs @@ -95,4 +95,22 @@ internal sealed class WorkflowGraphInfo /// Maps executor IDs to their output types (for proper deserialization during condition evaluation). /// public Dictionary ExecutorOutputTypes { get; } = []; + + /// + /// Maps a source executor ID to the selective fan-outs originating from it. A source can have more than one + /// (for example, multiple switches), so the value is a list rather than a single entry. + /// + public Dictionary> SelectiveFanOuts { get; } = []; } + +/// +/// Represents a fan-out that delivers to a selected subset of its targets, produced by a switch +/// (AddSwitch) or a target-selecting fan-out edge. Only the targets chosen by +/// receive the message, mirroring the in-process FanOutEdgeData.EdgeAssigner. +/// +/// The ordered target executor IDs of the fan-out edge. +/// +/// Maps an incoming message and the target count to the indices of that should +/// receive it. +/// +internal sealed record SelectiveFanOut(List SinkIds, Func> Assigner); diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs index 4624b6fb792..ee83d06879a 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs @@ -173,6 +173,68 @@ await this.RunSampleTestAsync(samplePath, async (process, logs) => }); } + [RetryFact(2, 5000)] + public async Task SwitchRoutingWorkflowSampleValidationAsync() + { + using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts(s_testTimeout); + string samplePath = Path.Combine(s_samplesPath, "09_SwitchRouting"); + + await this.RunSampleTestAsync(samplePath, async (process, logs) => + { + bool autoApproveSent = false; + bool managerApprovalSent = false; + bool directorApprovalSent = false; + bool autoApproveCompleted = false; + bool managerApprovalCompleted = false; + bool directorApprovalCompleted = false; + + string? line; + while ((line = this.ReadLogLine(logs, testTimeoutCts.Token)) != null) + { + // Amount < 100 routes to the first switch case (AutoApprove). + if (!autoApproveSent && line.Contains("Enter an expense amount", StringComparison.OrdinalIgnoreCase)) + { + await this.WriteInputAsync(process, "50", testTimeoutCts.Token); + autoApproveSent = true; + } + + if (autoApproveSent && !autoApproveCompleted && + line.Contains("was auto-approved", StringComparison.OrdinalIgnoreCase)) + { + autoApproveCompleted = true; + + // Amount < 1000 routes to the second switch case (ManagerApproval). + await this.WriteInputAsync(process, "450", testTimeoutCts.Token); + managerApprovalSent = true; + } + + if (managerApprovalSent && !managerApprovalCompleted && + line.Contains("routed to a manager", StringComparison.OrdinalIgnoreCase)) + { + managerApprovalCompleted = true; + + // Everything else routes to the default branch (DirectorApproval). + await this.WriteInputAsync(process, "5000", testTimeoutCts.Token); + directorApprovalSent = true; + } + + if (directorApprovalSent && line.Contains("routed to a director", StringComparison.OrdinalIgnoreCase)) + { + directorApprovalCompleted = true; + break; + } + + this.AssertNoError(line); + } + + Assert.True(autoApproveCompleted, "Expense < 100 did not route to AutoApprove."); + Assert.True(managerApprovalCompleted, "Expense < 1000 did not route to ManagerApproval."); + Assert.True(directorApprovalCompleted, "Expense >= 1000 did not route to the default DirectorApproval branch."); + + await this.WriteInputAsync(process, "exit", testTimeoutCts.Token); + }); + } + private void AssertNoError(string line) { if (line.Contains("Failed:", StringComparison.OrdinalIgnoreCase) || diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs new file mode 100644 index 00000000000..8a4b4afaa9f --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableEdgeMapSwitchTests.cs @@ -0,0 +1,338 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json; +using Microsoft.Agents.AI.DurableTask.Workflows; +using Microsoft.Agents.AI.DurableTask.Workflows.EdgeRouters; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Microsoft.Agents.AI.DurableTask.UnitTests.Workflows; + +/// +/// Tests for AddSwitch support in durable workflows. +/// A switch (AddSwitch) reduces to a single fan-out edge carrying an +/// EdgeAssigner that selects the matching case's target(s). The durable +/// routing layer must honor that selection so only the matching executor runs. +/// +public sealed class DurableEdgeMapSwitchTests +{ + private const string RouterId = "router"; + private const string EvenId = "evenSink"; + private const string OddId = "oddSink"; + + [Theory] + [InlineData(4, EvenId, OddId)] // even -> first case matches + [InlineData(7, OddId, EvenId)] // odd -> second case matches + public void RouteMessage_Switch_RoutesToMatchingCaseOnly(int number, string expected, string notExpected) + { + // Arrange: a switch with two mutually exclusive cases. + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, Sink(EvenId)); + sb.AddCase(n => n % 2 != 0, Sink(OddId)); + }); + + // Act + Dictionary> queues = Route(builder.Build(), number); + + // Assert: only the matching branch receives the message. + Assert.Equal(1, QueuedCount(queues, expected)); + Assert.Equal(0, QueuedCount(queues, notExpected)); + } + + [Fact] + public void RouteMessage_NoCaseMatches_RoutesToDefaultExecutorOnly() + { + // Arrange: a switch whose only case never matches, plus a default branch. + const string MatchId = "matchSink"; + const string DefaultId = "defaultSink"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n > 1000, Sink(MatchId)); + sb.WithDefault(Sink(DefaultId)); + }); + + // Act: 5 does not match the case, so it must fall through to the default. + Dictionary> queues = Route(builder.Build(), 5); + + // Assert: only the default branch receives the message. + Assert.Equal(1, QueuedCount(queues, DefaultId)); + Assert.Equal(0, QueuedCount(queues, MatchId)); + } + + [Fact] + public void RouteMessage_FanOutWithoutSelector_RoutesToAllTargets() + { + // Arrange: a plain fan-out edge (no target selector) must still reach every target. + const string TargetAId = "targetA"; + const string TargetBId = "targetB"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddFanOutEdge(router, [Sink(TargetAId), Sink(TargetBId)]); + + // Act + Dictionary> queues = Route(builder.Build(), 42); + + // Assert: both targets receive the message. + Assert.Equal(1, QueuedCount(queues, TargetAId)); + Assert.Equal(1, QueuedCount(queues, TargetBId)); + } + + [Fact] + public void RouteMessage_SwitchWithSiblingDirectEdge_DeliversToSelectedCaseAndSibling() + { + // Arrange: a switch plus an ordinary direct edge from the same source. The switch selects one case, + // while the sibling edge must always deliver (the switch must not suppress unrelated edges). + const string AuditId = "audit"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, Sink(EvenId)); + sb.AddCase(n => n % 2 != 0, Sink(OddId)); + }); + builder.AddEdge(router, Sink(AuditId)); + + // Act: 4 is even, so the even case matches; the audit sibling always receives the message. + Dictionary> queues = Route(builder.Build(), 4); + + // Assert + Assert.Equal(1, QueuedCount(queues, EvenId)); + Assert.Equal(0, QueuedCount(queues, OddId)); + Assert.Equal(1, QueuedCount(queues, AuditId)); + } + + [Fact] + public void RouteMessage_MultipleSwitchesFromSameSource_HonorsEachIndependently() + { + // Arrange: two switches from the same source. Both must be evaluated; neither should overwrite the other. + const string PositiveId = "positive"; + const string NonPositiveId = "nonPositive"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n > 0, Sink(PositiveId)); + sb.WithDefault(Sink(NonPositiveId)); + }); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, Sink(EvenId)); + sb.WithDefault(Sink(OddId)); + }); + + // Act: 4 is both positive and even. + Dictionary> queues = Route(builder.Build(), 4); + + // Assert: the matching branch of each switch receives the message. + Assert.Equal(1, QueuedCount(queues, PositiveId)); + Assert.Equal(0, QueuedCount(queues, NonPositiveId)); + Assert.Equal(1, QueuedCount(queues, EvenId)); + Assert.Equal(0, QueuedCount(queues, OddId)); + } + + [Fact] + public void RouteMessage_SelectorReturnsDuplicateIndex_DeliversMessageOncePerIndex() + { + // Arrange: a selector that returns the same index twice must deliver twice (no de-duplication), + // mirroring the in-process FanOutEdgeRunner. + const string TargetId = "dupTarget"; + const string OtherId = "other"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddFanOutEdge(router, [Sink(TargetId), Sink(OtherId)], (_, _) => [0, 0]); + + // Act + Dictionary> queues = Route(builder.Build(), 7); + + // Assert: index 0 was selected twice, so the target receives two messages. + Assert.Equal(2, QueuedCount(queues, TargetId)); + Assert.Equal(0, QueuedCount(queues, OtherId)); + } + + [Fact] + public void RouteMessage_SelectorReturnsOutOfRangeIndex_DoesNotThrow() + { + // Arrange: an out-of-range index must be surfaced (logged) rather than crash the routing layer. + const string TargetId = "target"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddFanOutEdge(router, [Sink(TargetId)], (_, _) => [5]); + + // Act + Assert: routing swallows the bad index, nothing is delivered, and no exception escapes. + Dictionary> queues = Route(builder.Build(), 7); + + Assert.Equal(0, QueuedCount(queues, TargetId)); + } + + [Fact] + public void RouteMessage_SelectorReturnsMixedValidAndInvalidIndex_DeliversValidTargetsOnly() + { + // Arrange: a selector returning [0, 5] for two targets. The out-of-range index 5 must be skipped + // without dropping the valid delivery to index 0. + const string ValidId = "valid"; + const string OtherId = "other"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddFanOutEdge(router, [Sink(ValidId), Sink(OtherId)], (_, _) => [0, 5]); + + // Act + Dictionary> queues = Route(builder.Build(), 7); + + // Assert: index 0 still receives the message; the invalid index is skipped. + Assert.Equal(1, QueuedCount(queues, ValidId)); + Assert.Equal(0, QueuedCount(queues, OtherId)); + } + + [Fact] + public void RouteMessage_SwitchCaseAndSiblingEdgeToSameTarget_BothDeliver() + { + // Arrange: a switch case and a sibling direct edge both target the same executor. The sibling edge must + // still be wired even though the switch already routes to that target. + FunctionExecutor evenSink = Sink(EvenId); + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, evenSink); + sb.AddCase(n => n % 2 != 0, Sink(OddId)); + }); + builder.AddEdge(router, evenSink); + + // Act: 4 is even, so the switch routes to evenSink; the sibling direct edge also delivers to it. + Dictionary> queues = Route(builder.Build(), 4); + + // Assert: evenSink receives the message twice (once via the switch case, once via the sibling edge). + Assert.Equal(2, QueuedCount(queues, EvenId)); + Assert.Equal(0, QueuedCount(queues, OddId)); + } + + [Theory] + [InlineData(4, 1)] // 4 > 3, sibling condition holds, audit receives the message + [InlineData(2, 0)] // 2 > 3 is false, sibling condition fails, audit is skipped + public void RouteMessage_SwitchWithConditionalSiblingEdge_HonorsSiblingCondition(int number, int expectedAuditCount) + { + // Arrange: a switch plus a conditional sibling direct edge from the same source. The sibling edge's + // condition must be honored even though it shares a source with the selector. + const string AuditId = "audit"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, Sink(EvenId)); + sb.AddCase(n => n % 2 != 0, Sink(OddId)); + }); + builder.AddEdge(router, Sink(AuditId), n => n > 3); + + // Act: both inputs are even, so the switch always routes to the even branch. + Dictionary> queues = Route(builder.Build(), number); + + // Assert: the switch delivers regardless; the audit sibling only delivers when its condition holds. + Assert.Equal(1, QueuedCount(queues, EvenId)); + Assert.Equal(expectedAuditCount, QueuedCount(queues, AuditId)); + } + + [Fact] + public void RouteMessage_SelectorThrows_DoesNotDeliverOrThrow() + { + // Arrange: a selector that throws when evaluated. Routing must swallow and log the failure rather + // than crash the orchestration, and nothing should be delivered. + const string TargetId = "target"; + const string OtherId = "other"; + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddFanOutEdge(router, [Sink(TargetId), Sink(OtherId)], (_, _) => throw new InvalidOperationException("boom")); + + // Act + Assert: the selector exception is swallowed, no message is delivered, and nothing escapes. + Dictionary> queues = Route(builder.Build(), 7); + + Assert.Equal(0, QueuedCount(queues, TargetId)); + Assert.Equal(0, QueuedCount(queues, OtherId)); + } + + [Fact] + public void IsFanInExecutor_SwitchCaseAndSiblingEdgeToSameTarget_NotTreatedAsFanIn() + { + // Arrange: a switch case and a sibling direct edge both target the same executor from the same source. + // Those repeated deliveries originate from a single source, so the target must not be treated as a + // fan-in (which would aggregate them into one invocation); it should run once per delivery, matching + // the in-process contract where aggregation is reserved for explicit fan-in edges. + FunctionExecutor evenSink = Sink(EvenId); + + FunctionExecutor router = Router(); + WorkflowBuilder builder = new(router); + builder.AddSwitch(router, sb => + { + sb.AddCase(n => n % 2 == 0, evenSink); + sb.AddCase(n => n % 2 != 0, Sink(OddId)); + }); + builder.AddEdge(router, evenSink); + + DurableEdgeMap edgeMap = BuildEdgeMap(builder.Build()); + + // Assert: a target fed twice by the same source is not a fan-in point. + Assert.False(edgeMap.IsFanInExecutor(EvenId)); + } + + [Fact] + public void IsFanInExecutor_MultipleDistinctSources_TreatedAsFanIn() + { + // Arrange: a diamond where two distinct sources converge on one target — a genuine fan-in. + FunctionExecutor start = new("start", (input, _, _) => input, outputTypes: [typeof(int)]); + FunctionExecutor left = new("left", (input, _, _) => input, outputTypes: [typeof(int)]); + FunctionExecutor right = new("right", (input, _, _) => input, outputTypes: [typeof(int)]); + FunctionExecutor target = Sink("target"); + + WorkflowBuilder builder = new(start); + builder.AddEdge(start, left); + builder.AddEdge(start, right); + builder.AddEdge(left, target); + builder.AddEdge(right, target); + + DurableEdgeMap edgeMap = BuildEdgeMap(builder.Build()); + + // Assert: two distinct predecessors still register as a fan-in point. + Assert.True(edgeMap.IsFanInExecutor("target")); + } + + private static FunctionExecutor Router() + => new(RouterId, (input, _, _) => input, outputTypes: [typeof(int)]); + + private static FunctionExecutor Sink(string id) + => new(id, (_, _, _) => default); + + private static DurableEdgeMap BuildEdgeMap(Workflow workflow) + => new(WorkflowAnalyzer.BuildGraphInfo(workflow)); + + private static Dictionary> Route(Workflow workflow, int number) + { + DurableEdgeMap edgeMap = BuildEdgeMap(workflow); + Dictionary> queues = []; + + edgeMap.RouteMessage( + RouterId, + JsonSerializer.Serialize(number, DurableSerialization.Options), + typeof(int).AssemblyQualifiedName, + queues, + NullLogger.Instance); + + return queues; + } + + private static int QueuedCount(Dictionary> queues, string executorId) + => queues.TryGetValue(executorId, out Queue? queue) ? queue.Count : 0; +}