diff --git a/Test/DurableTask.Core.Tests/VersionSettingsTests.cs b/Test/DurableTask.Core.Tests/VersionSettingsTests.cs
new file mode 100644
index 000000000..44128164d
--- /dev/null
+++ b/Test/DurableTask.Core.Tests/VersionSettingsTests.cs
@@ -0,0 +1,50 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+
+using DurableTask.Core.Settings;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+namespace DurableTask.Core.Tests
+{
+ [TestClass]
+ public class VersionSettingsTests
+ {
+ [TestMethod]
+ [DataRow("1.0.0", "1.0.0", 0)]
+ [DataRow("1.1.0", "1.0.0", 1)]
+ [DataRow("1.0.0", "1.1.0", -1)]
+ [DataRow("1", "1", 0)]
+ [DataRow("2", "1", 1)]
+ [DataRow("1", "2", -1)]
+ [DataRow("", "1", -1)]
+ [DataRow("1", "", 1)]
+ [DataRow("", "", 0)]
+ public void TestVersionComparison(string orchVersion, string settingVersion, int expectedComparison)
+ {
+ int result = VersioningSettings.CompareVersions(orchVersion, settingVersion);
+
+ if (expectedComparison == 0)
+ {
+ Assert.AreEqual(0, result, $"Expected {orchVersion} to be equal to {settingVersion}");
+ }
+ else if (expectedComparison < 0)
+ {
+ Assert.IsTrue(result < 0, $"Expected {orchVersion} to be less than {settingVersion}");
+ }
+ else
+ {
+ Assert.IsTrue(result > 0, $"Expected {orchVersion} to be greater than {settingVersion}");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DurableTask.Core/Settings/VersioningSettings.cs b/src/DurableTask.Core/Settings/VersioningSettings.cs
new file mode 100644
index 000000000..41b00d161
--- /dev/null
+++ b/src/DurableTask.Core/Settings/VersioningSettings.cs
@@ -0,0 +1,119 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+
+using System;
+
+namespace DurableTask.Core.Settings
+{
+ ///
+ /// Collection of settings that define the overall versioning behavior.
+ ///
+ public class VersioningSettings
+ {
+ ///
+ /// Defines the version matching strategy for the Durable Task worker.
+ ///
+ public enum VersionMatchStrategy
+ {
+ ///
+ /// Ignore Orchestration version, all work received is processed.
+ ///
+ None = 0,
+
+ ///
+ /// Worker will only process Tasks from Orchestrations with the same version as the worker.
+ ///
+ Strict = 1,
+
+ ///
+ /// Worker will process Tasks from Orchestrations whose version is less than or equal to the worker.
+ ///
+ CurrentOrOlder = 2,
+ }
+
+ ///
+ /// Defines the versioning failure strategy for the Durable Task worker.
+ ///
+ public enum VersionFailureStrategy
+ {
+ ///
+ /// Do not change the orchestration state if the version does not adhere to the matching strategy.
+ ///
+ Reject = 0,
+
+ ///
+ /// Fail the orchestration if the version does not adhere to the matching strategy.
+ ///
+ Fail = 1,
+ }
+
+ ///
+ /// Gets or sets the version associated with the settings.
+ ///
+ public string Version { get; set; } = string.Empty;
+
+ ///
+ /// Gets or sets the that is used for matching versions.
+ ///
+ public VersionMatchStrategy MatchStrategy { get; set; } = VersionMatchStrategy.None;
+
+ ///
+ /// Gets or sets the that is used to determine what happens on a versioning failure.
+ ///
+ public VersionFailureStrategy FailureStrategy { get; set; } = VersionFailureStrategy.Reject;
+
+ ///
+ /// Compare two versions to each other.
+ ///
+ ///
+ /// This method's comparison is handled in the following order:
+ /// 1. The versions are checked if they are empty (non-versioned). Both being empty signifies equality.
+ /// 2. If sourceVersion is empty but otherVersion is defined, this is treated as the source being less than the other.
+ /// 3. If otherVersion is empty but sourceVersion is defined, this is treated as the source being greater than the other.
+ /// 4. Both versions are attempted to be parsed into System.Version and compared as such.
+ /// 5. If all else fails, a direct string comparison is done between the versions.
+ ///
+ /// The source version that will be compared against the other version.
+ /// The other version to compare against.
+ /// An int representing how sourceVersion compares to otherVersion.
+ public static int CompareVersions(string sourceVersion, string otherVersion)
+ {
+ // Both versions are empty, treat as equal.
+ if (string.IsNullOrWhiteSpace(sourceVersion) && string.IsNullOrWhiteSpace(otherVersion))
+ {
+ return 0;
+ }
+
+ // An empty version in the context is always less than a defined version in the parameter.
+ if (string.IsNullOrWhiteSpace(sourceVersion))
+ {
+ return -1;
+ }
+
+ // An empty version in the parameter is always less than a defined version in the context.
+ if (string.IsNullOrWhiteSpace(otherVersion))
+ {
+ return 1;
+ }
+
+ // If both versions use the .NET Version class, return that comparison.
+ if (System.Version.TryParse(sourceVersion, out Version parsedSourceVersion) && System.Version.TryParse(otherVersion, out Version parsedOtherVersion))
+ {
+ return parsedSourceVersion.CompareTo(parsedOtherVersion);
+ }
+
+ // If we have gotten to here, we don't know the syntax of the versions we are comparing, use a string comparison as a final check.
+ return string.Compare(sourceVersion, otherVersion, StringComparison.OrdinalIgnoreCase);
+ }
+ }
+}
diff --git a/src/DurableTask.Core/TaskHubWorker.cs b/src/DurableTask.Core/TaskHubWorker.cs
index 24271dc78..629453645 100644
--- a/src/DurableTask.Core/TaskHubWorker.cs
+++ b/src/DurableTask.Core/TaskHubWorker.cs
@@ -13,6 +13,12 @@
namespace DurableTask.Core
{
+ using DurableTask.Core.Entities;
+ using DurableTask.Core.Exceptions;
+ using DurableTask.Core.Logging;
+ using DurableTask.Core.Middleware;
+ using DurableTask.Core.Settings;
+ using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
@@ -21,11 +27,6 @@ namespace DurableTask.Core
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
- using DurableTask.Core.Entities;
- using DurableTask.Core.Exceptions;
- using DurableTask.Core.Logging;
- using DurableTask.Core.Middleware;
- using Microsoft.Extensions.Logging;
///
/// Allows users to load the TaskOrchestration and TaskActivity classes and start
@@ -36,6 +37,7 @@ public sealed class TaskHubWorker : IDisposable
readonly INameVersionObjectManager activityManager;
readonly INameVersionObjectManager orchestrationManager;
readonly INameVersionObjectManager entityManager;
+ readonly VersioningSettings versioningSettings;
readonly DispatchMiddlewarePipeline orchestrationDispatchPipeline = new DispatchMiddlewarePipeline();
readonly DispatchMiddlewarePipeline entityDispatchPipeline = new DispatchMiddlewarePipeline();
@@ -86,6 +88,23 @@ public TaskHubWorker(IOrchestrationService orchestrationService, ILoggerFactory
{
}
+ ///
+ /// Create a new TaskHubWorker with given OrchestrationService
+ ///
+ /// Reference the orchestration service implementation
+ /// The that define how orchestration versions are handled
+ /// The to use for logging
+ public TaskHubWorker(IOrchestrationService orchestrationService, VersioningSettings versioningSettings, ILoggerFactory loggerFactory = null)
+ : this(
+ orchestrationService,
+ new NameVersionObjectManager(),
+ new NameVersionObjectManager(),
+ new NameVersionObjectManager(),
+ versioningSettings,
+ loggerFactory)
+ {
+ }
+
///
/// Create a new TaskHubWorker with given OrchestrationService and name version managers
///
@@ -101,7 +120,29 @@ public TaskHubWorker(
orchestrationObjectManager,
activityObjectManager,
new NameVersionObjectManager(),
- loggerFactory: null)
+ loggerFactory: null,
+ versioningSettings: null)
+ {
+ }
+
+ ///
+ /// Create a new with given and name version managers
+ ///
+ /// The orchestration service implementation
+ /// The for orchestrations
+ /// The for activities
+ /// The to use for logging
+ public TaskHubWorker(
+ IOrchestrationService orchestrationService,
+ INameVersionObjectManager orchestrationObjectManager,
+ INameVersionObjectManager activityObjectManager,
+ ILoggerFactory loggerFactory = null)
+ : this(
+ orchestrationService,
+ orchestrationObjectManager,
+ activityObjectManager,
+ new NameVersionObjectManager(),
+ loggerFactory)
{
}
@@ -111,17 +152,20 @@ public TaskHubWorker(
/// The orchestration service implementation
/// The for orchestrations
/// The for activities
+ /// The that define how orchestration versions are handled
/// The to use for logging
public TaskHubWorker(
IOrchestrationService orchestrationService,
INameVersionObjectManager orchestrationObjectManager,
INameVersionObjectManager activityObjectManager,
+ VersioningSettings versioningSettings,
ILoggerFactory loggerFactory = null)
: this(
orchestrationService,
orchestrationObjectManager,
activityObjectManager,
new NameVersionObjectManager(),
+ versioningSettings,
loggerFactory)
{
}
@@ -140,6 +184,32 @@ public TaskHubWorker(
INameVersionObjectManager activityObjectManager,
INameVersionObjectManager entityObjectManager,
ILoggerFactory loggerFactory = null)
+ : this(
+ orchestrationService,
+ orchestrationObjectManager,
+ activityObjectManager,
+ entityObjectManager,
+ null,
+ loggerFactory)
+ {
+ }
+
+ ///
+ /// Create a new TaskHubWorker with given OrchestrationService and name version managers
+ ///
+ /// Reference the orchestration service implementation
+ /// NameVersionObjectManager for Orchestrations
+ /// NameVersionObjectManager for Activities
+ /// The NameVersionObjectManager for entities. The version is the entity key.
+ /// The that define how orchestration versions are handled
+ /// The to use for logging
+ public TaskHubWorker(
+ IOrchestrationService orchestrationService,
+ INameVersionObjectManager orchestrationObjectManager,
+ INameVersionObjectManager activityObjectManager,
+ INameVersionObjectManager entityObjectManager,
+ VersioningSettings versioningSettings,
+ ILoggerFactory loggerFactory = null)
{
this.orchestrationManager = orchestrationObjectManager ?? throw new ArgumentException("orchestrationObjectManager");
this.activityManager = activityObjectManager ?? throw new ArgumentException("activityObjectManager");
@@ -147,6 +217,7 @@ public TaskHubWorker(
this.orchestrationService = orchestrationService ?? throw new ArgumentException("orchestrationService");
this.logHelper = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core"));
this.dispatchEntitiesSeparately = (orchestrationService as IEntityOrchestrationService)?.EntityBackendProperties?.UseSeparateQueueForEntityWorkItems ?? false;
+ this.versioningSettings = versioningSettings;
}
///
@@ -219,13 +290,13 @@ public async Task StartAsync()
this.logHelper.TaskHubWorkerStarting();
var sw = Stopwatch.StartNew();
-
this.orchestrationDispatcher = new TaskOrchestrationDispatcher(
this.orchestrationService,
this.orchestrationManager,
this.orchestrationDispatchPipeline,
this.logHelper,
- this.ErrorPropagationMode);
+ this.ErrorPropagationMode,
+ this.versioningSettings);
this.activityDispatcher = new TaskActivityDispatcher(
this.orchestrationService,
this.activityManager,
@@ -357,7 +428,7 @@ public TaskHubWorker AddTaskEntities(params Type[] taskEntityTypes)
type.Name,
string.Empty,
type);
-
+
this.entityManager.Add(creator);
}
diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
index 61864a1a5..d1cb03f87 100644
--- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
+++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
@@ -13,12 +13,6 @@
#nullable enable
namespace DurableTask.Core
{
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
using DurableTask.Core.Command;
using DurableTask.Core.Common;
using DurableTask.Core.Entities;
@@ -27,7 +21,14 @@ namespace DurableTask.Core
using DurableTask.Core.Logging;
using DurableTask.Core.Middleware;
using DurableTask.Core.Serializing;
+ using DurableTask.Core.Settings;
using DurableTask.Core.Tracing;
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
using ActivityStatusCode = Tracing.ActivityStatusCode;
///
@@ -47,13 +48,15 @@ public class TaskOrchestrationDispatcher
readonly IEntityOrchestrationService? entityOrchestrationService;
readonly EntityBackendProperties? entityBackendProperties;
readonly TaskOrchestrationEntityParameters? entityParameters;
+ readonly VersioningSettings? versioningSettings;
internal TaskOrchestrationDispatcher(
IOrchestrationService orchestrationService,
INameVersionObjectManager objectManager,
DispatchMiddlewarePipeline dispatchPipeline,
LogHelper logHelper,
- ErrorPropagationMode errorPropagationMode)
+ ErrorPropagationMode errorPropagationMode,
+ VersioningSettings versioningSettings)
{
this.objectManager = objectManager ?? throw new ArgumentNullException(nameof(objectManager));
this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService));
@@ -63,6 +66,7 @@ internal TaskOrchestrationDispatcher(
this.entityOrchestrationService = orchestrationService as IEntityOrchestrationService;
this.entityBackendProperties = this.entityOrchestrationService?.EntityBackendProperties;
this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties);
+ this.versioningSettings = versioningSettings;
this.dispatcher = new WorkItemDispatcher(
"TaskOrchestrationDispatcher",
@@ -155,7 +159,7 @@ void EnsureExecutionStartedIsFirst(IList batch)
{
// Keep track of orchestrator generation changes, maybe update target position
string executionId = message.OrchestrationInstance.ExecutionId;
- if(previousExecutionId != executionId)
+ if (previousExecutionId != executionId)
{
// We want to re-position the ExecutionStarted event after the "right-most"
// event with a non-null executionID that came before it.
@@ -217,7 +221,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
CorrelationTraceClient.Propagate(
() =>
- {
+ {
// Check if it is extended session.
// TODO: Remove this code - it looks incorrect and dangerous
isExtendedSession = this.concurrentSessionLock.Acquire();
@@ -305,7 +309,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
var isCompleted = false;
var continuedAsNew = false;
var isInterrupted = false;
-
+
// correlation
CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext);
@@ -363,6 +367,44 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
continuedAsNew = false;
continuedAsNewMessage = null;
+ IReadOnlyList decisions = new List();
+ bool versioningFailed = false;
+
+ if (this.versioningSettings != null)
+ {
+ switch (this.versioningSettings.MatchStrategy)
+ {
+ case VersioningSettings.VersionMatchStrategy.None:
+ // No versioning, do nothing
+ break;
+ case VersioningSettings.VersionMatchStrategy.Strict:
+ versioningFailed = this.versioningSettings.Version != runtimeState.Version;
+ break;
+ case VersioningSettings.VersionMatchStrategy.CurrentOrOlder:
+ // Positive result indicates the orchestration version is higher than the versioning settings.
+ versioningFailed = VersioningSettings.CompareVersions(runtimeState.Version, this.versioningSettings.Version) > 0;
+ break;
+ }
+
+ if (versioningFailed)
+ {
+ if (this.versioningSettings.FailureStrategy == VersioningSettings.VersionFailureStrategy.Fail)
+ {
+ var failureAction = new OrchestrationCompleteOrchestratorAction
+ {
+ Id = runtimeState.PastEvents.Count,
+ FailureDetails = new FailureDetails("VersionMismatch", "Orchestration version did not comply with Worker Versioning", null, null, true),
+ OrchestrationStatus = OrchestrationStatus.Failed,
+ };
+ decisions = new List { failureAction };
+ }
+ else // Abandon work item in all other cases (will be retried later).
+ {
+ await this.orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
+ break;
+ }
+ }
+ }
this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name);
TraceHelper.TraceInstance(
@@ -372,17 +414,19 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
"Executing user orchestration: {0}",
JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true));
- if (workItem.Cursor == null)
+ if (!versioningFailed)
{
- workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem);
- }
- else
- {
- await this.ResumeOrchestrationAsync(workItem);
+ if (workItem.Cursor == null)
+ {
+ workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem);
+ }
+ else
+ {
+ await this.ResumeOrchestrationAsync(workItem);
+ }
+ decisions = workItem.Cursor.LatestDecisions.ToList();
}
- IReadOnlyList decisions = workItem.Cursor.LatestDecisions.ToList();
-
this.logHelper.OrchestrationExecuted(
runtimeState.OrchestrationInstance!,
runtimeState.Name,
@@ -618,7 +662,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
continuedAsNew ? null : timerMessages,
continuedAsNewMessage,
instanceState);
-
+
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
workItem.OrchestrationRuntimeState = runtimeState;
@@ -1143,11 +1187,11 @@ TaskMessage ProcessSendEventDecision(
{
var historyEvent = new EventSentEvent(sendEventAction.Id)
{
- InstanceId = sendEventAction.Instance?.InstanceId,
- Name = sendEventAction.EventName,
- Input = sendEventAction.EventData
+ InstanceId = sendEventAction.Instance?.InstanceId,
+ Name = sendEventAction.EventName,
+ Input = sendEventAction.EventData
};
-
+
runtimeState.AddEvent(historyEvent);
EventRaisedEvent eventRaisedEvent = new EventRaisedEvent(-1, sendEventAction.EventData)
@@ -1169,7 +1213,7 @@ TaskMessage ProcessSendEventDecision(
Event = eventRaisedEvent
};
}
-
+
internal class NonBlockingCountdownLock
{
int available;
diff --git a/test/DurableTask.AzureStorage.Tests/AutoStartOrchestrationCreator.cs b/test/DurableTask.AzureStorage.Tests/AutoStartOrchestrationCreator.cs
index 9b62df9aa..617e01885 100644
--- a/test/DurableTask.AzureStorage.Tests/AutoStartOrchestrationCreator.cs
+++ b/test/DurableTask.AzureStorage.Tests/AutoStartOrchestrationCreator.cs
@@ -24,14 +24,16 @@ public class AutoStartOrchestrationCreator : ObjectCreator
{
readonly TaskOrchestration instance;
readonly Type prototype;
+ readonly string version;
///
/// Creates a new AutoStartOrchestrationCreator of supplied type
///
/// Type to use for the creator
- public AutoStartOrchestrationCreator(Type type)
+ public AutoStartOrchestrationCreator(Type type, string version = null)
{
this.prototype = type;
+ this.version = version;
Initialize(type);
}
@@ -39,9 +41,10 @@ public AutoStartOrchestrationCreator(Type type)
/// Creates a new AutoStartOrchestrationCreator using type of supplied object instance
///
/// Object instances to infer the type from
- public AutoStartOrchestrationCreator(TaskOrchestration instance)
+ public AutoStartOrchestrationCreator(TaskOrchestration instance, string version = null)
{
this.instance = instance;
+ this.version = version;
Initialize(instance);
}
@@ -59,7 +62,7 @@ public override TaskOrchestration Create()
void Initialize(object obj)
{
Name = $"@{NameVersionHelper.GetDefaultName(obj)}";
- Version = NameVersionHelper.GetDefaultVersion(obj);
+ Version = !string.IsNullOrEmpty(version) ? this.version : NameVersionHelper.GetDefaultVersion(obj);
}
}
}
\ No newline at end of file
diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
index 6b5593b25..c115c127d 100644
--- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
+++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
@@ -30,6 +30,7 @@ namespace DurableTask.AzureStorage.Tests
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
+ using DurableTask.Core.Settings;
using Microsoft.Practices.EnterpriseLibrary.SemanticLogging.Utility;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
@@ -522,7 +523,7 @@ await client.PurgeInstanceHistoryByTimePeriod(
List thirdHistoryEventsAfterPurging = await client.GetOrchestrationHistoryAsync(thirdInstanceId);
Assert.AreEqual(0, thirdHistoryEventsAfterPurging.Count);
- ListfourthHistoryEventsAfterPurging = await client.GetOrchestrationHistoryAsync(fourthInstanceId);
+ List fourthHistoryEventsAfterPurging = await client.GetOrchestrationHistoryAsync(fourthInstanceId);
Assert.AreEqual(0, fourthHistoryEventsAfterPurging.Count);
firstOrchestrationStateList = await client.GetStateAsync(firstInstanceId);
@@ -928,7 +929,7 @@ public async Task SuspendResumeOrchestration(bool enableExtendedSessions)
// Test case 3: external event now goes through
await client.ResumeAsync("wakeUp");
- status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));
+ status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));
Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
Assert.AreEqual(changedStatus, JToken.Parse(status?.Status));
@@ -1811,7 +1812,7 @@ public async Task LargeTextMessagePayloads_URIFormatCheck(bool enableExtendedSes
}
}
- private StringBuilder GenerateMediumRandomStringPayload(int numChars = 128*1024, short utf8ByteSize = 1, short utf16ByteSize = 2)
+ private StringBuilder GenerateMediumRandomStringPayload(int numChars = 128 * 1024, short utf8ByteSize = 1, short utf16ByteSize = 2)
{
string Chars;
if (utf16ByteSize != 2 && utf16ByteSize != 4)
@@ -2196,7 +2197,7 @@ public async Task AbortOrchestrationAndActivity(bool enableExtendedSessions)
}
}
- ///
+ ///
/// Validates scheduled starts, ensuring they are executed according to defined start date time
///
///
@@ -2411,6 +2412,87 @@ public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSession
}
}
+ [TestMethod]
+ [DataRow(VersioningSettings.VersionMatchStrategy.Strict)]
+ [DataRow(VersioningSettings.VersionMatchStrategy.CurrentOrOlder)]
+ [DataRow(VersioningSettings.VersionMatchStrategy.None)]
+ public async Task OrchestrationFailsWithVersionMismatch(VersioningSettings.VersionMatchStrategy matchStrategy)
+ {
+ using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(false, versioningSettings: new VersioningSettings
+ {
+ Version = "1",
+ MatchStrategy = matchStrategy,
+ FailureStrategy = VersioningSettings.VersionFailureStrategy.Fail
+ }))
+ {
+ await host.StartAsync();
+
+ var client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "World", tags: new Dictionary(), version: "2");
+ var status = await client.WaitForCompletionAsync(StandardTimeout);
+
+ if (matchStrategy == VersioningSettings.VersionMatchStrategy.None)
+ {
+ Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
+ }
+ else
+ {
+ Assert.AreEqual(OrchestrationStatus.Failed, status?.OrchestrationStatus);
+ }
+
+ await host.StopAsync();
+ }
+ }
+
+ [TestMethod]
+ [DataRow(VersioningSettings.VersionMatchStrategy.Strict, "1.0.0")]
+ [DataRow(VersioningSettings.VersionMatchStrategy.CurrentOrOlder, "1.0.0")]
+ [DataRow(VersioningSettings.VersionMatchStrategy.CurrentOrOlder, "0.9.0")]
+ [DataRow(VersioningSettings.VersionMatchStrategy.None, "1.0.0")]
+ public async Task OrchestrationSucceedsWithVersion(VersioningSettings.VersionMatchStrategy matchStrategy, string orchestrationVersion)
+ {
+ using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(false, versioningSettings: new VersioningSettings
+ {
+ Version = "1.0.0",
+ MatchStrategy = matchStrategy,
+ FailureStrategy = VersioningSettings.VersionFailureStrategy.Fail
+ }))
+ {
+ await host.StartAsync();
+
+ var client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "World", tags: new Dictionary(), version: orchestrationVersion);
+ var status = await client.WaitForCompletionAsync(StandardTimeout);
+
+ Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
+
+ await host.StopAsync();
+ }
+ }
+
+ [TestMethod]
+ public async Task OrchestrationRejectsWithVersionMismatch()
+ {
+ using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(false, versioningSettings: new VersioningSettings
+ {
+ Version = "1",
+ MatchStrategy = VersioningSettings.VersionMatchStrategy.Strict,
+ FailureStrategy = VersioningSettings.VersionFailureStrategy.Reject
+ }))
+ {
+ await host.StartAsync();
+
+ var client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "World", tags: new Dictionary(), version: "2");
+ // We intend for this to timeout as the work should be getting rejected.
+ var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));
+ Assert.IsNull(status);
+
+ // We should either be pending (recently rejected) or running (to be rejected).
+ status = await client.GetStatusAsync();
+ Assert.IsTrue(OrchestrationStatus.Running == status?.OrchestrationStatus || OrchestrationStatus.Pending == status?.OrchestrationStatus);
+
+ await host.StopAsync();
+ }
+ }
+
#if !NET462
///
/// End-to-end test which validates a simple orchestrator function that calls an activity function
diff --git a/test/DurableTask.AzureStorage.Tests/TestHelpers.cs b/test/DurableTask.AzureStorage.Tests/TestHelpers.cs
index 80bd79e0d..4264c1e92 100644
--- a/test/DurableTask.AzureStorage.Tests/TestHelpers.cs
+++ b/test/DurableTask.AzureStorage.Tests/TestHelpers.cs
@@ -13,13 +13,14 @@
#nullable enable
namespace DurableTask.AzureStorage.Tests
{
+ using DurableTask.Core.Logging;
+ using DurableTask.Core.Settings;
+ using Microsoft.Extensions.Logging;
using System;
using System.Configuration;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.Threading.Tasks;
- using DurableTask.Core.Logging;
- using Microsoft.Extensions.Logging;
static class TestHelpers
{
@@ -28,6 +29,7 @@ public static TestOrchestrationHost GetTestOrchestrationHost(
int extendedSessionTimeoutInSeconds = 30,
bool fetchLargeMessages = true,
bool allowReplayingTerminalInstances = false,
+ VersioningSettings? versioningSettings = null,
Action? modifySettingsAction = null)
{
AzureStorageOrchestrationServiceSettings settings = GetTestAzureStorageOrchestrationServiceSettings(
@@ -38,7 +40,7 @@ public static TestOrchestrationHost GetTestOrchestrationHost(
// Give the caller a chance to make test-specific changes to the settings
modifySettingsAction?.Invoke(settings);
- return new TestOrchestrationHost(settings);
+ return new TestOrchestrationHost(settings, versioningSettings);
}
public static AzureStorageOrchestrationServiceSettings GetTestAzureStorageOrchestrationServiceSettings(
diff --git a/test/DurableTask.AzureStorage.Tests/TestOrchestrationHost.cs b/test/DurableTask.AzureStorage.Tests/TestOrchestrationHost.cs
index b88e0dfcf..f5e630b6f 100644
--- a/test/DurableTask.AzureStorage.Tests/TestOrchestrationHost.cs
+++ b/test/DurableTask.AzureStorage.Tests/TestOrchestrationHost.cs
@@ -13,33 +13,36 @@
namespace DurableTask.AzureStorage.Tests
{
+ using DurableTask.Core;
+ using DurableTask.Core.Settings;
+ using Microsoft.Extensions.Logging;
+ using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading.Tasks;
- using DurableTask.Core;
- using Microsoft.Extensions.Logging;
- using Microsoft.VisualStudio.TestTools.UnitTesting;
internal sealed class TestOrchestrationHost : IDisposable
{
internal readonly AzureStorageOrchestrationService service;
readonly AzureStorageOrchestrationServiceSettings settings;
- readonly TaskHubWorker worker;
readonly TaskHubClient client;
readonly HashSet addedOrchestrationTypes;
readonly HashSet addedActivityTypes;
- public TestOrchestrationHost(AzureStorageOrchestrationServiceSettings settings)
+ // We allow updates to the worker for versioning tests.
+ TaskHubWorker worker;
+
+ public TestOrchestrationHost(AzureStorageOrchestrationServiceSettings settings, VersioningSettings versioningSettings = null)
{
this.service = new AzureStorageOrchestrationService(settings);
this.service.CreateAsync().GetAwaiter().GetResult();
this.settings = settings;
- this.worker = new TaskHubWorker(service, loggerFactory: settings.LoggerFactory);
+ this.worker = new TaskHubWorker(service, loggerFactory: settings.LoggerFactory, versioningSettings: versioningSettings);
this.client = new TaskHubClient(service, loggerFactory: settings.LoggerFactory);
this.addedOrchestrationTypes = new HashSet();
this.addedActivityTypes = new HashSet();
@@ -62,6 +65,14 @@ public Task StopAsync()
return this.worker.StopAsync(isForced: true);
}
+ public async Task UpdateWorkerVersion(VersioningSettings versioningSettings)
+ {
+ // Stop the current worker and create a new one with the new versioning settings.
+ await this.worker.StopAsync();
+ this.worker = new TaskHubWorker(this.service, loggerFactory: this.settings.LoggerFactory, versioningSettings: versioningSettings);
+ await this.worker.StartAsync();
+ }
+
public void AddAutoStartOrchestrator(Type type)
{
this.worker.AddTaskOrchestrations(new AutoStartOrchestrationCreator(type));
@@ -73,7 +84,8 @@ public async Task StartOrchestrationAsync(
object input,
string instanceId = null,
DateTime? startAt = null,
- IDictionary tags = null)
+ IDictionary tags = null,
+ string version = null)
{
if (startAt != null && tags != null)
{
@@ -82,7 +94,17 @@ public async Task StartOrchestrationAsync(
if (!this.addedOrchestrationTypes.Contains(orchestrationType))
{
- this.worker.AddTaskOrchestrations(orchestrationType);
+ if (version != null)
+ {
+ this.worker.AddTaskOrchestrations(new NameValueObjectCreator(
+ NameVersionHelper.GetDefaultName(orchestrationType),
+ version,
+ orchestrationType));
+ }
+ else
+ {
+ this.worker.AddTaskOrchestrations(orchestrationType);
+ }
this.addedOrchestrationTypes.Add(orchestrationType);
}
@@ -110,11 +132,12 @@ public async Task StartOrchestrationAsync(
DateTime creationTime = DateTime.UtcNow;
OrchestrationInstance instance;
+ string orchestrationVersion = !string.IsNullOrEmpty(version) ? version : NameVersionHelper.GetDefaultVersion(orchestrationType);
if (tags != null)
{
instance = await this.client.CreateOrchestrationInstanceAsync(
NameVersionHelper.GetDefaultName(orchestrationType),
- NameVersionHelper.GetDefaultVersion(orchestrationType),
+ orchestrationVersion,
instanceId,
input,
tags);
@@ -315,7 +338,7 @@ class TestObjectCreator : ObjectCreator
{
readonly T obj;
- public TestObjectCreator(string name, T obj)
+ public TestObjectCreator(string name, T obj)
: this(name, string.Empty, obj)
{
}