Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions Test/DurableTask.Core.Tests/VersionSettingsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

using DurableTask.Core.Settings;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace DurableTask.Core.Tests
{
[TestClass]
public class VersionSettingsTests
{
[TestMethod]
[DataRow("1.0.0", "1.0.0", 0)]
[DataRow("1.1.0", "1.0.0", 1)]
[DataRow("1.0.0", "1.1.0", -1)]
[DataRow("1", "1", 0)]
[DataRow("2", "1", 1)]
[DataRow("1", "2", -1)]
[DataRow("", "1", -1)]
[DataRow("1", "", 1)]
[DataRow("", "", 0)]
public void TestVersionComparison(string orchVersion, string settingVersion, int expectedComparison)
{
int result = VersioningSettings.CompareVersions(orchVersion, settingVersion);

if (expectedComparison == 0)
{
Assert.AreEqual(0, result, $"Expected {orchVersion} to be equal to {settingVersion}");
}
else if (expectedComparison < 0)
{
Assert.IsTrue(result < 0, $"Expected {orchVersion} to be less than {settingVersion}");
}
else
{
Assert.IsTrue(result > 0, $"Expected {orchVersion} to be greater than {settingVersion}");
}
}
}
}
119 changes: 119 additions & 0 deletions src/DurableTask.Core/Settings/VersioningSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

using System;

namespace DurableTask.Core.Settings
{
/// <summary>
/// Collection of settings that define the overall versioning behavior.
/// </summary>
public class VersioningSettings
{
/// <summary>
/// Defines the version matching strategy for the Durable Task worker.
/// </summary>
public enum VersionMatchStrategy
{
/// <summary>
/// Ignore Orchestration version, all work received is processed.
/// </summary>
None = 0,

/// <summary>
/// Worker will only process Tasks from Orchestrations with the same version as the worker.
/// </summary>
Strict = 1,

/// <summary>
/// Worker will process Tasks from Orchestrations whose version is less than or equal to the worker.
/// </summary>
CurrentOrOlder = 2,
}

/// <summary>
/// Defines the versioning failure strategy for the Durable Task worker.
/// </summary>
public enum VersionFailureStrategy
{
/// <summary>
/// Do not change the orchestration state if the version does not adhere to the matching strategy.
/// </summary>
Reject = 0,

/// <summary>
/// Fail the orchestration if the version does not adhere to the matching strategy.
/// </summary>
Fail = 1,
}

/// <summary>
/// Gets or sets the version associated with the settings.
/// </summary>
public string Version { get; set; } = string.Empty;

/// <summary>
/// Gets or sets the <see cref="VersionMatchStrategy"/> that is used for matching versions.
/// </summary>
public VersionMatchStrategy MatchStrategy { get; set; } = VersionMatchStrategy.None;

/// <summary>
/// Gets or sets the <see cref="VersionFailureStrategy"/> that is used to determine what happens on a versioning failure.
/// </summary>
public VersionFailureStrategy FailureStrategy { get; set; } = VersionFailureStrategy.Reject;

/// <summary>
/// Compare two versions to each other.
/// </summary>
/// <remarks>
/// This method's comparison is handled in the following order:
/// 1. The versions are checked if they are empty (non-versioned). Both being empty signifies equality.
/// 2. If sourceVersion is empty but otherVersion is defined, this is treated as the source being less than the other.
/// 3. If otherVersion is empty but sourceVersion is defined, this is treated as the source being greater than the other.
/// 4. Both versions are attempted to be parsed into System.Version and compared as such.
/// 5. If all else fails, a direct string comparison is done between the versions.
/// </remarks>
/// <param name="sourceVersion">The source version that will be compared against the other version.</param>
/// <param name="otherVersion">The other version to compare against.</param>
/// <returns>An int representing how sourceVersion compares to otherVersion.</returns>
public static int CompareVersions(string sourceVersion, string otherVersion)
{
// Both versions are empty, treat as equal.
if (string.IsNullOrWhiteSpace(sourceVersion) && string.IsNullOrWhiteSpace(otherVersion))
{
return 0;
}

// An empty version in the context is always less than a defined version in the parameter.
if (string.IsNullOrWhiteSpace(sourceVersion))
{
return -1;
}

// An empty version in the parameter is always less than a defined version in the context.
if (string.IsNullOrWhiteSpace(otherVersion))
{
return 1;
}

// If both versions use the .NET Version class, return that comparison.
if (System.Version.TryParse(sourceVersion, out Version parsedSourceVersion) && System.Version.TryParse(otherVersion, out Version parsedOtherVersion))
{
return parsedSourceVersion.CompareTo(parsedOtherVersion);
}

// If we have gotten to here, we don't know the syntax of the versions we are comparing, use a string comparison as a final check.
return string.Compare(sourceVersion, otherVersion, StringComparison.OrdinalIgnoreCase);
}
}
}
89 changes: 80 additions & 9 deletions src/DurableTask.Core/TaskHubWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@

