From 8d45ad93e1f53113634a0fc3bb28dd6597cca096 Mon Sep 17 00:00:00 2001 From: halspang Date: Tue, 15 Apr 2025 16:16:11 -0700 Subject: [PATCH 1/3] Add versioning to DTFx orchestration dispatch This commit adds versioning similar to the feature created in durabletask-dotnet. This portion of versioning allows for an orchestration to be failed or abandonned based on the configuration provided. The actual configuration will be provided in a follow-up PR in the web extensions as it should be handled through the host.json file. Signed-off-by: halspang --- .../VersionSettingsTests.cs | 42 +++++++ .../Settings/VersioningSettings.cs | 106 ++++++++++++++++++ src/DurableTask.Core/TaskHubWorker.cs | 39 ++++--- .../TaskOrchestrationDispatcher.cs | 92 +++++++++++---- .../AutoStartOrchestrationCreator.cs | 9 +- .../AzureStorageScenarioTests.cs | 90 ++++++++++++++- .../TestHelpers.cs | 8 +- .../TestOrchestrationHost.cs | 43 +++++-- 8 files changed, 371 insertions(+), 58 deletions(-) create mode 100644 Test/DurableTask.Core.Tests/VersionSettingsTests.cs create mode 100644 src/DurableTask.Core/Settings/VersioningSettings.cs diff --git a/Test/DurableTask.Core.Tests/VersionSettingsTests.cs b/Test/DurableTask.Core.Tests/VersionSettingsTests.cs new file mode 100644 index 000000000..e2b9fad93 --- /dev/null +++ b/Test/DurableTask.Core.Tests/VersionSettingsTests.cs @@ -0,0 +1,42 @@ +using DurableTask.Core.Settings; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DurableTask.Core.Tests +{ + [TestClass] + public class VersionSettingsTests + { + [TestMethod] + [DataRow("1.0.0", "1.0.0", 0)] + [DataRow("1.1.0", "1.0.0", 1)] + [DataRow("1.0.0", "1.1.0", -1)] + [DataRow("1", "1", 0)] + [DataRow("2", "1", 1)] + [DataRow("1", "2", -1)] + [DataRow("", "1", -1)] + [DataRow("1", "", 1)] + [DataRow("", "", 0)] + public void TestVersionComparison(string orchVersion, string settingVersion, int expectedComparison) + { + int result = VersioningSettings.CompareVersions(orchVersion, settingVersion); + + if (expectedComparison == 0) + { + Assert.AreEqual(0, result, $"Expected {orchVersion} to be equal to {settingVersion}"); + } + else if (expectedComparison < 0) + { + Assert.IsTrue(result < 0, $"Expected {orchVersion} to be less than {settingVersion}"); + } + else + { + Assert.IsTrue(result > 0, $"Expected {orchVersion} to be greater than {settingVersion}"); + } + } + } +} \ No newline at end of file diff --git a/src/DurableTask.Core/Settings/VersioningSettings.cs b/src/DurableTask.Core/Settings/VersioningSettings.cs new file mode 100644 index 000000000..bf149ebac --- /dev/null +++ b/src/DurableTask.Core/Settings/VersioningSettings.cs @@ -0,0 +1,106 @@ +using System; + +namespace DurableTask.Core.Settings +{ + /// + /// Collection of settings that define the overall versioning behavior. + /// + public class VersioningSettings + { + /// + /// Defines the version matching strategy for the Durable Task worker. + /// + public enum VersionMatchStrategy + { + /// + /// Ignore Orchestration version, all work received is processed. + /// + None = 0, + + /// + /// Worker will only process Tasks from Orchestrations with the same version as the worker. + /// + Strict = 1, + + /// + /// Worker will process Tasks from Orchestrations whose version is less than or equal to the worker. + /// + CurrentOrOlder = 2, + } + + /// + /// Defines the versioning failure strategy for the Durable Task worker. + /// + public enum VersionFailureStrategy + { + /// + /// Do not change the orchestration state if the version does not adhere to the matching strategy. + /// + Reject = 0, + + /// + /// Fail the orchestration if the version does not adhere to the matching strategy. + /// + Fail = 1, + } + + /// + /// Gets or sets the version associated with the settings. + /// + public string Version { get; set; } = string.Empty; + + /// + /// Gets or sets the that is used for matching versions. + /// + public VersionMatchStrategy MatchStrategy { get; set; } = VersionMatchStrategy.None; + + /// + /// Gets or sets the that is used to determine what happens on a versioning failure. + /// + public VersionFailureStrategy FailureStrategy { get; set; } = VersionFailureStrategy.Reject; + + /// + /// Compare two versions to each other. + /// + /// + /// This method's comparison is handled in the following order: + /// 1. The versions are checked if they are empty (non-versioned). Both being empty signifies equality. + /// 2. If sourceVersion is empty but otherVersion is defined, this is treated as the source being less than the other. + /// 3. If otherVersion is empty but sourceVersion is defined, this is treated as the source being greater than the other. + /// 4. Both versions are attempted to be parsed into System.Version and compared as such. + /// 5. If all else fails, a direct string comparison is done between the versions. + /// + /// The source version that will be compared against the other version. + /// The other version to compare against. + /// An int representing how sourceVersion compares to otherVersion. + public static int CompareVersions(string sourceVersion, string otherVersion) + { + // Both versions are empty, treat as equal. + if (string.IsNullOrWhiteSpace(sourceVersion) && string.IsNullOrWhiteSpace(otherVersion)) + { + return 0; + } + + // An empty version in the context is always less than a defined version in the parameter. + if (string.IsNullOrWhiteSpace(sourceVersion)) + { + return -1; + } + + // An empty version in the parameter is always less than a defined version in the context. + if (string.IsNullOrWhiteSpace(otherVersion)) + { + return 1; + } + + // If both versions use the .NET Version class, return that comparison. + if (System.Version.TryParse(sourceVersion, out Version parsedSourceVersion) && System.Version.TryParse(otherVersion, out Version parsedOtherVersion)) + { + return parsedSourceVersion.CompareTo(parsedOtherVersion); + } + + // If we have gotten to here, we don't know the syntax of the versions we are comparing, use a string comparison as a final check. + return string.Compare(sourceVersion, otherVersion, StringComparison.OrdinalIgnoreCase); + } + } +} diff --git a/src/DurableTask.Core/TaskHubWorker.cs b/src/DurableTask.Core/TaskHubWorker.cs index 24271dc78..e9f6808ea 100644 --- a/src/DurableTask.Core/TaskHubWorker.cs +++ b/src/DurableTask.Core/TaskHubWorker.cs @@ -13,6 +13,12 @@ namespace DurableTask.Core { + using DurableTask.Core.Entities; + using DurableTask.Core.Exceptions; + using DurableTask.Core.Logging; + using DurableTask.Core.Middleware; + using DurableTask.Core.Settings; + using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Diagnostics; @@ -21,11 +27,6 @@ namespace DurableTask.Core using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; - using DurableTask.Core.Entities; - using DurableTask.Core.Exceptions; - using DurableTask.Core.Logging; - using DurableTask.Core.Middleware; - using Microsoft.Extensions.Logging; /// /// Allows users to load the TaskOrchestration and TaskActivity classes and start @@ -36,6 +37,7 @@ public sealed class TaskHubWorker : IDisposable readonly INameVersionObjectManager activityManager; readonly INameVersionObjectManager orchestrationManager; readonly INameVersionObjectManager entityManager; + readonly VersioningSettings versioningSettings; readonly DispatchMiddlewarePipeline orchestrationDispatchPipeline = new DispatchMiddlewarePipeline(); readonly DispatchMiddlewarePipeline entityDispatchPipeline = new DispatchMiddlewarePipeline(); @@ -76,13 +78,15 @@ public TaskHubWorker(IOrchestrationService orchestrationService) /// /// Reference the orchestration service implementation /// The to use for logging - public TaskHubWorker(IOrchestrationService orchestrationService, ILoggerFactory loggerFactory = null) + /// The that define how orchestration versions are handled + public TaskHubWorker(IOrchestrationService orchestrationService, ILoggerFactory loggerFactory = null, VersioningSettings versioningSettings = null) : this( orchestrationService, new NameVersionObjectManager(), new NameVersionObjectManager(), new NameVersionObjectManager(), - loggerFactory) + loggerFactory, + versioningSettings) { } @@ -101,7 +105,8 @@ public TaskHubWorker( orchestrationObjectManager, activityObjectManager, new NameVersionObjectManager(), - loggerFactory: null) + loggerFactory: null, + versioningSettings: null) { } @@ -112,17 +117,20 @@ public TaskHubWorker( /// The for orchestrations /// The for activities /// The to use for logging + /// The that define how orchestration versions are handled public TaskHubWorker( IOrchestrationService orchestrationService, INameVersionObjectManager orchestrationObjectManager, INameVersionObjectManager activityObjectManager, - ILoggerFactory loggerFactory = null) + ILoggerFactory loggerFactory = null, + VersioningSettings versioningSettings = null) : this( orchestrationService, orchestrationObjectManager, activityObjectManager, new NameVersionObjectManager(), - loggerFactory) + loggerFactory, + versioningSettings) { } @@ -134,12 +142,14 @@ public TaskHubWorker( /// NameVersionObjectManager for Activities /// The NameVersionObjectManager for entities. The version is the entity key. /// The to use for logging + /// The that define how orchestration versions are handled public TaskHubWorker( IOrchestrationService orchestrationService, INameVersionObjectManager orchestrationObjectManager, INameVersionObjectManager activityObjectManager, INameVersionObjectManager entityObjectManager, - ILoggerFactory loggerFactory = null) + ILoggerFactory loggerFactory = null, + VersioningSettings versioningSettings = null) { this.orchestrationManager = orchestrationObjectManager ?? throw new ArgumentException("orchestrationObjectManager"); this.activityManager = activityObjectManager ?? throw new ArgumentException("activityObjectManager"); @@ -147,6 +157,7 @@ public TaskHubWorker( this.orchestrationService = orchestrationService ?? throw new ArgumentException("orchestrationService"); this.logHelper = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core")); this.dispatchEntitiesSeparately = (orchestrationService as IEntityOrchestrationService)?.EntityBackendProperties?.UseSeparateQueueForEntityWorkItems ?? false; + this.versioningSettings = versioningSettings; } /// @@ -219,13 +230,13 @@ public async Task StartAsync() this.logHelper.TaskHubWorkerStarting(); var sw = Stopwatch.StartNew(); - this.orchestrationDispatcher = new TaskOrchestrationDispatcher( this.orchestrationService, this.orchestrationManager, this.orchestrationDispatchPipeline, this.logHelper, - this.ErrorPropagationMode); + this.ErrorPropagationMode, + this.versioningSettings); this.activityDispatcher = new TaskActivityDispatcher( this.orchestrationService, this.activityManager, @@ -357,7 +368,7 @@ public TaskHubWorker AddTaskEntities(params Type[] taskEntityTypes) type.Name, string.Empty, type); - + this.entityManager.Add(creator); } diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 61864a1a5..d1cb03f87 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -13,12 +13,6 @@ #nullable enable namespace DurableTask.Core { - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; using DurableTask.Core.Command; using DurableTask.Core.Common; using DurableTask.Core.Entities; @@ -27,7 +21,14 @@ namespace DurableTask.Core using DurableTask.Core.Logging; using DurableTask.Core.Middleware; using DurableTask.Core.Serializing; + using DurableTask.Core.Settings; using DurableTask.Core.Tracing; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; using ActivityStatusCode = Tracing.ActivityStatusCode; /// @@ -47,13 +48,15 @@ public class TaskOrchestrationDispatcher readonly IEntityOrchestrationService? entityOrchestrationService; readonly EntityBackendProperties? entityBackendProperties; readonly TaskOrchestrationEntityParameters? entityParameters; + readonly VersioningSettings? versioningSettings; internal TaskOrchestrationDispatcher( IOrchestrationService orchestrationService, INameVersionObjectManager objectManager, DispatchMiddlewarePipeline dispatchPipeline, LogHelper logHelper, - ErrorPropagationMode errorPropagationMode) + ErrorPropagationMode errorPropagationMode, + VersioningSettings versioningSettings) { this.objectManager = objectManager ?? throw new ArgumentNullException(nameof(objectManager)); this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService)); @@ -63,6 +66,7 @@ internal TaskOrchestrationDispatcher( this.entityOrchestrationService = orchestrationService as IEntityOrchestrationService; this.entityBackendProperties = this.entityOrchestrationService?.EntityBackendProperties; this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties); + this.versioningSettings = versioningSettings; this.dispatcher = new WorkItemDispatcher( "TaskOrchestrationDispatcher", @@ -155,7 +159,7 @@ void EnsureExecutionStartedIsFirst(IList batch) { // Keep track of orchestrator generation changes, maybe update target position string executionId = message.OrchestrationInstance.ExecutionId; - if(previousExecutionId != executionId) + if (previousExecutionId != executionId) { // We want to re-position the ExecutionStarted event after the "right-most" // event with a non-null executionID that came before it. @@ -217,7 +221,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) CorrelationTraceClient.Propagate( () => - { + { // Check if it is extended session. // TODO: Remove this code - it looks incorrect and dangerous isExtendedSession = this.concurrentSessionLock.Acquire(); @@ -305,7 +309,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work var isCompleted = false; var continuedAsNew = false; var isInterrupted = false; - + // correlation CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext); @@ -363,6 +367,44 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work continuedAsNew = false; continuedAsNewMessage = null; + IReadOnlyList decisions = new List(); + bool versioningFailed = false; + + if (this.versioningSettings != null) + { + switch (this.versioningSettings.MatchStrategy) + { + case VersioningSettings.VersionMatchStrategy.None: + // No versioning, do nothing + break; + case VersioningSettings.VersionMatchStrategy.Strict: + versioningFailed = this.versioningSettings.Version != runtimeState.Version; + break; + case VersioningSettings.VersionMatchStrategy.CurrentOrOlder: + // Positive result indicates the orchestration version is higher than the versioning settings. + versioningFailed = VersioningSettings.CompareVersions(runtimeState.Version, this.versioningSettings.Version) > 0; + break; + } + + if (versioningFailed) + { + if (this.versioningSettings.FailureStrategy == VersioningSettings.VersionFailureStrategy.Fail) + { + var failureAction = new OrchestrationCompleteOrchestratorAction + { + Id = runtimeState.PastEvents.Count, + FailureDetails = new FailureDetails("VersionMismatch", "Orchestration version did not comply with Worker Versioning", null, null, true), + OrchestrationStatus = OrchestrationStatus.Failed, + }; + decisions = new List { failureAction }; + } + else // Abandon work item in all other cases (will be retried later). + { + await this.orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem); + break; + } + } + } this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name); TraceHelper.TraceInstance( @@ -372,17 +414,19 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work "Executing user orchestration: {0}", JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true)); - if (workItem.Cursor == null) + if (!versioningFailed) { - workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem); - } - else - { - await this.ResumeOrchestrationAsync(workItem); + if (workItem.Cursor == null) + { + workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem); + } + else + { + await this.ResumeOrchestrationAsync(workItem); + } + decisions = workItem.Cursor.LatestDecisions.ToList(); } - IReadOnlyList decisions = workItem.Cursor.LatestDecisions.ToList(); - this.logHelper.OrchestrationExecuted( runtimeState.OrchestrationInstance!, runtimeState.Name, @@ -618,7 +662,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( continuedAsNew ? null : timerMessages, continuedAsNewMessage, instanceState); - + if (workItem.RestoreOriginalRuntimeStateDuringCompletion) { workItem.OrchestrationRuntimeState = runtimeState; @@ -1143,11 +1187,11 @@ TaskMessage ProcessSendEventDecision( { var historyEvent = new EventSentEvent(sendEventAction.Id) { - InstanceId = sendEventAction.Instance?.InstanceId, - Name = sendEventAction.EventName, - Input = sendEventAction.EventData + InstanceId = sendEventAction.Instance?.InstanceId, + Name = sendEventAction.EventName, + Input = sendEventAction.EventData }; - + runtimeState.AddEvent(historyEvent); EventRaisedEvent eventRaisedEvent = new EventRaisedEvent(-1, sendEventAction.EventData) @@ -1169,7 +1213,7 @@ TaskMessage ProcessSendEventDecision( Event = eventRaisedEvent }; } - + internal class NonBlockingCountdownLock { int available; diff --git a/test/DurableTask.AzureStorage.Tests/AutoStartOrchestrationCreator.cs b/test/DurableTask.AzureStorage.Tests/AutoStartOrchestrationCreator.cs index 9b62df9aa..617e01885 100644 --- a/test/DurableTask.AzureStorage.Tests/AutoStartOrchestrationCreator.cs +++ b/test/DurableTask.AzureStorage.Tests/AutoStartOrchestrationCreator.cs @@ -24,14 +24,16 @@ public class AutoStartOrchestrationCreator : ObjectCreator { readonly TaskOrchestration instance; readonly Type prototype; + readonly string version; /// /// Creates a new AutoStartOrchestrationCreator of supplied type /// /// Type to use for the creator - public AutoStartOrchestrationCreator(Type type) + public AutoStartOrchestrationCreator(Type type, string version = null) { this.prototype = type; + this.version = version; Initialize(type); } @@ -39,9 +41,10 @@ public AutoStartOrchestrationCreator(Type type) /// Creates a new AutoStartOrchestrationCreator using type of supplied object instance /// /// Object instances to infer the type from - public AutoStartOrchestrationCreator(TaskOrchestration instance) + public AutoStartOrchestrationCreator(TaskOrchestration instance, string version = null) { this.instance = instance; + this.version = version; Initialize(instance); } @@ -59,7 +62,7 @@ public override TaskOrchestration Create() void Initialize(object obj) { Name = $"@{NameVersionHelper.GetDefaultName(obj)}"; - Version = NameVersionHelper.GetDefaultVersion(obj); + Version = !string.IsNullOrEmpty(version) ? this.version : NameVersionHelper.GetDefaultVersion(obj); } } } \ No newline at end of file diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index 6b5593b25..c115c127d 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -30,6 +30,7 @@ namespace DurableTask.AzureStorage.Tests using DurableTask.Core; using DurableTask.Core.Exceptions; using DurableTask.Core.History; + using DurableTask.Core.Settings; using Microsoft.Practices.EnterpriseLibrary.SemanticLogging.Utility; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -522,7 +523,7 @@ await client.PurgeInstanceHistoryByTimePeriod( List thirdHistoryEventsAfterPurging = await client.GetOrchestrationHistoryAsync(thirdInstanceId); Assert.AreEqual(0, thirdHistoryEventsAfterPurging.Count); - ListfourthHistoryEventsAfterPurging = await client.GetOrchestrationHistoryAsync(fourthInstanceId); + List fourthHistoryEventsAfterPurging = await client.GetOrchestrationHistoryAsync(fourthInstanceId); Assert.AreEqual(0, fourthHistoryEventsAfterPurging.Count); firstOrchestrationStateList = await client.GetStateAsync(firstInstanceId); @@ -928,7 +929,7 @@ public async Task SuspendResumeOrchestration(bool enableExtendedSessions) // Test case 3: external event now goes through await client.ResumeAsync("wakeUp"); - status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); Assert.AreEqual(changedStatus, JToken.Parse(status?.Status)); @@ -1811,7 +1812,7 @@ public async Task LargeTextMessagePayloads_URIFormatCheck(bool enableExtendedSes } } - private StringBuilder GenerateMediumRandomStringPayload(int numChars = 128*1024, short utf8ByteSize = 1, short utf16ByteSize = 2) + private StringBuilder GenerateMediumRandomStringPayload(int numChars = 128 * 1024, short utf8ByteSize = 1, short utf16ByteSize = 2) { string Chars; if (utf16ByteSize != 2 && utf16ByteSize != 4) @@ -2196,7 +2197,7 @@ public async Task AbortOrchestrationAndActivity(bool enableExtendedSessions) } } - /// + /// /// Validates scheduled starts, ensuring they are executed according to defined start date time /// /// @@ -2411,6 +2412,87 @@ public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSession } } + [TestMethod] + [DataRow(VersioningSettings.VersionMatchStrategy.Strict)] + [DataRow(VersioningSettings.VersionMatchStrategy.CurrentOrOlder)] + [DataRow(VersioningSettings.VersionMatchStrategy.None)] + public async Task OrchestrationFailsWithVersionMismatch(VersioningSettings.VersionMatchStrategy matchStrategy) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(false, versioningSettings: new VersioningSettings + { + Version = "1", + MatchStrategy = matchStrategy, + FailureStrategy = VersioningSettings.VersionFailureStrategy.Fail + })) + { + await host.StartAsync(); + + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "World", tags: new Dictionary(), version: "2"); + var status = await client.WaitForCompletionAsync(StandardTimeout); + + if (matchStrategy == VersioningSettings.VersionMatchStrategy.None) + { + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + } + else + { + Assert.AreEqual(OrchestrationStatus.Failed, status?.OrchestrationStatus); + } + + await host.StopAsync(); + } + } + + [TestMethod] + [DataRow(VersioningSettings.VersionMatchStrategy.Strict, "1.0.0")] + [DataRow(VersioningSettings.VersionMatchStrategy.CurrentOrOlder, "1.0.0")] + [DataRow(VersioningSettings.VersionMatchStrategy.CurrentOrOlder, "0.9.0")] + [DataRow(VersioningSettings.VersionMatchStrategy.None, "1.0.0")] + public async Task OrchestrationSucceedsWithVersion(VersioningSettings.VersionMatchStrategy matchStrategy, string orchestrationVersion) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(false, versioningSettings: new VersioningSettings + { + Version = "1.0.0", + MatchStrategy = matchStrategy, + FailureStrategy = VersioningSettings.VersionFailureStrategy.Fail + })) + { + await host.StartAsync(); + + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "World", tags: new Dictionary(), version: orchestrationVersion); + var status = await client.WaitForCompletionAsync(StandardTimeout); + + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task OrchestrationRejectsWithVersionMismatch() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(false, versioningSettings: new VersioningSettings + { + Version = "1", + MatchStrategy = VersioningSettings.VersionMatchStrategy.Strict, + FailureStrategy = VersioningSettings.VersionFailureStrategy.Reject + })) + { + await host.StartAsync(); + + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "World", tags: new Dictionary(), version: "2"); + // We intend for this to timeout as the work should be getting rejected. + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.IsNull(status); + + // We should either be pending (recently rejected) or running (to be rejected). + status = await client.GetStatusAsync(); + Assert.IsTrue(OrchestrationStatus.Running == status?.OrchestrationStatus || OrchestrationStatus.Pending == status?.OrchestrationStatus); + + await host.StopAsync(); + } + } + #if !NET462 /// /// End-to-end test which validates a simple orchestrator function that calls an activity function diff --git a/test/DurableTask.AzureStorage.Tests/TestHelpers.cs b/test/DurableTask.AzureStorage.Tests/TestHelpers.cs index 80bd79e0d..4264c1e92 100644 --- a/test/DurableTask.AzureStorage.Tests/TestHelpers.cs +++ b/test/DurableTask.AzureStorage.Tests/TestHelpers.cs @@ -13,13 +13,14 @@ #nullable enable namespace DurableTask.AzureStorage.Tests { + using DurableTask.Core.Logging; + using DurableTask.Core.Settings; + using Microsoft.Extensions.Logging; using System; using System.Configuration; using System.Diagnostics; using System.Diagnostics.Tracing; using System.Threading.Tasks; - using DurableTask.Core.Logging; - using Microsoft.Extensions.Logging; static class TestHelpers { @@ -28,6 +29,7 @@ public static TestOrchestrationHost GetTestOrchestrationHost( int extendedSessionTimeoutInSeconds = 30, bool fetchLargeMessages = true, bool allowReplayingTerminalInstances = false, + VersioningSettings? versioningSettings = null, Action? modifySettingsAction = null) { AzureStorageOrchestrationServiceSettings settings = GetTestAzureStorageOrchestrationServiceSettings( @@ -38,7 +40,7 @@ public static TestOrchestrationHost GetTestOrchestrationHost( // Give the caller a chance to make test-specific changes to the settings modifySettingsAction?.Invoke(settings); - return new TestOrchestrationHost(settings); + return new TestOrchestrationHost(settings, versioningSettings); } public static AzureStorageOrchestrationServiceSettings GetTestAzureStorageOrchestrationServiceSettings( diff --git a/test/DurableTask.AzureStorage.Tests/TestOrchestrationHost.cs b/test/DurableTask.AzureStorage.Tests/TestOrchestrationHost.cs index b88e0dfcf..f5e630b6f 100644 --- a/test/DurableTask.AzureStorage.Tests/TestOrchestrationHost.cs +++ b/test/DurableTask.AzureStorage.Tests/TestOrchestrationHost.cs @@ -13,33 +13,36 @@ namespace DurableTask.AzureStorage.Tests { + using DurableTask.Core; + using DurableTask.Core.Settings; + using Microsoft.Extensions.Logging; + using Microsoft.VisualStudio.TestTools.UnitTesting; using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Runtime.Serialization; using System.Threading.Tasks; - using DurableTask.Core; - using Microsoft.Extensions.Logging; - using Microsoft.VisualStudio.TestTools.UnitTesting; internal sealed class TestOrchestrationHost : IDisposable { internal readonly AzureStorageOrchestrationService service; readonly AzureStorageOrchestrationServiceSettings settings; - readonly TaskHubWorker worker; readonly TaskHubClient client; readonly HashSet addedOrchestrationTypes; readonly HashSet addedActivityTypes; - public TestOrchestrationHost(AzureStorageOrchestrationServiceSettings settings) + // We allow updates to the worker for versioning tests. + TaskHubWorker worker; + + public TestOrchestrationHost(AzureStorageOrchestrationServiceSettings settings, VersioningSettings versioningSettings = null) { this.service = new AzureStorageOrchestrationService(settings); this.service.CreateAsync().GetAwaiter().GetResult(); this.settings = settings; - this.worker = new TaskHubWorker(service, loggerFactory: settings.LoggerFactory); + this.worker = new TaskHubWorker(service, loggerFactory: settings.LoggerFactory, versioningSettings: versioningSettings); this.client = new TaskHubClient(service, loggerFactory: settings.LoggerFactory); this.addedOrchestrationTypes = new HashSet(); this.addedActivityTypes = new HashSet(); @@ -62,6 +65,14 @@ public Task StopAsync() return this.worker.StopAsync(isForced: true); } + public async Task UpdateWorkerVersion(VersioningSettings versioningSettings) + { + // Stop the current worker and create a new one with the new versioning settings. + await this.worker.StopAsync(); + this.worker = new TaskHubWorker(this.service, loggerFactory: this.settings.LoggerFactory, versioningSettings: versioningSettings); + await this.worker.StartAsync(); + } + public void AddAutoStartOrchestrator(Type type) { this.worker.AddTaskOrchestrations(new AutoStartOrchestrationCreator(type)); @@ -73,7 +84,8 @@ public async Task StartOrchestrationAsync( object input, string instanceId = null, DateTime? startAt = null, - IDictionary tags = null) + IDictionary tags = null, + string version = null) { if (startAt != null && tags != null) { @@ -82,7 +94,17 @@ public async Task StartOrchestrationAsync( if (!this.addedOrchestrationTypes.Contains(orchestrationType)) { - this.worker.AddTaskOrchestrations(orchestrationType); + if (version != null) + { + this.worker.AddTaskOrchestrations(new NameValueObjectCreator( + NameVersionHelper.GetDefaultName(orchestrationType), + version, + orchestrationType)); + } + else + { + this.worker.AddTaskOrchestrations(orchestrationType); + } this.addedOrchestrationTypes.Add(orchestrationType); } @@ -110,11 +132,12 @@ public async Task StartOrchestrationAsync( DateTime creationTime = DateTime.UtcNow; OrchestrationInstance instance; + string orchestrationVersion = !string.IsNullOrEmpty(version) ? version : NameVersionHelper.GetDefaultVersion(orchestrationType); if (tags != null) { instance = await this.client.CreateOrchestrationInstanceAsync( NameVersionHelper.GetDefaultName(orchestrationType), - NameVersionHelper.GetDefaultVersion(orchestrationType), + orchestrationVersion, instanceId, input, tags); @@ -315,7 +338,7 @@ class TestObjectCreator : ObjectCreator { readonly T obj; - public TestObjectCreator(string name, T obj) + public TestObjectCreator(string name, T obj) : this(name, string.Empty, obj) { } From b87cce6b86b9d3cc29288f54853e2724c6ce19f0 Mon Sep 17 00:00:00 2001 From: halspang Date: Fri, 18 Apr 2025 17:31:55 -0700 Subject: [PATCH 2/3] Update constructors to remove breaking change Signed-off-by: halspang --- src/DurableTask.Core/TaskHubWorker.cs | 78 +++++++++++++++++++++++---- 1 file changed, 69 insertions(+), 9 deletions(-) diff --git a/src/DurableTask.Core/TaskHubWorker.cs b/src/DurableTask.Core/TaskHubWorker.cs index e9f6808ea..629453645 100644 --- a/src/DurableTask.Core/TaskHubWorker.cs +++ b/src/DurableTask.Core/TaskHubWorker.cs @@ -78,15 +78,30 @@ public TaskHubWorker(IOrchestrationService orchestrationService) /// /// Reference the orchestration service implementation /// The to use for logging + public TaskHubWorker(IOrchestrationService orchestrationService, ILoggerFactory loggerFactory = null) + : this( + orchestrationService, + new NameVersionObjectManager(), + new NameVersionObjectManager(), + new NameVersionObjectManager(), + loggerFactory) + { + } + + /// + /// Create a new TaskHubWorker with given OrchestrationService + /// + /// Reference the orchestration service implementation /// The that define how orchestration versions are handled - public TaskHubWorker(IOrchestrationService orchestrationService, ILoggerFactory loggerFactory = null, VersioningSettings versioningSettings = null) + /// The to use for logging + public TaskHubWorker(IOrchestrationService orchestrationService, VersioningSettings versioningSettings, ILoggerFactory loggerFactory = null) : this( orchestrationService, new NameVersionObjectManager(), new NameVersionObjectManager(), new NameVersionObjectManager(), - loggerFactory, - versioningSettings) + versioningSettings, + loggerFactory) { } @@ -117,20 +132,41 @@ public TaskHubWorker( /// The for orchestrations /// The for activities /// The to use for logging + public TaskHubWorker( + IOrchestrationService orchestrationService, + INameVersionObjectManager orchestrationObjectManager, + INameVersionObjectManager activityObjectManager, + ILoggerFactory loggerFactory = null) + : this( + orchestrationService, + orchestrationObjectManager, + activityObjectManager, + new NameVersionObjectManager(), + loggerFactory) + { + } + + /// + /// Create a new with given and name version managers + /// + /// The orchestration service implementation + /// The for orchestrations + /// The for activities /// The that define how orchestration versions are handled + /// The to use for logging public TaskHubWorker( IOrchestrationService orchestrationService, INameVersionObjectManager orchestrationObjectManager, INameVersionObjectManager activityObjectManager, - ILoggerFactory loggerFactory = null, - VersioningSettings versioningSettings = null) + VersioningSettings versioningSettings, + ILoggerFactory loggerFactory = null) : this( orchestrationService, orchestrationObjectManager, activityObjectManager, new NameVersionObjectManager(), - loggerFactory, - versioningSettings) + versioningSettings, + loggerFactory) { } @@ -142,14 +178,38 @@ public TaskHubWorker( /// NameVersionObjectManager for Activities /// The NameVersionObjectManager for entities. The version is the entity key. /// The to use for logging + public TaskHubWorker( + IOrchestrationService orchestrationService, + INameVersionObjectManager orchestrationObjectManager, + INameVersionObjectManager activityObjectManager, + INameVersionObjectManager entityObjectManager, + ILoggerFactory loggerFactory = null) + : this( + orchestrationService, + orchestrationObjectManager, + activityObjectManager, + entityObjectManager, + null, + loggerFactory) + { + } + + /// + /// Create a new TaskHubWorker with given OrchestrationService and name version managers + /// + /// Reference the orchestration service implementation + /// NameVersionObjectManager for Orchestrations + /// NameVersionObjectManager for Activities + /// The NameVersionObjectManager for entities. The version is the entity key. /// The that define how orchestration versions are handled + /// The to use for logging public TaskHubWorker( IOrchestrationService orchestrationService, INameVersionObjectManager orchestrationObjectManager, INameVersionObjectManager activityObjectManager, INameVersionObjectManager entityObjectManager, - ILoggerFactory loggerFactory = null, - VersioningSettings versioningSettings = null) + VersioningSettings versioningSettings, + ILoggerFactory loggerFactory = null) { this.orchestrationManager = orchestrationObjectManager ?? throw new ArgumentException("orchestrationObjectManager"); this.activityManager = activityObjectManager ?? throw new ArgumentException("activityObjectManager"); From fb66bcb214bb0e5cbc45ffd26f1f180fda85f4d5 Mon Sep 17 00:00:00 2001 From: halspang Date: Mon, 21 Apr 2025 10:28:54 -0700 Subject: [PATCH 3/3] Add copyright header Signed-off-by: halspang --- .../VersionSettingsTests.cs | 20 +++++++++++++------ .../Settings/VersioningSettings.cs | 15 +++++++++++++- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/Test/DurableTask.Core.Tests/VersionSettingsTests.cs b/Test/DurableTask.Core.Tests/VersionSettingsTests.cs index e2b9fad93..44128164d 100644 --- a/Test/DurableTask.Core.Tests/VersionSettingsTests.cs +++ b/Test/DurableTask.Core.Tests/VersionSettingsTests.cs @@ -1,10 +1,18 @@ -using DurableTask.Core.Settings; +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +using DurableTask.Core.Settings; using Microsoft.VisualStudio.TestTools.UnitTesting; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace DurableTask.Core.Tests { diff --git a/src/DurableTask.Core/Settings/VersioningSettings.cs b/src/DurableTask.Core/Settings/VersioningSettings.cs index bf149ebac..41b00d161 100644 --- a/src/DurableTask.Core/Settings/VersioningSettings.cs +++ b/src/DurableTask.Core/Settings/VersioningSettings.cs @@ -1,4 +1,17 @@ -using System; +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +using System; namespace DurableTask.Core.Settings {