From 1657866c16fff738ae46076a2d66cdcd3f401397 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 27 Mar 2025 19:58:58 -0700 Subject: [PATCH 01/15] first commit! --- src/Client/Grpc/GrpcDurableEntityClient.cs | 14 +- .../ShimDurableEntityClient.cs | 7 +- src/Grpc/orchestrator_service.proto | 22 +- src/Grpc/versions.txt | 4 +- src/Shared/Grpc/ProtoUtils.cs | 55 +- src/Worker/Core/Shims/TaskEntityShim.cs | 35 +- .../Shims/TaskOrchestrationEntityContext.cs | 484 +++++++++--------- 7 files changed, 348 insertions(+), 273 deletions(-) diff --git a/src/Client/Grpc/GrpcDurableEntityClient.cs b/src/Client/Grpc/GrpcDurableEntityClient.cs index b5e3371d0..4cce9c981 100644 --- a/src/Client/Grpc/GrpcDurableEntityClient.cs +++ b/src/Client/Grpc/GrpcDurableEntityClient.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Diagnostics; using Microsoft.DurableTask.Client.Entities; using Microsoft.DurableTask.Entities; using Microsoft.Extensions.Logging; @@ -55,7 +56,18 @@ public override async Task SignalEntityAsync( Name = operationName, Input = this.dataConverter.Serialize(input), ScheduledTime = scheduledTime?.ToTimestamp(), - }; + }; + + if (Activity.Current?.Id != null) + { + if (request.ParentTraceContext == null) + { + request.ParentTraceContext = new P.TraceContext(); + } + + request.ParentTraceContext.TraceParent = Activity.Current.Id; + request.ParentTraceContext.TraceState = Activity.Current.TraceStateString; + } // TODO this.logger.LogSomething try diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs index 10571e376..2926f3e0c 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs @@ -1,8 +1,10 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Diagnostics; using DurableTask.Core; -using DurableTask.Core.Entities; +using DurableTask.Core.Entities; +using DurableTask.Core.Tracing; using Microsoft.DurableTask.Client.Entities; using Microsoft.DurableTask.Entities; @@ -90,7 +92,8 @@ public override async Task SignalEntityAsync( EntityMessageEvent.GetCappedScheduledTime( DateTime.UtcNow, this.options.Entities.MaxSignalDelayTimeOrDefault, - scheduledTime?.UtcDateTime)); + scheduledTime?.UtcDateTime), + Activity.Current?.Id != null ? new DistributedTraceContext(Activity.Current.Id, Activity.Current.TraceStateString) : null); await this.options.Client!.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage()); } diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 0fa6b6595..a7f8217c1 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -75,6 +75,7 @@ message ExecutionStartedEvent { google.protobuf.Timestamp scheduledStartTimestamp = 6; TraceContext parentTraceContext = 7; google.protobuf.StringValue orchestrationSpanID = 8; + map tags = 9; } message ExecutionCompletedEvent { @@ -343,14 +344,8 @@ message CreateInstanceRequest { } message OrchestrationIdReusePolicy { - repeated OrchestrationStatus operationStatus = 1; - CreateOrchestrationAction action = 2; -} - -enum CreateOrchestrationAction { - ERROR = 0; - IGNORE = 1; - TERMINATE = 2; + repeated OrchestrationStatus replaceableStatus = 1; + reserved 2; } message CreateInstanceResponse { @@ -391,6 +386,7 @@ message OrchestrationState { google.protobuf.StringValue executionId = 12; google.protobuf.Timestamp completedTimestamp = 13; google.protobuf.StringValue parentInstanceId = 14; + map tags = 15; } message RaiseEventRequest { @@ -491,6 +487,7 @@ message SignalEntityRequest { google.protobuf.StringValue input = 3; string requestId = 4; google.protobuf.Timestamp scheduledTime = 5; + TraceContext parentTraceContext = 6; } message SignalEntityResponse { @@ -576,6 +573,7 @@ message OperationRequest { string operation = 1; string requestId = 2; google.protobuf.StringValue input = 3; + TraceContext TraceContext = 4; } message OperationResult { @@ -592,10 +590,12 @@ message OperationInfo { message OperationResultSuccess { google.protobuf.StringValue result = 1; + google.protobuf.Timestamp endTime = 2; } message OperationResultFailure { TaskFailureDetails failureDetails = 1; + google.protobuf.Timestamp endTime = 2; } message OperationAction { @@ -611,6 +611,8 @@ message SendSignalAction { string name = 2; google.protobuf.StringValue input = 3; google.protobuf.Timestamp scheduledTime = 4; + google.protobuf.Timestamp requestTime = 5; + TraceContext parentTraceContext = 6; } message StartNewOrchestrationAction { @@ -619,6 +621,8 @@ message StartNewOrchestrationAction { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; google.protobuf.Timestamp scheduledTime = 5; + google.protobuf.Timestamp requestTime = 6; + TraceContext parentTraceContext = 7; } service TaskHubSidecarService { @@ -731,4 +735,4 @@ message StreamInstanceHistoryRequest { message HistoryChunk { repeated HistoryEvent events = 1; -} \ No newline at end of file +} diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index ca514f29a..2b7d4a593 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-02-19 06:25:02 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/589cb5ecd9dd4b1fe463750defa3e2c84276b079/protos/orchestrator_service.proto +# The following files were downloaded from branch stevosyan/distributed-tracing-for-entities at 2025-03-24 23:02:20 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/36e201ae2972fe86d42c19c7bc463c52243e39d9/protos/orchestrator_service.proto diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index c3c0e45f7..8bd61e599 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -10,7 +10,8 @@ using DurableTask.Core.Command; using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; -using DurableTask.Core.History; +using DurableTask.Core.History; +using DurableTask.Core.Tracing; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using DTCore = DurableTask.Core; @@ -587,7 +588,11 @@ internal static void ToEntityBatchRequest( { Operation = operationRequest.Operation, Input = operationRequest.Input, - Id = Guid.Parse(operationRequest.RequestId), + Id = Guid.Parse(operationRequest.RequestId), + TraceContext = operationRequest.TraceContext != null ? + new DistributedTraceContext( + operationRequest.TraceContext.TraceParent, + operationRequest.TraceContext.TraceState) : null, }; } @@ -609,13 +614,15 @@ internal static void ToEntityBatchRequest( case P.OperationResult.ResultTypeOneofCase.Success: return new OperationResult() { - Result = operationResult.Success.Result, + Result = operationResult.Success.Result, + EndTime = operationResult.Success.EndTime?.ToDateTime(), }; case P.OperationResult.ResultTypeOneofCase.Failure: return new OperationResult() { - FailureDetails = operationResult.Failure.FailureDetails.ToCore(), + FailureDetails = operationResult.Failure.FailureDetails.ToCore(), + EndTime = operationResult.Failure.EndTime?.ToDateTime(), }; default: @@ -642,7 +649,8 @@ internal static void ToEntityBatchRequest( { Success = new P.OperationResultSuccess() { - Result = operationResult.Result, + Result = operationResult.Result, + EndTime = operationResult.EndTime?.ToTimestamp(), }, }; } @@ -652,7 +660,8 @@ internal static void ToEntityBatchRequest( { Failure = new P.OperationResultFailure() { - FailureDetails = ToProtobuf(operationResult.FailureDetails), + FailureDetails = ToProtobuf(operationResult.FailureDetails), + EndTime = operationResult.EndTime?.ToTimestamp(), }, }; } @@ -680,7 +689,12 @@ internal static void ToEntityBatchRequest( Name = operationAction.SendSignal.Name, Input = operationAction.SendSignal.Input, InstanceId = operationAction.SendSignal.InstanceId, - ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(), + ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(), + RequestTime = operationAction.SendSignal.RequestTime?.ToDateTime(), + ParentTraceContext = operationAction.SendSignal.ParentTraceContext != null ? + new DistributedTraceContext( + operationAction.SendSignal.ParentTraceContext.TraceParent, + operationAction.SendSignal.ParentTraceContext.TraceState) : null, }; case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration: @@ -691,7 +705,12 @@ internal static void ToEntityBatchRequest( Input = operationAction.StartNewOrchestration.Input, InstanceId = operationAction.StartNewOrchestration.InstanceId, Version = operationAction.StartNewOrchestration.Version, - ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(), + ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(), + RequestTime = operationAction.StartNewOrchestration.RequestTime?.ToDateTime(), + ParentTraceContext = operationAction.StartNewOrchestration.ParentTraceContext != null ? + new DistributedTraceContext( + operationAction.StartNewOrchestration.ParentTraceContext.TraceParent, + operationAction.StartNewOrchestration.ParentTraceContext.TraceState) : null, }; default: throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported."); @@ -722,7 +741,15 @@ internal static void ToEntityBatchRequest( Name = sendSignalAction.Name, Input = sendSignalAction.Input, InstanceId = sendSignalAction.InstanceId, - ScheduledTime = sendSignalAction.ScheduledTime?.ToTimestamp(), + ScheduledTime = sendSignalAction.ScheduledTime?.ToTimestamp(), + RequestTime = sendSignalAction.RequestTime?.ToTimestamp(), + ParentTraceContext = sendSignalAction.ParentTraceContext != null ? + new P.TraceContext + { + TraceParent = sendSignalAction.ParentTraceContext.TraceParent, + TraceState = sendSignalAction.ParentTraceContext.TraceState, + } + : null, }; break; @@ -734,7 +761,15 @@ internal static void ToEntityBatchRequest( Input = startNewOrchestrationAction.Input, Version = startNewOrchestrationAction.Version, InstanceId = startNewOrchestrationAction.InstanceId, - ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(), + ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(), + RequestTime = startNewOrchestrationAction.RequestTime?.ToTimestamp(), + ParentTraceContext = startNewOrchestrationAction.ParentTraceContext != null ? + new P.TraceContext + { + TraceParent = startNewOrchestrationAction.ParentTraceContext.TraceParent, + TraceState = startNewOrchestrationAction.ParentTraceContext.TraceState, + } + : null, }; break; } diff --git a/src/Worker/Core/Shims/TaskEntityShim.cs b/src/Worker/Core/Shims/TaskEntityShim.cs index 800b42bb6..74507df4e 100644 --- a/src/Worker/Core/Shims/TaskEntityShim.cs +++ b/src/Worker/Core/Shims/TaskEntityShim.cs @@ -4,6 +4,7 @@ using DurableTask.Core; using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; +using DurableTask.Core.Tracing; using Microsoft.DurableTask.Entities; using Microsoft.Extensions.Logging; using DTCore = DurableTask.Core; @@ -58,13 +59,18 @@ public override async Task ExecuteOperationBatchAsync(EntityB foreach (OperationRequest current in operations.Operations!) { - this.operation.SetNameAndInput(current.Operation!, current.Input); + this.operation.SetNameAndInput(current.Operation!, current.Input); + this.context.ParentTraceContext = current.TraceContext; try - { + { object? result = await this.taskEntity.RunAsync(this.operation); string? serializedResult = this.dataConverter.Serialize(result); - results.Add(new OperationResult() { Result = serializedResult }); + results.Add(new OperationResult() + { + Result = serializedResult, + EndTime = DateTime.UtcNow, + }); // the user code completed without exception, so we commit the current state and actions. this.state.Commit(); @@ -75,7 +81,8 @@ public override async Task ExecuteOperationBatchAsync(EntityB this.logger.OperationError(applicationException, this.entityId, current.Operation!); results.Add(new OperationResult() { - FailureDetails = new FailureDetails(applicationException), + FailureDetails = new FailureDetails(applicationException), + EndTime = DateTime.UtcNow, }); // the user code threw an unhandled exception, so we roll back the state and the actions. @@ -169,7 +176,9 @@ class ContextShim : TaskEntityContext readonly DataConverter dataConverter; List operationActions; - int checkpointPosition; + int checkpointPosition; + + DistributedTraceContext? parentTraceContext; public ContextShim(EntityInstanceId entityInstanceId, DataConverter dataConverter) { @@ -182,7 +191,13 @@ public ContextShim(EntityInstanceId entityInstanceId, DataConverter dataConverte public int CurrentPosition => this.operationActions.Count; - public override EntityInstanceId Id => this.entityInstanceId; + public override EntityInstanceId Id => this.entityInstanceId; + + public DistributedTraceContext? ParentTraceContext + { + get => this.parentTraceContext; + set => this.parentTraceContext = value; + } public void Commit() { @@ -209,7 +224,9 @@ public override void SignalEntity(EntityInstanceId id, string operationName, obj InstanceId = id.ToString(), Name = operationName, Input = this.dataConverter.Serialize(input), - ScheduledTime = options?.SignalTime?.UtcDateTime, + ScheduledTime = options?.SignalTime?.UtcDateTime, + RequestTime = DateTime.UtcNow, + ParentTraceContext = this.parentTraceContext, }); } @@ -224,7 +241,9 @@ public override string ScheduleNewOrchestration(TaskName name, object? input = n Version = name.Version, InstanceId = instanceId, Input = this.dataConverter.Serialize(input), - ScheduledStartTime = options?.StartAt?.UtcDateTime, + ScheduledStartTime = options?.StartAt?.UtcDateTime, + RequestTime = DateTime.UtcNow, + ParentTraceContext = this.parentTraceContext, }); return instanceId; } diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index 35ad033ca..347a5731b 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -1,241 +1,243 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System.Diagnostics.CodeAnalysis; -using System.Globalization; -using System.Security.Cryptography; -using System.Text; -using DurableTask.Core; -using DurableTask.Core.Entities; -using DurableTask.Core.Entities.OperationFormat; -using Microsoft.DurableTask.Entities; -using Microsoft.Extensions.Logging; -using DurableTaskCore = DurableTask.Core; - -namespace Microsoft.DurableTask.Worker.Shims; - -/// -/// A wrapper to go from to . -/// -sealed partial class TaskOrchestrationContextWrapper -{ - /// - /// A wrapper to go from to . - /// - sealed class TaskOrchestrationEntityContext : TaskOrchestrationEntityFeature - { - static readonly List EmptyEntityList = new(); - - readonly TaskOrchestrationContextWrapper wrapper; - - /// - /// Initializes a new instance of the class. - /// - /// The wrapper for the orchestration context. - public TaskOrchestrationEntityContext(TaskOrchestrationContextWrapper taskOrchestrationContextWrapper) - { - this.wrapper = taskOrchestrationContextWrapper; - this.EntityContext = new OrchestrationEntityContext(this.wrapper.InstanceId, this.wrapper.innerContext.OrchestrationInstance.ExecutionId, this.wrapper.innerContext); - } - - /// - /// Gets the entity context for the orchestration, which stores all the entity-related orchestration state. - /// - internal OrchestrationEntityContext EntityContext { get; } - - /// - public override async Task LockEntitiesAsync(IEnumerable entityIds) - { - Check.NotNull(entityIds); - - EntityId[] dtEntities = entityIds.Select(x => new DurableTaskCore.Entities.EntityId(x.Name, x.Key)).ToArray(); - - if (dtEntities.Length == 0) - { - throw new ArgumentException("The list of entities to lock must not be empty.", nameof(entityIds)); - } - - if (!this.EntityContext.ValidateAcquireTransition(out string? errormsg)) - { - throw new InvalidOperationException(errormsg); - } - - // use a deterministically replayable unique ID for this lock request, and to receive the response - Guid criticalSectionId = this.wrapper.NewGuid(); - - // send a message to the first entity to be acquired - EntityMessageEvent entityMessageEvent = this.EntityContext.EmitAcquireMessage(criticalSectionId, dtEntities); - - if (!this.wrapper.IsReplaying) - { - // this.Config.TraceHelper.SendingEntityMessage( - // this.InstanceId, - // this.ExecutionId, - // entityMessageEvent.TargetInstance.InstanceId, - // entityMessageEvent.EventName, - // entityMessageEvent.ToString()); - } - - this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent.AsRawInput()); - - OperationResult result = await this.wrapper.WaitForExternalEvent(criticalSectionId.ToString()); - - this.EntityContext.CompleteAcquire(result, criticalSectionId); - - // return an IDisposable that releases the lock - return new LockReleaser(this, criticalSectionId); - } - - /// - public override async Task CallEntityAsync(EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null) - { - Check.NotDefault(id); - OperationResult operationResult = await this.CallEntityInternalAsync(id, operationName, input); - - if (operationResult.IsError) - { - throw new EntityOperationFailedException(id, operationName, ConvertFailureDetails(operationResult.FailureDetails!)); - } - else - { - return this.wrapper.DataConverter.Deserialize(operationResult.Result!); - } - } - - /// - public override async Task CallEntityAsync(EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null) - { - Check.NotDefault(id); - OperationResult operationResult = await this.CallEntityInternalAsync(id, operationName, input); - - if (operationResult.IsError) - { - throw new EntityOperationFailedException(id, operationName, ConvertFailureDetails(operationResult.FailureDetails!)); - } - } - - /// - public override Task SignalEntityAsync(EntityInstanceId id, string operationName, object? input = null, SignalEntityOptions? options = null) - { - Check.NotDefault(id); - this.SendOperationMessage(id.ToString(), operationName, input, oneWay: true, scheduledTime: options?.SignalTime); - return Task.CompletedTask; - } - - /// - public override bool InCriticalSection([NotNullWhen(true)] out IReadOnlyList? entityIds) - { - if (this.EntityContext.IsInsideCriticalSection) - { - entityIds = this.EntityContext.GetAvailableEntities().Select(x => new EntityInstanceId(x.Name, x.Key)).ToList(); - return true; - } - else - { - entityIds = EmptyEntityList; - return false; - } - } - - /// - /// exits the critical section, if currently within a critical section. Otherwise, this has no effect. - /// - /// exit the critical section only if the critical section ID matches. - public void ExitCriticalSection(Guid? matchCriticalSectionId = null) - { - if (this.EntityContext.IsInsideCriticalSection - && (matchCriticalSectionId == null || matchCriticalSectionId == this.EntityContext.CurrentCriticalSectionId)) - { - foreach (EntityMessageEvent releaseMessage in this.EntityContext.EmitLockReleaseMessages()) - { - if (!this.wrapper.IsReplaying) - { - // this.Config.TraceHelper.SendingEntityMessage( - // this.InstanceId, - // this.ExecutionId, - // releaseMessage.TargetInstance.InstanceId, - // releaseMessage.EventName, - // releaseMessage.EventContent); - } - - this.wrapper.innerContext.SendEvent(releaseMessage.TargetInstance, releaseMessage.EventName, releaseMessage.AsRawInput()); - } - } - } - - static TaskFailureDetails ConvertFailureDetails(FailureDetails failureDetails) - => new( - failureDetails.ErrorType, - failureDetails.ErrorMessage, - failureDetails.StackTrace, - failureDetails.InnerFailure != null ? ConvertFailureDetails(failureDetails.InnerFailure) : null); - - async Task CallEntityInternalAsync(EntityInstanceId id, string operationName, object? input) - { - string instanceId = id.ToString(); - Guid requestId = this.SendOperationMessage(instanceId, operationName, input, oneWay: false, scheduledTime: null); - - OperationResult response = await this.wrapper.WaitForExternalEvent(requestId.ToString()); - - if (this.EntityContext.IsInsideCriticalSection) - { - // the lock is available again now that the entity call returned - this.EntityContext.RecoverLockAfterCall(id.ToString()); - } - - return response; - } - - Guid SendOperationMessage(string instanceId, string operationName, object? input, bool oneWay, DateTimeOffset? scheduledTime) - { - if (!this.EntityContext.ValidateOperationTransition(instanceId, oneWay, out string? errorMessage)) - { - throw new InvalidOperationException(errorMessage); - } - - Guid guid = this.wrapper.NewGuid(); // deterministically replayable unique id for this request - string? serializedInput = this.wrapper.DataConverter.Serialize(input); - var target = new OrchestrationInstance() { InstanceId = instanceId }; - - EntityMessageEvent entityMessageEvent = this.EntityContext.EmitRequestMessage( - target, - operationName, - oneWay, - guid, - EntityMessageEvent.GetCappedScheduledTime(this.wrapper.innerContext.CurrentUtcDateTime, this.wrapper.invocationContext.Options.MaximumTimerInterval, scheduledTime?.UtcDateTime), - serializedInput); - - if (!this.wrapper.IsReplaying) - { - // this.Config.TraceHelper.SendingEntityMessage( - // this.InstanceId, - // this.ExecutionId, - // target.InstanceId, - // entityMessageEvent.EventName, - // entityMessageEvent.ToString()); - } - - this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent.AsRawInput()); - - return guid; - } - - class LockReleaser : IAsyncDisposable - { - readonly TaskOrchestrationEntityContext context; - readonly Guid criticalSectionId; - - public LockReleaser(TaskOrchestrationEntityContext context, Guid criticalSectionId) - { - this.context = context; - this.criticalSectionId = criticalSectionId; - } - - public ValueTask DisposeAsync() - { - this.context.ExitCriticalSection(this.criticalSectionId); - return default; - } - } - } -} +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Diagnostics.CodeAnalysis; +using System.Globalization; +using System.Security.Cryptography; +using System.Text; +using DurableTask.Core; +using DurableTask.Core.Entities; +using DurableTask.Core.Entities.OperationFormat; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using DurableTaskCore = DurableTask.Core; + +namespace Microsoft.DurableTask.Worker.Shims; + +/// +/// A wrapper to go from to . +/// +sealed partial class TaskOrchestrationContextWrapper +{ + /// + /// A wrapper to go from to . + /// + sealed class TaskOrchestrationEntityContext : TaskOrchestrationEntityFeature + { + static readonly List EmptyEntityList = new(); + + readonly TaskOrchestrationContextWrapper wrapper; + + /// + /// Initializes a new instance of the class. + /// + /// The wrapper for the orchestration context. + public TaskOrchestrationEntityContext(TaskOrchestrationContextWrapper taskOrchestrationContextWrapper) + { + this.wrapper = taskOrchestrationContextWrapper; + this.EntityContext = new OrchestrationEntityContext(this.wrapper.InstanceId, this.wrapper.innerContext.OrchestrationInstance.ExecutionId, this.wrapper.innerContext); + } + + /// + /// Gets the entity context for the orchestration, which stores all the entity-related orchestration state. + /// + internal OrchestrationEntityContext EntityContext { get; } + + /// + public override async Task LockEntitiesAsync(IEnumerable entityIds) + { + Check.NotNull(entityIds); + + EntityId[] dtEntities = entityIds.Select(x => new DurableTaskCore.Entities.EntityId(x.Name, x.Key)).ToArray(); + + if (dtEntities.Length == 0) + { + throw new ArgumentException("The list of entities to lock must not be empty.", nameof(entityIds)); + } + + if (!this.EntityContext.ValidateAcquireTransition(out string? errormsg)) + { + throw new InvalidOperationException(errormsg); + } + + // use a deterministically replayable unique ID for this lock request, and to receive the response + Guid criticalSectionId = this.wrapper.NewGuid(); + + // send a message to the first entity to be acquired + EntityMessageEvent entityMessageEvent = this.EntityContext.EmitAcquireMessage(criticalSectionId, dtEntities); + + if (!this.wrapper.IsReplaying) + { + // this.Config.TraceHelper.SendingEntityMessage( + // this.InstanceId, + // this.ExecutionId, + // entityMessageEvent.TargetInstance.InstanceId, + // entityMessageEvent.EventName, + // entityMessageEvent.ToString()); + } + + this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent.AsRawInput()); + + OperationResult result = await this.wrapper.WaitForExternalEvent(criticalSectionId.ToString()); + + this.EntityContext.CompleteAcquire(result, criticalSectionId); + + // return an IDisposable that releases the lock + return new LockReleaser(this, criticalSectionId); + } + + /// + public override async Task CallEntityAsync(EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null) + { + Check.NotDefault(id); + OperationResult operationResult = await this.CallEntityInternalAsync(id, operationName, input); + + if (operationResult.IsError) + { + throw new EntityOperationFailedException(id, operationName, ConvertFailureDetails(operationResult.FailureDetails!)); + } + else + { + return this.wrapper.DataConverter.Deserialize(operationResult.Result!); + } + } + + /// + public override async Task CallEntityAsync(EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null) + { + Check.NotDefault(id); + OperationResult operationResult = await this.CallEntityInternalAsync(id, operationName, input); + + if (operationResult.IsError) + { + throw new EntityOperationFailedException(id, operationName, ConvertFailureDetails(operationResult.FailureDetails!)); + } + } + + /// + public override Task SignalEntityAsync(EntityInstanceId id, string operationName, object? input = null, SignalEntityOptions? options = null) + { + Check.NotDefault(id); + this.SendOperationMessage(id.ToString(), operationName, input, oneWay: true, scheduledTime: options?.SignalTime); + return Task.CompletedTask; + } + + /// + public override bool InCriticalSection([NotNullWhen(true)] out IReadOnlyList? entityIds) + { + if (this.EntityContext.IsInsideCriticalSection) + { + entityIds = this.EntityContext.GetAvailableEntities().Select(x => new EntityInstanceId(x.Name, x.Key)).ToList(); + return true; + } + else + { + entityIds = EmptyEntityList; + return false; + } + } + + /// + /// exits the critical section, if currently within a critical section. Otherwise, this has no effect. + /// + /// exit the critical section only if the critical section ID matches. + public void ExitCriticalSection(Guid? matchCriticalSectionId = null) + { + if (this.EntityContext.IsInsideCriticalSection + && (matchCriticalSectionId == null || matchCriticalSectionId == this.EntityContext.CurrentCriticalSectionId)) + { + foreach (EntityMessageEvent releaseMessage in this.EntityContext.EmitLockReleaseMessages()) + { + if (!this.wrapper.IsReplaying) + { + // this.Config.TraceHelper.SendingEntityMessage( + // this.InstanceId, + // this.ExecutionId, + // releaseMessage.TargetInstance.InstanceId, + // releaseMessage.EventName, + // releaseMessage.EventContent); + } + + this.wrapper.innerContext.SendEvent(releaseMessage.TargetInstance, releaseMessage.EventName, releaseMessage.AsRawInput()); + } + } + } + + static TaskFailureDetails ConvertFailureDetails(FailureDetails failureDetails) + => new( + failureDetails.ErrorType, + failureDetails.ErrorMessage, + failureDetails.StackTrace, + failureDetails.InnerFailure != null ? ConvertFailureDetails(failureDetails.InnerFailure) : null); + + async Task CallEntityInternalAsync(EntityInstanceId id, string operationName, object? input) + { + string instanceId = id.ToString(); + Guid requestId = this.SendOperationMessage(instanceId, operationName, input, oneWay: false, scheduledTime: null); + + OperationResult response = await this.wrapper.WaitForExternalEvent(requestId.ToString()); + + if (this.EntityContext.IsInsideCriticalSection) + { + // the lock is available again now that the entity call returned + this.EntityContext.RecoverLockAfterCall(id.ToString()); + } + + return response; + } + + Guid SendOperationMessage(string instanceId, string operationName, object? input, bool oneWay, DateTimeOffset? scheduledTime) + { + if (!this.EntityContext.ValidateOperationTransition(instanceId, oneWay, out string? errorMessage)) + { + throw new InvalidOperationException(errorMessage); + } + + Guid guid = this.wrapper.NewGuid(); // deterministically replayable unique id for this request + string? serializedInput = this.wrapper.DataConverter.Serialize(input); + var target = new OrchestrationInstance() { InstanceId = instanceId }; + + EntityMessageEvent entityMessageEvent = this.EntityContext.EmitRequestMessage( + target, + operationName, + oneWay, + guid, + EntityMessageEvent.GetCappedScheduledTime(this.wrapper.innerContext.CurrentUtcDateTime, this.wrapper.invocationContext.Options.MaximumTimerInterval, scheduledTime?.UtcDateTime), + serializedInput, + createTrace: true, + requestTime: DateTime.UtcNow); + + if (!this.wrapper.IsReplaying) + { + // this.Config.TraceHelper.SendingEntityMessage( + // this.InstanceId, + // this.ExecutionId, + // target.InstanceId, + // entityMessageEvent.EventName, + // entityMessageEvent.ToString()); + } + + this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent.AsRawInput()); + + return guid; + } + + class LockReleaser : IAsyncDisposable + { + readonly TaskOrchestrationEntityContext context; + readonly Guid criticalSectionId; + + public LockReleaser(TaskOrchestrationEntityContext context, Guid criticalSectionId) + { + this.context = context; + this.criticalSectionId = criticalSectionId; + } + + public ValueTask DisposeAsync() + { + this.context.ExitCriticalSection(this.criticalSectionId); + return default; + } + } + } +} From e60efe4a81eb7e09551d0ea88a1252b8a6486ec1 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 27 Mar 2025 20:23:41 -0700 Subject: [PATCH 02/15] one small proto change --- src/Grpc/orchestrator_service.proto | 2 +- src/Grpc/versions.txt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index a7f8217c1..a1eae1783 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -573,7 +573,7 @@ message OperationRequest { string operation = 1; string requestId = 2; google.protobuf.StringValue input = 3; - TraceContext TraceContext = 4; + TraceContext traceContext = 4; } message OperationResult { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 2b7d4a593..47ce9e0fb 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch stevosyan/distributed-tracing-for-entities at 2025-03-24 23:02:20 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/36e201ae2972fe86d42c19c7bc463c52243e39d9/protos/orchestrator_service.proto +# The following files were downloaded from branch stevosyan/distributed-tracing-for-entities-isolated at 2025-03-28 03:19:09 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c3ba3cbd50904b16357ca747d0ab6f1b7aca7602/protos/orchestrator_service.proto From 3d1efc8b31b098416ae28db12eff331cbb2604c4 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 31 Mar 2025 16:24:47 -0700 Subject: [PATCH 03/15] fixed line endings, removed proto changes --- src/Grpc/orchestrator_service.proto | 44 +- src/Grpc/versions.txt | 4 +- .../Shims/TaskOrchestrationEntityContext.cs | 486 +++++++++--------- 3 files changed, 280 insertions(+), 254 deletions(-) diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index a1eae1783..d3fa2de97 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -463,6 +463,7 @@ message PurgeInstanceFilter { message PurgeInstancesResponse { int32 deletedInstanceCount = 1; + google.protobuf.BoolValue isComplete = 2; } message CreateTaskHubRequest { @@ -487,7 +488,6 @@ message SignalEntityRequest { google.protobuf.StringValue input = 3; string requestId = 4; google.protobuf.Timestamp scheduledTime = 5; - TraceContext parentTraceContext = 6; } message SignalEntityResponse { @@ -573,7 +573,6 @@ message OperationRequest { string operation = 1; string requestId = 2; google.protobuf.StringValue input = 3; - TraceContext traceContext = 4; } message OperationResult { @@ -590,12 +589,10 @@ message OperationInfo { message OperationResultSuccess { google.protobuf.StringValue result = 1; - google.protobuf.Timestamp endTime = 2; } message OperationResultFailure { TaskFailureDetails failureDetails = 1; - google.protobuf.Timestamp endTime = 2; } message OperationAction { @@ -611,8 +608,6 @@ message SendSignalAction { string name = 2; google.protobuf.StringValue input = 3; google.protobuf.Timestamp scheduledTime = 4; - google.protobuf.Timestamp requestTime = 5; - TraceContext parentTraceContext = 6; } message StartNewOrchestrationAction { @@ -621,8 +616,30 @@ message StartNewOrchestrationAction { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; google.protobuf.Timestamp scheduledTime = 5; - google.protobuf.Timestamp requestTime = 6; - TraceContext parentTraceContext = 7; +} + +message AbandonActivityTaskRequest { + string completionToken = 1; +} + +message AbandonActivityTaskResponse { + // Empty. +} + +message AbandonOrchestrationTaskRequest { + string completionToken = 1; +} + +message AbandonOrchestrationTaskResponse { + // Empty. +} + +message AbandonEntityTaskRequest { + string completionToken = 1; +} + +message AbandonEntityTaskResponse { + // Empty. } service TaskHubSidecarService { @@ -686,6 +703,15 @@ service TaskHubSidecarService { // clean entity storage rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); + + // Abandons a single work item + rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse); + + // Abandon an orchestration work item + rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse); + + // Abandon an entity work item + rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); } message GetWorkItemsRequest { @@ -735,4 +761,4 @@ message StreamInstanceHistoryRequest { message HistoryChunk { repeated HistoryEvent events = 1; -} +} \ No newline at end of file diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 47ce9e0fb..57789cb77 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch stevosyan/distributed-tracing-for-entities-isolated at 2025-03-28 03:19:09 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c3ba3cbd50904b16357ca747d0ab6f1b7aca7602/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-03-24 23:37:31 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c85ef11430ff8e10e21105abb545b0803bb86c66/protos/orchestrator_service.proto \ No newline at end of file diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index 347a5731b..bd9e7926a 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -1,243 +1,243 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System.Diagnostics.CodeAnalysis; -using System.Globalization; -using System.Security.Cryptography; -using System.Text; -using DurableTask.Core; -using DurableTask.Core.Entities; -using DurableTask.Core.Entities.OperationFormat; -using Microsoft.DurableTask.Entities; -using Microsoft.Extensions.Logging; -using DurableTaskCore = DurableTask.Core; - -namespace Microsoft.DurableTask.Worker.Shims; - -/// -/// A wrapper to go from to . -/// -sealed partial class TaskOrchestrationContextWrapper -{ - /// - /// A wrapper to go from to . - /// - sealed class TaskOrchestrationEntityContext : TaskOrchestrationEntityFeature - { - static readonly List EmptyEntityList = new(); - - readonly TaskOrchestrationContextWrapper wrapper; - - /// - /// Initializes a new instance of the class. - /// - /// The wrapper for the orchestration context. - public TaskOrchestrationEntityContext(TaskOrchestrationContextWrapper taskOrchestrationContextWrapper) - { - this.wrapper = taskOrchestrationContextWrapper; - this.EntityContext = new OrchestrationEntityContext(this.wrapper.InstanceId, this.wrapper.innerContext.OrchestrationInstance.ExecutionId, this.wrapper.innerContext); - } - - /// - /// Gets the entity context for the orchestration, which stores all the entity-related orchestration state. - /// - internal OrchestrationEntityContext EntityContext { get; } - - /// - public override async Task LockEntitiesAsync(IEnumerable entityIds) - { - Check.NotNull(entityIds); - - EntityId[] dtEntities = entityIds.Select(x => new DurableTaskCore.Entities.EntityId(x.Name, x.Key)).ToArray(); - - if (dtEntities.Length == 0) - { - throw new ArgumentException("The list of entities to lock must not be empty.", nameof(entityIds)); - } - - if (!this.EntityContext.ValidateAcquireTransition(out string? errormsg)) - { - throw new InvalidOperationException(errormsg); - } - - // use a deterministically replayable unique ID for this lock request, and to receive the response - Guid criticalSectionId = this.wrapper.NewGuid(); - - // send a message to the first entity to be acquired - EntityMessageEvent entityMessageEvent = this.EntityContext.EmitAcquireMessage(criticalSectionId, dtEntities); - - if (!this.wrapper.IsReplaying) - { - // this.Config.TraceHelper.SendingEntityMessage( - // this.InstanceId, - // this.ExecutionId, - // entityMessageEvent.TargetInstance.InstanceId, - // entityMessageEvent.EventName, - // entityMessageEvent.ToString()); - } - - this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent.AsRawInput()); - - OperationResult result = await this.wrapper.WaitForExternalEvent(criticalSectionId.ToString()); - - this.EntityContext.CompleteAcquire(result, criticalSectionId); - - // return an IDisposable that releases the lock - return new LockReleaser(this, criticalSectionId); - } - - /// - public override async Task CallEntityAsync(EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null) - { - Check.NotDefault(id); - OperationResult operationResult = await this.CallEntityInternalAsync(id, operationName, input); - - if (operationResult.IsError) - { - throw new EntityOperationFailedException(id, operationName, ConvertFailureDetails(operationResult.FailureDetails!)); - } - else - { - return this.wrapper.DataConverter.Deserialize(operationResult.Result!); - } - } - - /// - public override async Task CallEntityAsync(EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null) - { - Check.NotDefault(id); - OperationResult operationResult = await this.CallEntityInternalAsync(id, operationName, input); - - if (operationResult.IsError) - { - throw new EntityOperationFailedException(id, operationName, ConvertFailureDetails(operationResult.FailureDetails!)); - } - } - - /// - public override Task SignalEntityAsync(EntityInstanceId id, string operationName, object? input = null, SignalEntityOptions? options = null) - { - Check.NotDefault(id); - this.SendOperationMessage(id.ToString(), operationName, input, oneWay: true, scheduledTime: options?.SignalTime); - return Task.CompletedTask; - } - - /// - public override bool InCriticalSection([NotNullWhen(true)] out IReadOnlyList? entityIds) - { - if (this.EntityContext.IsInsideCriticalSection) - { - entityIds = this.EntityContext.GetAvailableEntities().Select(x => new EntityInstanceId(x.Name, x.Key)).ToList(); - return true; - } - else - { - entityIds = EmptyEntityList; - return false; - } - } - - /// - /// exits the critical section, if currently within a critical section. Otherwise, this has no effect. - /// - /// exit the critical section only if the critical section ID matches. - public void ExitCriticalSection(Guid? matchCriticalSectionId = null) - { - if (this.EntityContext.IsInsideCriticalSection - && (matchCriticalSectionId == null || matchCriticalSectionId == this.EntityContext.CurrentCriticalSectionId)) - { - foreach (EntityMessageEvent releaseMessage in this.EntityContext.EmitLockReleaseMessages()) - { - if (!this.wrapper.IsReplaying) - { - // this.Config.TraceHelper.SendingEntityMessage( - // this.InstanceId, - // this.ExecutionId, - // releaseMessage.TargetInstance.InstanceId, - // releaseMessage.EventName, - // releaseMessage.EventContent); - } - - this.wrapper.innerContext.SendEvent(releaseMessage.TargetInstance, releaseMessage.EventName, releaseMessage.AsRawInput()); - } - } - } - - static TaskFailureDetails ConvertFailureDetails(FailureDetails failureDetails) - => new( - failureDetails.ErrorType, - failureDetails.ErrorMessage, - failureDetails.StackTrace, - failureDetails.InnerFailure != null ? ConvertFailureDetails(failureDetails.InnerFailure) : null); - - async Task CallEntityInternalAsync(EntityInstanceId id, string operationName, object? input) - { - string instanceId = id.ToString(); - Guid requestId = this.SendOperationMessage(instanceId, operationName, input, oneWay: false, scheduledTime: null); - - OperationResult response = await this.wrapper.WaitForExternalEvent(requestId.ToString()); - - if (this.EntityContext.IsInsideCriticalSection) - { - // the lock is available again now that the entity call returned - this.EntityContext.RecoverLockAfterCall(id.ToString()); - } - - return response; - } - - Guid SendOperationMessage(string instanceId, string operationName, object? input, bool oneWay, DateTimeOffset? scheduledTime) - { - if (!this.EntityContext.ValidateOperationTransition(instanceId, oneWay, out string? errorMessage)) - { - throw new InvalidOperationException(errorMessage); - } - - Guid guid = this.wrapper.NewGuid(); // deterministically replayable unique id for this request - string? serializedInput = this.wrapper.DataConverter.Serialize(input); - var target = new OrchestrationInstance() { InstanceId = instanceId }; - - EntityMessageEvent entityMessageEvent = this.EntityContext.EmitRequestMessage( - target, - operationName, - oneWay, - guid, - EntityMessageEvent.GetCappedScheduledTime(this.wrapper.innerContext.CurrentUtcDateTime, this.wrapper.invocationContext.Options.MaximumTimerInterval, scheduledTime?.UtcDateTime), - serializedInput, - createTrace: true, - requestTime: DateTime.UtcNow); - - if (!this.wrapper.IsReplaying) - { - // this.Config.TraceHelper.SendingEntityMessage( - // this.InstanceId, - // this.ExecutionId, - // target.InstanceId, - // entityMessageEvent.EventName, - // entityMessageEvent.ToString()); - } - - this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent.AsRawInput()); - - return guid; - } - - class LockReleaser : IAsyncDisposable - { - readonly TaskOrchestrationEntityContext context; - readonly Guid criticalSectionId; - - public LockReleaser(TaskOrchestrationEntityContext context, Guid criticalSectionId) - { - this.context = context; - this.criticalSectionId = criticalSectionId; - } - - public ValueTask DisposeAsync() - { - this.context.ExitCriticalSection(this.criticalSectionId); - return default; - } - } - } -} +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Diagnostics.CodeAnalysis; +using System.Globalization; +using System.Security.Cryptography; +using System.Text; +using DurableTask.Core; +using DurableTask.Core.Entities; +using DurableTask.Core.Entities.OperationFormat; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using DurableTaskCore = DurableTask.Core; + +namespace Microsoft.DurableTask.Worker.Shims; + +/// +/// A wrapper to go from to . +/// +sealed partial class TaskOrchestrationContextWrapper +{ + /// + /// A wrapper to go from to . + /// + sealed class TaskOrchestrationEntityContext : TaskOrchestrationEntityFeature + { + static readonly List EmptyEntityList = new(); + + readonly TaskOrchestrationContextWrapper wrapper; + + /// + /// Initializes a new instance of the class. + /// + /// The wrapper for the orchestration context. + public TaskOrchestrationEntityContext(TaskOrchestrationContextWrapper taskOrchestrationContextWrapper) + { + this.wrapper = taskOrchestrationContextWrapper; + this.EntityContext = new OrchestrationEntityContext(this.wrapper.InstanceId, this.wrapper.innerContext.OrchestrationInstance.ExecutionId, this.wrapper.innerContext); + } + + /// + /// Gets the entity context for the orchestration, which stores all the entity-related orchestration state. + /// + internal OrchestrationEntityContext EntityContext { get; } + + /// + public override async Task LockEntitiesAsync(IEnumerable entityIds) + { + Check.NotNull(entityIds); + + EntityId[] dtEntities = entityIds.Select(x => new DurableTaskCore.Entities.EntityId(x.Name, x.Key)).ToArray(); + + if (dtEntities.Length == 0) + { + throw new ArgumentException("The list of entities to lock must not be empty.", nameof(entityIds)); + } + + if (!this.EntityContext.ValidateAcquireTransition(out string? errormsg)) + { + throw new InvalidOperationException(errormsg); + } + + // use a deterministically replayable unique ID for this lock request, and to receive the response + Guid criticalSectionId = this.wrapper.NewGuid(); + + // send a message to the first entity to be acquired + EntityMessageEvent entityMessageEvent = this.EntityContext.EmitAcquireMessage(criticalSectionId, dtEntities); + + if (!this.wrapper.IsReplaying) + { + // this.Config.TraceHelper.SendingEntityMessage( + // this.InstanceId, + // this.ExecutionId, + // entityMessageEvent.TargetInstance.InstanceId, + // entityMessageEvent.EventName, + // entityMessageEvent.ToString()); + } + + this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent.AsRawInput()); + + OperationResult result = await this.wrapper.WaitForExternalEvent(criticalSectionId.ToString()); + + this.EntityContext.CompleteAcquire(result, criticalSectionId); + + // return an IDisposable that releases the lock + return new LockReleaser(this, criticalSectionId); + } + + /// + public override async Task CallEntityAsync(EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null) + { + Check.NotDefault(id); + OperationResult operationResult = await this.CallEntityInternalAsync(id, operationName, input); + + if (operationResult.IsError) + { + throw new EntityOperationFailedException(id, operationName, ConvertFailureDetails(operationResult.FailureDetails!)); + } + else + { + return this.wrapper.DataConverter.Deserialize(operationResult.Result!); + } + } + + /// + public override async Task CallEntityAsync(EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null) + { + Check.NotDefault(id); + OperationResult operationResult = await this.CallEntityInternalAsync(id, operationName, input); + + if (operationResult.IsError) + { + throw new EntityOperationFailedException(id, operationName, ConvertFailureDetails(operationResult.FailureDetails!)); + } + } + + /// + public override Task SignalEntityAsync(EntityInstanceId id, string operationName, object? input = null, SignalEntityOptions? options = null) + { + Check.NotDefault(id); + this.SendOperationMessage(id.ToString(), operationName, input, oneWay: true, scheduledTime: options?.SignalTime); + return Task.CompletedTask; + } + + /// + public override bool InCriticalSection([NotNullWhen(true)] out IReadOnlyList? entityIds) + { + if (this.EntityContext.IsInsideCriticalSection) + { + entityIds = this.EntityContext.GetAvailableEntities().Select(x => new EntityInstanceId(x.Name, x.Key)).ToList(); + return true; + } + else + { + entityIds = EmptyEntityList; + return false; + } + } + + /// + /// exits the critical section, if currently within a critical section. Otherwise, this has no effect. + /// + /// exit the critical section only if the critical section ID matches. + public void ExitCriticalSection(Guid? matchCriticalSectionId = null) + { + if (this.EntityContext.IsInsideCriticalSection + && (matchCriticalSectionId == null || matchCriticalSectionId == this.EntityContext.CurrentCriticalSectionId)) + { + foreach (EntityMessageEvent releaseMessage in this.EntityContext.EmitLockReleaseMessages()) + { + if (!this.wrapper.IsReplaying) + { + // this.Config.TraceHelper.SendingEntityMessage( + // this.InstanceId, + // this.ExecutionId, + // releaseMessage.TargetInstance.InstanceId, + // releaseMessage.EventName, + // releaseMessage.EventContent); + } + + this.wrapper.innerContext.SendEvent(releaseMessage.TargetInstance, releaseMessage.EventName, releaseMessage.AsRawInput()); + } + } + } + + static TaskFailureDetails ConvertFailureDetails(FailureDetails failureDetails) + => new( + failureDetails.ErrorType, + failureDetails.ErrorMessage, + failureDetails.StackTrace, + failureDetails.InnerFailure != null ? ConvertFailureDetails(failureDetails.InnerFailure) : null); + + async Task CallEntityInternalAsync(EntityInstanceId id, string operationName, object? input) + { + string instanceId = id.ToString(); + Guid requestId = this.SendOperationMessage(instanceId, operationName, input, oneWay: false, scheduledTime: null); + + OperationResult response = await this.wrapper.WaitForExternalEvent(requestId.ToString()); + + if (this.EntityContext.IsInsideCriticalSection) + { + // the lock is available again now that the entity call returned + this.EntityContext.RecoverLockAfterCall(id.ToString()); + } + + return response; + } + + Guid SendOperationMessage(string instanceId, string operationName, object? input, bool oneWay, DateTimeOffset? scheduledTime) + { + if (!this.EntityContext.ValidateOperationTransition(instanceId, oneWay, out string? errorMessage)) + { + throw new InvalidOperationException(errorMessage); + } + + Guid guid = this.wrapper.NewGuid(); // deterministically replayable unique id for this request + string? serializedInput = this.wrapper.DataConverter.Serialize(input); + var target = new OrchestrationInstance() { InstanceId = instanceId }; + + EntityMessageEvent entityMessageEvent = this.EntityContext.EmitRequestMessage( + target, + operationName, + oneWay, + guid, + EntityMessageEvent.GetCappedScheduledTime(this.wrapper.innerContext.CurrentUtcDateTime, this.wrapper.invocationContext.Options.MaximumTimerInterval, scheduledTime?.UtcDateTime), + serializedInput, + createTrace: true, + requestTime: DateTime.UtcNow); + + if (!this.wrapper.IsReplaying) + { + // this.Config.TraceHelper.SendingEntityMessage( + // this.InstanceId, + // this.ExecutionId, + // target.InstanceId, + // entityMessageEvent.EventName, + // entityMessageEvent.ToString()); + } + + this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent.AsRawInput()); + + return guid; + } + + class LockReleaser : IAsyncDisposable + { + readonly TaskOrchestrationEntityContext context; + readonly Guid criticalSectionId; + + public LockReleaser(TaskOrchestrationEntityContext context, Guid criticalSectionId) + { + this.context = context; + this.criticalSectionId = criticalSectionId; + } + + public ValueTask DisposeAsync() + { + this.context.ExitCriticalSection(this.criticalSectionId); + return default; + } + } + } +} From ea9c3c8013c4995348076ae9d18e8da747135c3e Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 31 Mar 2025 16:31:32 -0700 Subject: [PATCH 04/15] trying to revert proto changes --- src/Grpc/orchestrator_service.proto | 46 +++++------------------------ src/Grpc/versions.txt | 4 +-- 2 files changed, 10 insertions(+), 40 deletions(-) diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index d3fa2de97..0fa6b6595 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -75,7 +75,6 @@ message ExecutionStartedEvent { google.protobuf.Timestamp scheduledStartTimestamp = 6; TraceContext parentTraceContext = 7; google.protobuf.StringValue orchestrationSpanID = 8; - map tags = 9; } message ExecutionCompletedEvent { @@ -344,8 +343,14 @@ message CreateInstanceRequest { } message OrchestrationIdReusePolicy { - repeated OrchestrationStatus replaceableStatus = 1; - reserved 2; + repeated OrchestrationStatus operationStatus = 1; + CreateOrchestrationAction action = 2; +} + +enum CreateOrchestrationAction { + ERROR = 0; + IGNORE = 1; + TERMINATE = 2; } message CreateInstanceResponse { @@ -386,7 +391,6 @@ message OrchestrationState { google.protobuf.StringValue executionId = 12; google.protobuf.Timestamp completedTimestamp = 13; google.protobuf.StringValue parentInstanceId = 14; - map tags = 15; } message RaiseEventRequest { @@ -463,7 +467,6 @@ message PurgeInstanceFilter { message PurgeInstancesResponse { int32 deletedInstanceCount = 1; - google.protobuf.BoolValue isComplete = 2; } message CreateTaskHubRequest { @@ -618,30 +621,6 @@ message StartNewOrchestrationAction { google.protobuf.Timestamp scheduledTime = 5; } -message AbandonActivityTaskRequest { - string completionToken = 1; -} - -message AbandonActivityTaskResponse { - // Empty. -} - -message AbandonOrchestrationTaskRequest { - string completionToken = 1; -} - -message AbandonOrchestrationTaskResponse { - // Empty. -} - -message AbandonEntityTaskRequest { - string completionToken = 1; -} - -message AbandonEntityTaskResponse { - // Empty. -} - service TaskHubSidecarService { // Sends a hello request to the sidecar service. rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); @@ -703,15 +682,6 @@ service TaskHubSidecarService { // clean entity storage rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); - - // Abandons a single work item - rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse); - - // Abandon an orchestration work item - rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse); - - // Abandon an entity work item - rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); } message GetWorkItemsRequest { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 57789cb77..1173282bf 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-03-24 23:37:31 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c85ef11430ff8e10e21105abb545b0803bb86c66/protos/orchestrator_service.proto \ No newline at end of file +# The following files were downloaded from branch main at 2025-02-19 06:25:02 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/589cb5ecd9dd4b1fe463750defa3e2c84276b079/protos/orchestrator_service.proto \ No newline at end of file From d3b8f8bef0613cc3168ba6465f6d29ae7c510cf2 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 31 Mar 2025 16:33:02 -0700 Subject: [PATCH 05/15] line endings :( --- src/Grpc/versions.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 1173282bf..31dd281dd 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-02-19 06:25:02 UTC +# The following files were downloaded from branch main at 2025-02-19 06:25:02 UTC https://raw.githubusercontent.com/microsoft/durabletask-protobuf/589cb5ecd9dd4b1fe463750defa3e2c84276b079/protos/orchestrator_service.proto \ No newline at end of file From 3ae40dd1625d896515bd23adc0a783e80baf8442 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 31 Mar 2025 16:33:43 -0700 Subject: [PATCH 06/15] one more time? --- src/Grpc/versions.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 31dd281dd..ca514f29a 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ # The following files were downloaded from branch main at 2025-02-19 06:25:02 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/589cb5ecd9dd4b1fe463750defa3e2c84276b079/protos/orchestrator_service.proto \ No newline at end of file +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/589cb5ecd9dd4b1fe463750defa3e2c84276b079/protos/orchestrator_service.proto From 22137f342effda3fc18049e2520b72428ba8abc2 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 14 Apr 2025 00:20:00 -0700 Subject: [PATCH 07/15] pushing what i have so far --- src/Client/Grpc/GrpcDurableEntityClient.cs | 3 ++- src/Client/Grpc/GrpcDurableTaskClient.cs | 3 ++- .../ShimDurableEntityClient.cs | 3 ++- .../OrchestrationServiceClientShim/ShimDurableTaskClient.cs | 6 +++++- src/Worker/Core/Shims/TaskEntityShim.cs | 4 ++++ src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs | 2 +- 6 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/Client/Grpc/GrpcDurableEntityClient.cs b/src/Client/Grpc/GrpcDurableEntityClient.cs index 4cce9c981..3e59b36a6 100644 --- a/src/Client/Grpc/GrpcDurableEntityClient.cs +++ b/src/Client/Grpc/GrpcDurableEntityClient.cs @@ -55,7 +55,8 @@ public override async Task SignalEntityAsync( RequestId = requestId.ToString(), Name = operationName, Input = this.dataConverter.Serialize(input), - ScheduledTime = scheduledTime?.ToTimestamp(), + ScheduledTime = scheduledTime?.ToTimestamp(), + RequestTime = DateTimeOffset.UtcNow.ToTimestamp(), }; if (Activity.Current?.Id != null) diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index a408151d7..4acc62674 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -83,7 +83,8 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( Name = orchestratorName.Name, Version = orchestratorName.Version, InstanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"), - Input = this.DataConverter.Serialize(input), + Input = this.DataConverter.Serialize(input), + RequestTime = DateTimeOffset.UtcNow.ToTimestamp(), }; if (Activity.Current?.Id != null || Activity.Current?.TraceStateString != null) diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs index 2926f3e0c..df3e42b6c 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs @@ -93,7 +93,8 @@ public override async Task SignalEntityAsync( DateTime.UtcNow, this.options.Entities.MaxSignalDelayTimeOrDefault, scheduledTime?.UtcDateTime), - Activity.Current?.Id != null ? new DistributedTraceContext(Activity.Current.Id, Activity.Current.TraceStateString) : null); + requestTime: DateTimeOffset.UtcNow, + createTrace: true); await this.options.Client!.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage()); } diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index ce510fd0a..6cbb54348 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Globalization; using DurableTask.Core; using DurableTask.Core.Entities; using DurableTask.Core.History; @@ -175,7 +177,9 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( Name = orchestratorName.Name, Version = orchestratorName.Version, OrchestrationInstance = instance, - ScheduledStartTime = options?.StartAt?.UtcDateTime, + ScheduledStartTime = options?.StartAt?.UtcDateTime, + ParentTraceContext = Activity.Current?.Id != null ? new Core.Tracing.DistributedTraceContext(Activity.Current.Id, Activity.Current.TraceStateString) : null, + Tags = new Dictionary { { OrchestrationTags.CreateTraceForNewOrchestration, "true" }, { OrchestrationTags.RequestTime, DateTimeOffset.UtcNow.ToString(CultureInfo.InvariantCulture) } }, }, }; diff --git a/src/Worker/Core/Shims/TaskEntityShim.cs b/src/Worker/Core/Shims/TaskEntityShim.cs index 74507df4e..5eb811975 100644 --- a/src/Worker/Core/Shims/TaskEntityShim.cs +++ b/src/Worker/Core/Shims/TaskEntityShim.cs @@ -60,6 +60,10 @@ public override async Task ExecuteOperationBatchAsync(EntityB foreach (OperationRequest current in operations.Operations!) { this.operation.SetNameAndInput(current.Operation!, current.Input); + + // The trace context of the current operation becomes the parent trace context of the TaskEntityContext. + // That way, if processing this operation request leads to the TaskEntityContext signaling another entity or starting an orchestration, + // then the parent trace context of these actions will be set to the trace context of whatever operation request triggered them this.context.ParentTraceContext = current.TraceContext; try diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index bd9e7926a..4416d0672 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -204,7 +204,7 @@ Guid SendOperationMessage(string instanceId, string operationName, object? input guid, EntityMessageEvent.GetCappedScheduledTime(this.wrapper.innerContext.CurrentUtcDateTime, this.wrapper.invocationContext.Options.MaximumTimerInterval, scheduledTime?.UtcDateTime), serializedInput, - createTrace: true, + createTrace: oneWay, requestTime: DateTime.UtcNow); if (!this.wrapper.IsReplaying) From 1eed186a672b307b591f0d32f251cc33d2a135a6 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 14 Apr 2025 15:37:28 -0700 Subject: [PATCH 08/15] seems like everything is working and aligned with the new in-process changes --- .../OrchestrationServiceClientShim/ShimDurableEntityClient.cs | 3 ++- src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs index df3e42b6c..2982ea486 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs @@ -93,7 +93,8 @@ public override async Task SignalEntityAsync( DateTime.UtcNow, this.options.Entities.MaxSignalDelayTimeOrDefault, scheduledTime?.UtcDateTime), - requestTime: DateTimeOffset.UtcNow, + Activity.Current != null ? new DistributedTraceContext(Activity.Current.Id!, Activity.Current.TraceStateString) : null, + DateTimeOffset.UtcNow, createTrace: true); await this.options.Client!.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage()); diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index 4416d0672..065027658 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -205,7 +205,7 @@ Guid SendOperationMessage(string instanceId, string operationName, object? input EntityMessageEvent.GetCappedScheduledTime(this.wrapper.innerContext.CurrentUtcDateTime, this.wrapper.invocationContext.Options.MaximumTimerInterval, scheduledTime?.UtcDateTime), serializedInput, createTrace: oneWay, - requestTime: DateTime.UtcNow); + requestTime: DateTimeOffset.UtcNow); if (!this.wrapper.IsReplaying) { From fc191924f82bef3ee9c957f4a9f5aaddd318fd76 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 23 Apr 2025 13:30:50 -0700 Subject: [PATCH 09/15] a very small change to create a trace regardless of calling or signaling an entity from an orchestration --- src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index 065027658..465ebdf06 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -204,7 +204,7 @@ Guid SendOperationMessage(string instanceId, string operationName, object? input guid, EntityMessageEvent.GetCappedScheduledTime(this.wrapper.innerContext.CurrentUtcDateTime, this.wrapper.invocationContext.Options.MaximumTimerInterval, scheduledTime?.UtcDateTime), serializedInput, - createTrace: oneWay, + createTrace: true, requestTime: DateTimeOffset.UtcNow); if (!this.wrapper.IsReplaying) From 03f0f9be8d1d51b0255cdc56a458fdfa3c66ac67 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 25 Apr 2025 15:11:18 -0700 Subject: [PATCH 10/15] added a start time to OperationResult --- src/Shared/Grpc/ProtoUtils.cs | 4 ++++ src/Worker/Core/Shims/TaskEntityShim.cs | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 8bd61e599..e1dd8b132 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -615,6 +615,7 @@ internal static void ToEntityBatchRequest( return new OperationResult() { Result = operationResult.Success.Result, + StartTime = operationResult.Success.StartTime?.ToDateTime(), EndTime = operationResult.Success.EndTime?.ToDateTime(), }; @@ -622,6 +623,7 @@ internal static void ToEntityBatchRequest( return new OperationResult() { FailureDetails = operationResult.Failure.FailureDetails.ToCore(), + StartTime = operationResult.Failure.StartTime?.ToDateTime(), EndTime = operationResult.Failure.EndTime?.ToDateTime(), }; @@ -650,6 +652,7 @@ internal static void ToEntityBatchRequest( Success = new P.OperationResultSuccess() { Result = operationResult.Result, + StartTime = operationResult.StartTime?.ToTimestamp(), EndTime = operationResult.EndTime?.ToTimestamp(), }, }; @@ -661,6 +664,7 @@ internal static void ToEntityBatchRequest( Failure = new P.OperationResultFailure() { FailureDetails = ToProtobuf(operationResult.FailureDetails), + StartTime = operationResult.StartTime?.ToTimestamp(), EndTime = operationResult.EndTime?.ToTimestamp(), }, }; diff --git a/src/Worker/Core/Shims/TaskEntityShim.cs b/src/Worker/Core/Shims/TaskEntityShim.cs index 5eb811975..0dcf8a755 100644 --- a/src/Worker/Core/Shims/TaskEntityShim.cs +++ b/src/Worker/Core/Shims/TaskEntityShim.cs @@ -58,7 +58,8 @@ public override async Task ExecuteOperationBatchAsync(EntityB List results = new(); foreach (OperationRequest current in operations.Operations!) - { + { + var startTime = DateTime.UtcNow; this.operation.SetNameAndInput(current.Operation!, current.Input); // The trace context of the current operation becomes the parent trace context of the TaskEntityContext. @@ -73,6 +74,7 @@ public override async Task ExecuteOperationBatchAsync(EntityB results.Add(new OperationResult() { Result = serializedResult, + StartTime = startTime, EndTime = DateTime.UtcNow, }); @@ -86,6 +88,7 @@ public override async Task ExecuteOperationBatchAsync(EntityB results.Add(new OperationResult() { FailureDetails = new FailureDetails(applicationException), + StartTime = startTime, EndTime = DateTime.UtcNow, }); From c00278de51170add2dc163e9557916cb44d8b954 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 8 May 2025 13:06:18 -0700 Subject: [PATCH 11/15] missed one file --- src/Shared/Grpc/ProtoUtils.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 9c88371da..7c5ada5f0 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -768,6 +768,14 @@ internal static void ToEntityBatchRequest( Version = startNewOrchestrationAction.Version, InstanceId = startNewOrchestrationAction.InstanceId, ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(), + RequestTime = startNewOrchestrationAction.RequestTime?.ToTimestamp(), + ParentTraceContext = startNewOrchestrationAction.ParentTraceContext != null ? + new P.TraceContext + { + TraceParent = startNewOrchestrationAction.ParentTraceContext.TraceParent, + TraceState = startNewOrchestrationAction.ParentTraceContext.TraceState, + } + : null, }; break; } From a0e0b80daeb66a331bfd01b0ed5adf5c8d4353e6 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 9 May 2025 23:57:24 -0700 Subject: [PATCH 12/15] some stylistic updates per PR comments --- src/Client/Grpc/GrpcDurableEntityClient.cs | 12 ++++-------- .../ShimDurableEntityClient.cs | 2 +- .../ShimDurableTaskClient.cs | 2 +- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/Client/Grpc/GrpcDurableEntityClient.cs b/src/Client/Grpc/GrpcDurableEntityClient.cs index 3e59b36a6..fd8e63926 100644 --- a/src/Client/Grpc/GrpcDurableEntityClient.cs +++ b/src/Client/Grpc/GrpcDurableEntityClient.cs @@ -59,15 +59,11 @@ public override async Task SignalEntityAsync( RequestTime = DateTimeOffset.UtcNow.ToTimestamp(), }; - if (Activity.Current?.Id != null) + if (Activity.Current is { } activity) { - if (request.ParentTraceContext == null) - { - request.ParentTraceContext = new P.TraceContext(); - } - - request.ParentTraceContext.TraceParent = Activity.Current.Id; - request.ParentTraceContext.TraceState = Activity.Current.TraceStateString; + request.ParentTraceContext ??= new P.TraceContext(); + request.ParentTraceContext.TraceParent = activity.Id; + request.ParentTraceContext.TraceState = activity.TraceStateString; } // TODO this.logger.LogSomething diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs index 2982ea486..4fd8af4ce 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs @@ -93,7 +93,7 @@ public override async Task SignalEntityAsync( DateTime.UtcNow, this.options.Entities.MaxSignalDelayTimeOrDefault, scheduledTime?.UtcDateTime), - Activity.Current != null ? new DistributedTraceContext(Activity.Current.Id!, Activity.Current.TraceStateString) : null, + Activity.Current is { } activity ? new DistributedTraceContext(activity.Id!, activity.TraceStateString) : null, DateTimeOffset.UtcNow, createTrace: true); diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 0947fb905..1fee7425d 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -187,7 +187,7 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( Version = orchestratorName.Version, OrchestrationInstance = instance, ScheduledStartTime = options?.StartAt?.UtcDateTime, - ParentTraceContext = Activity.Current?.Id != null ? new Core.Tracing.DistributedTraceContext(Activity.Current.Id, Activity.Current.TraceStateString) : null, + ParentTraceContext = Activity.Current is { } activity ? new Core.Tracing.DistributedTraceContext(activity.Id!, activity.TraceStateString) : null, Tags = tags, }, }; From 0c0421e4f75222c415d130ba4c7640e3aff1016c Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 12 May 2025 20:50:09 -0700 Subject: [PATCH 13/15] super tiny style update --- .../OrchestrationServiceClientShim/ShimDurableEntityClient.cs | 2 +- src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs index 4fd8af4ce..4d798479d 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs @@ -94,7 +94,7 @@ public override async Task SignalEntityAsync( this.options.Entities.MaxSignalDelayTimeOrDefault, scheduledTime?.UtcDateTime), Activity.Current is { } activity ? new DistributedTraceContext(activity.Id!, activity.TraceStateString) : null, - DateTimeOffset.UtcNow, + requestTime: DateTimeOffset.UtcNow, createTrace: true); await this.options.Client!.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage()); diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index 465ebdf06..330fd1888 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -204,8 +204,8 @@ Guid SendOperationMessage(string instanceId, string operationName, object? input guid, EntityMessageEvent.GetCappedScheduledTime(this.wrapper.innerContext.CurrentUtcDateTime, this.wrapper.invocationContext.Options.MaximumTimerInterval, scheduledTime?.UtcDateTime), serializedInput, - createTrace: true, - requestTime: DateTimeOffset.UtcNow); + requestTime: DateTimeOffset.UtcNow, + createTrace: true); if (!this.wrapper.IsReplaying) { From f09df538abf8dbfb266ce35e6e3a4b5f8e316cde Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 15 May 2025 21:28:18 -0700 Subject: [PATCH 14/15] changed startTime/endTime to startTimeUtc/endTimeUtc and made the request times DateTimeOffset --- src/Shared/Grpc/ProtoUtils.cs | 20 ++++++++++---------- src/Worker/Core/Shims/TaskEntityShim.cs | 12 ++++++------ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 7c5ada5f0..b73d24374 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -617,16 +617,16 @@ internal static void ToEntityBatchRequest( return new OperationResult() { Result = operationResult.Success.Result, - StartTime = operationResult.Success.StartTime?.ToDateTime(), - EndTime = operationResult.Success.EndTime?.ToDateTime(), + StartTimeUtc = operationResult.Success.StartTimeUtc?.ToDateTime(), + EndTimeUtc = operationResult.Success.EndTimeUtc?.ToDateTime(), }; case P.OperationResult.ResultTypeOneofCase.Failure: return new OperationResult() { FailureDetails = operationResult.Failure.FailureDetails.ToCore(), - StartTime = operationResult.Failure.StartTime?.ToDateTime(), - EndTime = operationResult.Failure.EndTime?.ToDateTime(), + StartTimeUtc = operationResult.Failure.StartTimeUtc?.ToDateTime(), + EndTimeUtc = operationResult.Failure.EndTimeUtc?.ToDateTime(), }; default: @@ -654,8 +654,8 @@ internal static void ToEntityBatchRequest( Success = new P.OperationResultSuccess() { Result = operationResult.Result, - StartTime = operationResult.StartTime?.ToTimestamp(), - EndTime = operationResult.EndTime?.ToTimestamp(), + StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(), + EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(), }, }; } @@ -666,8 +666,8 @@ internal static void ToEntityBatchRequest( Failure = new P.OperationResultFailure() { FailureDetails = ToProtobuf(operationResult.FailureDetails), - StartTime = operationResult.StartTime?.ToTimestamp(), - EndTime = operationResult.EndTime?.ToTimestamp(), + StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(), + EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(), }, }; } @@ -696,7 +696,7 @@ internal static void ToEntityBatchRequest( Input = operationAction.SendSignal.Input, InstanceId = operationAction.SendSignal.InstanceId, ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(), - RequestTime = operationAction.SendSignal.RequestTime?.ToDateTime(), + RequestTime = operationAction.SendSignal.RequestTime?.ToDateTimeOffset(), ParentTraceContext = operationAction.SendSignal.ParentTraceContext != null ? new DistributedTraceContext( operationAction.SendSignal.ParentTraceContext.TraceParent, @@ -712,7 +712,7 @@ internal static void ToEntityBatchRequest( InstanceId = operationAction.StartNewOrchestration.InstanceId, Version = operationAction.StartNewOrchestration.Version, ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(), - RequestTime = operationAction.StartNewOrchestration.RequestTime?.ToDateTime(), + RequestTime = operationAction.StartNewOrchestration.RequestTime?.ToDateTimeOffset(), ParentTraceContext = operationAction.StartNewOrchestration.ParentTraceContext != null ? new DistributedTraceContext( operationAction.StartNewOrchestration.ParentTraceContext.TraceParent, diff --git a/src/Worker/Core/Shims/TaskEntityShim.cs b/src/Worker/Core/Shims/TaskEntityShim.cs index 0dcf8a755..cc5490b74 100644 --- a/src/Worker/Core/Shims/TaskEntityShim.cs +++ b/src/Worker/Core/Shims/TaskEntityShim.cs @@ -74,8 +74,8 @@ public override async Task ExecuteOperationBatchAsync(EntityB results.Add(new OperationResult() { Result = serializedResult, - StartTime = startTime, - EndTime = DateTime.UtcNow, + StartTimeUtc = startTime, + EndTimeUtc = DateTime.UtcNow, }); // the user code completed without exception, so we commit the current state and actions. @@ -88,8 +88,8 @@ public override async Task ExecuteOperationBatchAsync(EntityB results.Add(new OperationResult() { FailureDetails = new FailureDetails(applicationException), - StartTime = startTime, - EndTime = DateTime.UtcNow, + StartTimeUtc = startTime, + EndTimeUtc = DateTime.UtcNow, }); // the user code threw an unhandled exception, so we roll back the state and the actions. @@ -232,7 +232,7 @@ public override void SignalEntity(EntityInstanceId id, string operationName, obj Name = operationName, Input = this.dataConverter.Serialize(input), ScheduledTime = options?.SignalTime?.UtcDateTime, - RequestTime = DateTime.UtcNow, + RequestTime = DateTimeOffset.UtcNow, ParentTraceContext = this.parentTraceContext, }); } @@ -249,7 +249,7 @@ public override string ScheduleNewOrchestration(TaskName name, object? input = n InstanceId = instanceId, Input = this.dataConverter.Serialize(input), ScheduledStartTime = options?.StartAt?.UtcDateTime, - RequestTime = DateTime.UtcNow, + RequestTime = DateTimeOffset.UtcNow, ParentTraceContext = this.parentTraceContext, }); return instanceId; From 9dc8afe3984837dc24fbfe5258bb3a14201bbd26 Mon Sep 17 00:00:00 2001 From: sophiatev <38052607+sophiatev@users.noreply.github.com> Date: Wed, 4 Jun 2025 18:01:49 -0700 Subject: [PATCH 15/15] first commit! (#436) Co-authored-by: Sophia Tevosyan --- Directory.Packages.props | 2 +- eng/targets/Release.props | 2 +- src/Grpc/orchestrator_service.proto | 14 ++++++++++++++ src/Grpc/versions.txt | 4 ++-- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index 81e3d633f..9a87896a6 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -28,7 +28,7 @@ - + diff --git a/eng/targets/Release.props b/eng/targets/Release.props index 61031c15c..697fc3f70 100644 --- a/eng/targets/Release.props +++ b/eng/targets/Release.props @@ -17,7 +17,7 @@ - 1.10.0 + 1.11.0 diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 88928c3ba..b2ca147b5 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -95,6 +95,7 @@ message TaskScheduledEvent { google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; TraceContext parentTraceContext = 4; + map tags = 5; } message TaskCompletedEvent { @@ -256,6 +257,7 @@ message ScheduleTaskAction { string name = 1; google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; + map tags = 4; } message CreateSubOrchestrationAction { @@ -343,6 +345,7 @@ message CreateInstanceRequest { google.protobuf.StringValue executionId = 7; map tags = 8; TraceContext parentTraceContext = 9; + google.protobuf.Timestamp requestTime = 10; } message OrchestrationIdReusePolicy { @@ -490,6 +493,8 @@ message SignalEntityRequest { google.protobuf.StringValue input = 3; string requestId = 4; google.protobuf.Timestamp scheduledTime = 5; + TraceContext parentTraceContext = 6; + google.protobuf.Timestamp requestTime = 7; } message SignalEntityResponse { @@ -575,6 +580,7 @@ message OperationRequest { string operation = 1; string requestId = 2; google.protobuf.StringValue input = 3; + TraceContext traceContext = 4; } message OperationResult { @@ -591,10 +597,14 @@ message OperationInfo { message OperationResultSuccess { google.protobuf.StringValue result = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; } message OperationResultFailure { TaskFailureDetails failureDetails = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; } message OperationAction { @@ -610,6 +620,8 @@ message SendSignalAction { string name = 2; google.protobuf.StringValue input = 3; google.protobuf.Timestamp scheduledTime = 4; + google.protobuf.Timestamp requestTime = 5; + TraceContext parentTraceContext = 6; } message StartNewOrchestrationAction { @@ -618,6 +630,8 @@ message StartNewOrchestrationAction { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; google.protobuf.Timestamp scheduledTime = 5; + google.protobuf.Timestamp requestTime = 6; + TraceContext parentTraceContext = 7; } message AbandonActivityTaskRequest { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index fc90a4409..121ec0ec7 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-04-23 23:27:00 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fbe5bb20835678099fc51a44993ed9b045dee5a6/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-06-02 21:12:34 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fd9369c6a03d6af4e95285e432b7c4e943c06970/protos/orchestrator_service.proto