namespace DurableTask.Core
{
using DurableTask.Core.Entities;
using DurableTask.Core.Exceptions;
using DurableTask.Core.Logging;
using DurableTask.Core.Middleware;
using DurableTask.Core.Settings;
using Microsoft.Extensions.Logging;
Comment thread
halspang marked this conversation as resolved.
using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand All @@ -21,11 +27,6 @@ namespace DurableTask.Core
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core.Entities;
using DurableTask.Core.Exceptions;
using DurableTask.Core.Logging;
using DurableTask.Core.Middleware;
using Microsoft.Extensions.Logging;

/// <summary>
/// Allows users to load the TaskOrchestration and TaskActivity classes and start
Expand All @@ -36,6 +37,7 @@ public sealed class TaskHubWorker : IDisposable
readonly INameVersionObjectManager<TaskActivity> activityManager;
readonly INameVersionObjectManager<TaskOrchestration> orchestrationManager;
readonly INameVersionObjectManager<TaskEntity> entityManager;
readonly VersioningSettings versioningSettings;

readonly DispatchMiddlewarePipeline orchestrationDispatchPipeline = new DispatchMiddlewarePipeline();
readonly DispatchMiddlewarePipeline entityDispatchPipeline = new DispatchMiddlewarePipeline();
Expand Down Expand Up @@ -86,6 +88,23 @@ public TaskHubWorker(IOrchestrationService orchestrationService, ILoggerFactory
{
}

/// <summary>
/// Create a new TaskHubWorker with given OrchestrationService
/// </summary>
/// <param name="orchestrationService">Reference the orchestration service implementation</param>
/// <param name="versioningSettings">The <see cref="VersioningSettings"/> that define how orchestration versions are handled</param>
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> to use for logging</param>
public TaskHubWorker(IOrchestrationService orchestrationService, VersioningSettings versioningSettings, ILoggerFactory loggerFactory = null)
: this(
orchestrationService,
new NameVersionObjectManager<TaskOrchestration>(),
new NameVersionObjectManager<TaskActivity>(),
new NameVersionObjectManager<TaskEntity>(),
versioningSettings,
loggerFactory)
{
}

/// <summary>
/// Create a new TaskHubWorker with given OrchestrationService and name version managers
/// </summary>
Expand All @@ -101,7 +120,29 @@ public TaskHubWorker(
orchestrationObjectManager,
activityObjectManager,
new NameVersionObjectManager<TaskEntity>(),
loggerFactory: null)
loggerFactory: null,
versioningSettings: null)
{
}

/// <summary>
/// Create a new <see cref="TaskHubWorker"/> with given <see cref="IOrchestrationService"/> and name version managers
/// </summary>
/// <param name="orchestrationService">The orchestration service implementation</param>
/// <param name="orchestrationObjectManager">The <see cref="INameVersionObjectManager{TaskOrchestration}"/> for orchestrations</param>
/// <param name="activityObjectManager">The <see cref="INameVersionObjectManager{TaskActivity}"/> for activities</param>
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> to use for logging</param>
public TaskHubWorker(
IOrchestrationService orchestrationService,
INameVersionObjectManager<TaskOrchestration> orchestrationObjectManager,
INameVersionObjectManager<TaskActivity> activityObjectManager,
ILoggerFactory loggerFactory = null)
: this(
orchestrationService,
orchestrationObjectManager,
activityObjectManager,
new NameVersionObjectManager<TaskEntity>(),
loggerFactory)
{
}

Expand All @@ -111,17 +152,20 @@ public TaskHubWorker(
/// <param name="orchestrationService">The orchestration service implementation</param>
/// <param name="orchestrationObjectManager">The <see cref="INameVersionObjectManager{TaskOrchestration}"/> for orchestrations</param>
/// <param name="activityObjectManager">The <see cref="INameVersionObjectManager{TaskActivity}"/> for activities</param>
/// <param name="versioningSettings">The <see cref="VersioningSettings"/> that define how orchestration versions are handled</param>
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> to use for logging</param>
public TaskHubWorker(
IOrchestrationService orchestrationService,
INameVersionObjectManager<TaskOrchestration> orchestrationObjectManager,
INameVersionObjectManager<TaskActivity> activityObjectManager,
VersioningSettings versioningSettings,
ILoggerFactory loggerFactory = null)
: this(
orchestrationService,
orchestrationObjectManager,
activityObjectManager,
new NameVersionObjectManager<TaskEntity>(),
versioningSettings,
loggerFactory)
{
}
Expand All @@ -140,13 +184,40 @@ public TaskHubWorker(
INameVersionObjectManager<TaskActivity> activityObjectManager,
INameVersionObjectManager<TaskEntity> entityObjectManager,
ILoggerFactory loggerFactory = null)
: this(
orchestrationService,
orchestrationObjectManager,
activityObjectManager,
entityObjectManager,
null,
loggerFactory)
{
}

/// <summary>
/// Create a new TaskHubWorker with given OrchestrationService and name version managers
/// </summary>
/// <param name="orchestrationService">Reference the orchestration service implementation</param>
/// <param name="orchestrationObjectManager">NameVersionObjectManager for Orchestrations</param>
/// <param name="activityObjectManager">NameVersionObjectManager for Activities</param>
/// <param name="entityObjectManager">The NameVersionObjectManager for entities. The version is the entity key.</param>
/// <param name="versioningSettings">The <see cref="VersioningSettings"/> that define how orchestration versions are handled</param>
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> to use for logging</param>
public TaskHubWorker(
IOrchestrationService orchestrationService,
INameVersionObjectManager<TaskOrchestration> orchestrationObjectManager,
INameVersionObjectManager<TaskActivity> activityObjectManager,
INameVersionObjectManager<TaskEntity> entityObjectManager,
VersioningSettings versioningSettings,
ILoggerFactory loggerFactory = null)
{
this.orchestrationManager = orchestrationObjectManager ?? throw new ArgumentException("orchestrationObjectManager");
this.activityManager = activityObjectManager ?? throw new ArgumentException("activityObjectManager");
this.entityManager = entityObjectManager ?? throw new ArgumentException("entityObjectManager");
this.orchestrationService = orchestrationService ?? throw new ArgumentException("orchestrationService");
this.logHelper = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core"));
this.dispatchEntitiesSeparately = (orchestrationService as IEntityOrchestrationService)?.EntityBackendProperties?.UseSeparateQueueForEntityWorkItems ?? false;
this.versioningSettings = versioningSettings;
}

/// <summary>
Expand Down Expand Up @@ -219,13 +290,13 @@ public async Task<TaskHubWorker> StartAsync()

this.logHelper.TaskHubWorkerStarting();
var sw = Stopwatch.StartNew();

this.orchestrationDispatcher = new TaskOrchestrationDispatcher(
this.orchestrationService,
this.orchestrationManager,
this.orchestrationDispatchPipeline,
this.logHelper,
this.ErrorPropagationMode);
this.ErrorPropagationMode,
this.versioningSettings);
this.activityDispatcher = new TaskActivityDispatcher(
this.orchestrationService,
this.activityManager,
Expand Down Expand Up @@ -357,7 +428,7 @@ public TaskHubWorker AddTaskEntities(params Type[] taskEntityTypes)
type.Name,
string.Empty,
type);

this.entityManager.Add(creator);
}

Expand Down
Loading
Loading