diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index 0bba1c96..a3fa583f 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -85,6 +85,11 @@ static void ConfigureSchedulerOptions( .Configure(additionalConfig ?? (_ => { })) .ValidateDataAnnotations(); + builder.Configure(options => + { + options.EnableEntitySupport = true; + }); + builder.Services.TryAddEnumerable( ServiceDescriptor.Singleton, ConfigureGrpcChannel>()); builder.UseGrpc(_ => { }); diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index f566163f..0fa6b659 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -288,6 +288,15 @@ message TerminateOrchestrationAction { bool recurse = 3; } +message SendEntityMessageAction { + oneof EntityMessageType { + EntityOperationSignaledEvent entityOperationSignaled = 1; + EntityOperationCalledEvent entityOperationCalled = 2; + EntityLockRequestedEvent entityLockRequested = 3; + EntityUnlockSentEvent entityUnlockSent = 4; + } +} + message OrchestratorAction { int32 id = 1; oneof orchestratorActionType { @@ -297,6 +306,7 @@ message OrchestratorAction { SendEventAction sendEvent = 5; CompleteOrchestrationAction completeOrchestration = 6; TerminateOrchestrationAction terminateOrchestration = 7; + SendEntityMessageAction sendEntityMessage = 8; } } @@ -329,6 +339,7 @@ message CreateInstanceRequest { OrchestrationIdReusePolicy orchestrationIdReusePolicy = 6; google.protobuf.StringValue executionId = 7; map tags = 8; + TraceContext parentTraceContext = 9; } message OrchestrationIdReusePolicy { @@ -550,6 +561,8 @@ message EntityBatchResult { repeated OperationAction actions = 2; google.protobuf.StringValue entityState = 3; TaskFailureDetails failureDetails = 4; + string completionToken = 5; + repeated OperationInfo operationInfos = 6; // used only with DTS } message EntityRequest { @@ -572,6 +585,11 @@ message OperationResult { } } +message OperationInfo { + string requestId = 1; + OrchestrationInstance responseDestination = 2; // null for signals +} + message OperationResultSuccess { google.protobuf.StringValue result = 1; } diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 94b29bd3..ad90d91f 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-01-30 00:06:14 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c672a0dc97c06587d7399ee12f1c5b0b9fc492a7/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-02-11 19:40:58 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/589cb5ecd9dd4b1fe463750defa3e2c84276b079/protos/orchestrator_service.proto diff --git a/src/Shared/Grpc/EntityConversions.cs b/src/Shared/Grpc/EntityConversions.cs new file mode 100644 index 00000000..254bb4a2 --- /dev/null +++ b/src/Shared/Grpc/EntityConversions.cs @@ -0,0 +1,443 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Runtime.Serialization; +using System.Text; +using System.Threading.Tasks; +using DurableTask.Core; +using DurableTask.Core.Entities; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using Microsoft.DurableTask; +using Newtonsoft.Json; +using DTCore = DurableTask.Core; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask; + +/// +/// Utilities for converting between representations of entity history events. The older backends represent entity +/// messages as external events, using a JSON encoding defined in DT Core. +/// Starting with the DTS backend, we use explicit, separate +/// protobuf encodings for all entity-related history events. +/// +static class EntityConversions +{ + // we copied the relevant data members from DT.Core to allow us to convert between the data structures + // used in the backend, and the legacy encoding of entities within orchestration histories. + + // The point of this class is to reverse a Newtonsoft serialization that happened in prior DT code. + // To do this reliably we use the same Newtonsoft. + // This is not introducing a new dependency, and should be eliminated once the original dependency is eliminated. + static readonly JsonSerializerSettings ConversionSettings = new JsonSerializerSettings() + { + TypeNameHandling = TypeNameHandling.None, // the type names do not match, so we have to disable them + }; + + /// + /// Encodes an operation signal. + /// + /// The proto event. + /// The core event. + public static DTCore.History.HistoryEvent EncodeOperationSignaled(P.HistoryEvent protoEvent) + { + P.EntityOperationSignaledEvent signaledEvent = protoEvent.EntityOperationSignaled; + DateTime? scheduledTime = signaledEvent.ScheduledTime?.ToDateTime(); + string name = EncodeEventName(scheduledTime); + string input = JsonConvert.SerializeObject( + new RequestMessage() + { + Operation = signaledEvent.Operation, + IsSignal = true, + Input = signaledEvent.Input, + Id = signaledEvent.RequestId, + ScheduledTime = scheduledTime, + }, + ConversionSettings); + + string? target = signaledEvent.TargetInstanceId + ?? throw new InvalidOperationException("missing target instance id"); + + return CreateEventRaisedOrSentEvent(protoEvent.EventId, name, input, target); + } + + /// + /// Encodes an operation call. + /// + /// The proto event. + /// The orchestration instance. + /// The core event. + public static DTCore.History.HistoryEvent EncodeOperationCalled( + P.HistoryEvent protoEvent, + OrchestrationInstance? instance) + { + P.EntityOperationCalledEvent calledEvent = protoEvent.EntityOperationCalled; + DateTime? scheduledTime = calledEvent.ScheduledTime?.ToDateTime(); + string name = EncodeEventName(scheduledTime); + string input = JsonConvert.SerializeObject( + new RequestMessage() + { + Operation = calledEvent.Operation, + IsSignal = false, + Input = calledEvent.Input, + Id = calledEvent.RequestId, + ScheduledTime = scheduledTime, + ParentInstanceId = instance?.InstanceId + ?? throw new InvalidOperationException("missing instance id"), + ParentExecutionId = instance?.ExecutionId, + }, + ConversionSettings); + + string? target = calledEvent.TargetInstanceId + ?? throw new InvalidOperationException("missing target instance id"); + + return CreateEventRaisedOrSentEvent(protoEvent.EventId, name, input, target); + } + + /// + /// Encodes an operation lock. + /// + /// The proto event. + /// The orchestration instance. + /// The core event. + public static DTCore.History.HistoryEvent EncodeLockRequested( + P.HistoryEvent protoEvent, + OrchestrationInstance? instance) + { + P.EntityLockRequestedEvent lockRequestedEvent = protoEvent.EntityLockRequested; + string name = EncodeEventName(null); + string input = JsonConvert.SerializeObject( + new RequestMessage() + { + Operation = null, + Id = lockRequestedEvent.CriticalSectionId, + LockSet = lockRequestedEvent.LockSet.Select(s => EntityId.FromString(s)).ToArray(), + Position = lockRequestedEvent.Position, + ParentInstanceId = instance?.InstanceId + ?? throw new InvalidOperationException("missing instance id"), + }, + ConversionSettings); + + string? target = lockRequestedEvent.LockSet[lockRequestedEvent.Position]; + + return CreateEventRaisedOrSentEvent(protoEvent.EventId, name, input, target); + } + + /// + /// Encodes an unlock message. + /// + /// The proto event. + /// The orchestration instance. + /// The core event. + public static DTCore.History.HistoryEvent EncodeUnlockSent( + P.HistoryEvent protoEvent, + OrchestrationInstance? instance) + { + P.EntityUnlockSentEvent unlockSentEvent = protoEvent.EntityUnlockSent; + string name = EncodeEventName(null); + string input = JsonConvert.SerializeObject( + new ReleaseMessage() + { + Id = unlockSentEvent.CriticalSectionId, + ParentInstanceId = instance?.InstanceId + ?? throw new InvalidOperationException("missing instance id"), + }, + ConversionSettings); + + string? target = unlockSentEvent.TargetInstanceId + ?? throw new InvalidOperationException("missing target instance id"); + + return CreateEventRaisedOrSentEvent(protoEvent.EventId, name, input, target); + } + + /// + /// Encodes a lock grant. + /// + /// The proto event. + /// The core event. + public static DTCore.History.EventRaisedEvent EncodeLockGranted(P.HistoryEvent protoEvent) + { + P.EntityLockGrantedEvent grantEvent = protoEvent.EntityLockGranted; + return new DTCore.History.EventRaisedEvent( + protoEvent.EventId, + JsonConvert.SerializeObject( + new ResponseMessage() + { + Result = ResponseMessage.LockAcquisitionCompletion, + }, + ConversionSettings)) + { + Name = grantEvent.CriticalSectionId, + }; + } + + /// + /// Encodes an operation completion message. + /// + /// The proto event. + /// The core event. + public static DTCore.History.EventRaisedEvent EncodeOperationCompleted(P.HistoryEvent protoEvent) + { + P.EntityOperationCompletedEvent completedEvent = protoEvent.EntityOperationCompleted; + return new DTCore.History.EventRaisedEvent( + protoEvent.EventId, + JsonConvert.SerializeObject( + new ResponseMessage() + { + Result = completedEvent.Output, + }, + ConversionSettings)) + { + Name = completedEvent.RequestId, + }; + } + + /// + /// Encodes an operation failed message. + /// + /// The proto event. + /// The core event. + public static DTCore.History.EventRaisedEvent EncodeOperationFailed(P.HistoryEvent protoEvent) + { + P.EntityOperationFailedEvent failedEvent = protoEvent.EntityOperationFailed; + return new DTCore.History.EventRaisedEvent( + protoEvent.EventId, + JsonConvert.SerializeObject( + new ResponseMessage() + { + ErrorMessage = failedEvent.FailureDetails.ErrorType, + Result = failedEvent.FailureDetails.ErrorMessage, + FailureDetails = failedEvent.FailureDetails.ToCore(), + }, + ConversionSettings)) + { + Name = failedEvent.RequestId, + }; + } + + /// + /// Decodes an orchestration action that sends an entity message from an external event. + /// + /// The name of the external event. + /// The input of the external event. + /// The target of the external event. + /// The protobuf send action which should be assigned the correct action. + /// The request action. + internal static void DecodeEntityMessageAction( + string name, + string input, + string? target, + P.SendEntityMessageAction sendAction, + out string requestId) + { + RequestMessage? message = JsonConvert.DeserializeObject( + input, + ConversionSettings); + + if (message == null) + { + throw new InvalidOperationException("Cannot convert null event"); + } + + if (message.Id == null) + { + throw new InvalidOperationException("missing ID"); + } + + if (name.StartsWith("op", System.StringComparison.Ordinal)) + { + if (message.Operation == null) + { + // this is a lock request + sendAction.EntityLockRequested = new P.EntityLockRequestedEvent + { + CriticalSectionId = requestId = message.Id, + LockSet = { message.LockSet!.Select(e => e.ToString()) }, + ParentInstanceId = message.ParentInstanceId, + Position = message.Position, + }; + } + else + { + // this is an operation call or signal + Timestamp? scheduledTime = null; + + if (name.Length >= 3 && name[2] == '@' && DateTime.TryParse(name[3..], out DateTime time)) + { + scheduledTime = Timestamp.FromDateTime(time.ToUniversalTime()); + } + + if (message.IsSignal) + { + sendAction.EntityOperationSignaled = new P.EntityOperationSignaledEvent + { + RequestId = requestId = message.Id, + Input = message.Input, + Operation = message.Operation, + ScheduledTime = scheduledTime, + TargetInstanceId = target, + }; + } + else + { + sendAction.EntityOperationCalled = new P.EntityOperationCalledEvent + { + RequestId = requestId = message.Id, + Input = message.Input, + Operation = message.Operation, + ScheduledTime = scheduledTime, + ParentInstanceId = message.ParentInstanceId, + TargetInstanceId = target, + }; + } + } + } + else if (name == "release") + { + sendAction.EntityUnlockSent = new P.EntityUnlockSentEvent + { + CriticalSectionId = requestId = message.Id, + TargetInstanceId = target, + ParentInstanceId = message.ParentInstanceId, + }; + } + else + { + throw new InvalidOperationException($"Cannot convert event with name {name}"); + } + } + + static string EncodeEventName(DateTime? scheduledTime) + => scheduledTime.HasValue ? $"op@{scheduledTime.Value:o}" : "op"; + + static DTCore.History.HistoryEvent CreateEventRaisedOrSentEvent( + int eventId, + string name, + string input, + string? target) + { + if (target == null) + { + // the event is used inside a message, so it does not include a target + return new DTCore.History.EventRaisedEvent(eventId, input) + { + Name = name, + }; + } + else + { + // the event is used inside a history, so it includes a target instance + return new DTCore.History.EventSentEvent(eventId) + { + Name = name, + Input = input, + InstanceId = target, + }; + } + } + + /// + /// Copied from DT Core for serialization/deserialization. + /// Modified so that only the relevant data is serialized/deserialized. + /// + [DataContract] + class RequestMessage + { + /// + /// Gets or sets the name of the operation being called (if this is an operation message) or null + /// (if this is a lock request). + /// + [DataMember(Name = "op")] + public string? Operation { get; set; } + + /// + /// Gets or sets a value indicating whether or not this is a one-way message. + /// + [DataMember(Name = "signal", EmitDefaultValue = false)] + public bool IsSignal { get; set; } + + /// + /// Gets or sets the operation input. + /// + [DataMember(Name = "input", EmitDefaultValue = false)] + public string? Input { get; set; } + + /// + /// Gets or sets a unique identifier for this operation. The original data type is GUID but since + /// we use just strings in the backend, we may as well parse this as a string. + /// + [DataMember(Name = "id", IsRequired = true)] + public string? Id { get; set; } + + /// + /// Gets or sets the parent instance that called this operation. + /// + [DataMember(Name = "parent", EmitDefaultValue = false)] + public string? ParentInstanceId { get; set; } + + /// + /// Gets or sets the parent instance that called this operation. + /// + [DataMember(Name = "parentExecution", EmitDefaultValue = false)] + public string? ParentExecutionId { get; set; } + + /// + /// Gets or sets an optional avlue, a scheduled time at which to start the operation. + /// + [DataMember(Name = "due", EmitDefaultValue = false)] + public DateTime? ScheduledTime { get; set; } + + /// + /// Gets or sets the lock set, the set of locks being acquired. Is sorted, + /// contains at least one element, and has no repetitions. + /// + [DataMember(Name = "lockset", EmitDefaultValue = false)] + public DTCore.Entities.EntityId[]? LockSet { get; set; } + + /// + /// Gets or sets the message number For lock requests involving multiple locks. + /// + [DataMember(Name = "pos", EmitDefaultValue = false)] + public int Position { get; set; } + } + + /// + /// Copied from DT Core for serialization/deserialization. + /// Modified so that only the relevant data is serialized/deserialized. + /// + [DataContract] + class ResponseMessage + { + public const string LockAcquisitionCompletion = "Lock Acquisition Completed"; + + [DataMember(Name = "result")] + public string? Result { get; set; } + + [DataMember(Name = "exceptionType", EmitDefaultValue = false)] + public string? ErrorMessage { get; set; } + + [DataMember(Name = "failureDetails", EmitDefaultValue = false)] + public FailureDetails? FailureDetails { get; set; } + + [IgnoreDataMember] + public bool IsErrorResult => this.ErrorMessage != null; + } + + /// + /// Copied from DT Core for serialization/deserialization. + /// Modified so that only the relevant data is serialized/deserialized. + /// + [DataContract] + class ReleaseMessage + { + [DataMember(Name = "parent")] + public string? ParentInstanceId { get; set; } + + [DataMember(Name = "id")] + public string? Id { get; set; } + } +} diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 405e6847..9ad4d2fd 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -4,6 +4,7 @@ using System.Buffers; using System.Buffers.Text; using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; using System.Text; using DurableTask.Core; using DurableTask.Core.Command; @@ -12,6 +13,7 @@ using DurableTask.Core.History; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; +using DTCore = DurableTask.Core; using P = Microsoft.DurableTask.Protobuf; namespace Microsoft.DurableTask; @@ -28,6 +30,19 @@ static class ProtoUtils /// The converted history event. /// When the provided history event type is not supported. internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto) + { + return ConvertHistoryEvent(proto, conversionState: null); + } + + /// + /// Converts a history event from to , and performs + /// stateful conversions of entity-related events. + /// + /// The proto history event to converter. + /// State needed for converting entity-related history entries and actions. + /// The converted history event. + /// When the provided history event type is not supported. + internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityConversionState? conversionState) { Check.NotNull(proto); HistoryEvent historyEvent; @@ -37,11 +52,13 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto) historyEvent = new ContinueAsNewEvent(proto.EventId, proto.ContinueAsNew.Input); break; case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted: + OrchestrationInstance instance = proto.ExecutionStarted.OrchestrationInstance.ToCore(); + conversionState?.SetOrchestrationInstance(instance); historyEvent = new ExecutionStartedEvent(proto.EventId, proto.ExecutionStarted.Input) { Name = proto.ExecutionStarted.Name, Version = proto.ExecutionStarted.Version, - OrchestrationInstance = proto.ExecutionStarted.OrchestrationInstance.ToCore(), + OrchestrationInstance = instance, ParentInstance = proto.ExecutionStarted.ParentInstance == null ? null : new ParentInstance { Name = proto.ExecutionStarted.ParentInstance.Name, @@ -144,6 +161,31 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto) Name = proto.EventRaised.Name, }; break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled: + historyEvent = EntityConversions.EncodeOperationCalled(proto, conversionState!.CurrentInstance); + conversionState?.EntityRequestIds.Add(proto.EntityOperationCalled.RequestId); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationSignaled: + historyEvent = EntityConversions.EncodeOperationSignaled(proto); + conversionState?.EntityRequestIds.Add(proto.EntityOperationSignaled.RequestId); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityLockRequested: + historyEvent = EntityConversions.EncodeLockRequested(proto, conversionState!.CurrentInstance); + conversionState?.AddUnlockObligations(proto.EntityLockRequested); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityUnlockSent: + historyEvent = EntityConversions.EncodeUnlockSent(proto, conversionState!.CurrentInstance); + conversionState?.RemoveUnlockObligation(proto.EntityUnlockSent.TargetInstanceId); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityLockGranted: + historyEvent = EntityConversions.EncodeLockGranted(proto); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted: + historyEvent = EntityConversions.EncodeOperationCompleted(proto); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationFailed: + historyEvent = EntityConversions.EncodeOperationFailed(proto); + break; case P.HistoryEvent.EventTypeOneofCase.GenericEvent: historyEvent = new GenericEvent(proto.EventId, proto.GenericEvent.Data); break; @@ -227,13 +269,15 @@ internal static Timestamp ToTimestamp(this DateTime dateTime) /// The completion token for the work item. It must be the exact same /// value that was provided by the corresponding that triggered the orchestrator execution. /// + /// The entity conversion state, or null if no conversion is required. /// The orchestrator response. /// When an orchestrator action is unknown. internal static P.OrchestratorResponse ConstructOrchestratorResponse( string instanceId, string? customStatus, IEnumerable actions, - string completionToken) + string completionToken, + EntityConversionState? entityConversionState) { Check.NotNull(actions); var response = new P.OrchestratorResponse @@ -283,14 +327,70 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse( $"{nameof(SendEventOrchestratorAction)} cannot have a null Instance property!"); } - protoAction.SendEvent = new P.SendEventAction + if (entityConversionState is not null + && DTCore.Common.Entities.IsEntityInstance(sendEventAction.Instance.InstanceId) + && sendEventAction.EventName is not null + && sendEventAction.EventData is not null) { - Instance = sendEventAction.Instance.ToProtobuf(), - Name = sendEventAction.EventName, - Data = sendEventAction.EventData, - }; + P.SendEntityMessageAction sendAction = new P.SendEntityMessageAction(); + protoAction.SendEntityMessage = sendAction; + + EntityConversions.DecodeEntityMessageAction( + sendEventAction.EventName, + sendEventAction.EventData, + sendEventAction.Instance.InstanceId, + sendAction, + out string requestId); + + entityConversionState.EntityRequestIds.Add(requestId); + + switch (sendAction.EntityMessageTypeCase) + { + case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityLockRequested: + entityConversionState.AddUnlockObligations(sendAction.EntityLockRequested); + break; + case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityUnlockSent: + entityConversionState.RemoveUnlockObligation(sendAction.EntityUnlockSent.TargetInstanceId); + break; + default: + break; + } + } + else + { + protoAction.SendEvent = new P.SendEventAction + { + Instance = sendEventAction.Instance.ToProtobuf(), + Name = sendEventAction.EventName, + Data = sendEventAction.EventData, + }; + } + break; case OrchestratorActionType.OrchestrationComplete: + + if (entityConversionState is not null) + { + // as a precaution, unlock any entities that were not unlocked for some reason, before + // completing the orchestration. + foreach ((string target, string criticalSectionId) in entityConversionState.ResetObligations()) + { + response.Actions.Add(new P.OrchestratorAction + { + Id = action.Id, + SendEntityMessage = new P.SendEntityMessageAction + { + EntityUnlockSent = new P.EntityUnlockSentEvent + { + CriticalSectionId = criticalSectionId, + TargetInstanceId = target, + ParentInstanceId = entityConversionState.CurrentInstance?.InstanceId, + }, + }, + }); + } + } + var completeAction = (OrchestrationCompleteOrchestratorAction)action; protoAction.CompleteOrchestration = new P.CompleteOrchestrationAction { @@ -413,6 +513,63 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status) }; } + /// + /// Converts a to a . + /// + /// The entity request to convert. + /// The converted request. + /// Additional info about each operation, required by DTS. + internal static void ToEntityBatchRequest( + this P.EntityRequest entityRequest, + out EntityBatchRequest batchRequest, + out List operationInfos) + { + batchRequest = new EntityBatchRequest() + { + EntityState = entityRequest.EntityState, + InstanceId = entityRequest.InstanceId, + Operations = [], // operations are added to this collection below + }; + + operationInfos = new(entityRequest.OperationRequests.Count); + + foreach (P.HistoryEvent? op in entityRequest.OperationRequests) + { + if (op.EntityOperationSignaled is not null) + { + batchRequest.Operations.Add(new OperationRequest + { + Id = Guid.Parse(op.EntityOperationSignaled.RequestId), + Operation = op.EntityOperationSignaled.Operation, + Input = op.EntityOperationSignaled.Input, + }); + operationInfos.Add(new P.OperationInfo + { + RequestId = op.EntityOperationSignaled.RequestId, + ResponseDestination = null, // means we don't send back a response to the caller + }); + } + else if (op.EntityOperationCalled is not null) + { + batchRequest.Operations.Add(new OperationRequest + { + Id = Guid.Parse(op.EntityOperationCalled.RequestId), + Operation = op.EntityOperationCalled.Operation, + Input = op.EntityOperationCalled.Input, + }); + operationInfos.Add(new P.OperationInfo + { + RequestId = op.EntityOperationCalled.RequestId, + ResponseDestination = new P.OrchestrationInstance + { + InstanceId = op.EntityOperationCalled.ParentInstanceId, + ExecutionId = op.EntityOperationCalled.ParentExecutionId, + }, + }); + } + } + } + /// /// Converts a to a . /// @@ -611,32 +768,29 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status) /// Converts a to . /// /// The operation result to convert. + /// The completion token, or null for the older protocol. + /// Additional information about each operation, required by DTS. /// The converted operation result. [return: NotNullIfNotNull(nameof(entityBatchResult))] - internal static P.EntityBatchResult? ToEntityBatchResult(this EntityBatchResult? entityBatchResult) + internal static P.EntityBatchResult? ToEntityBatchResult( + this EntityBatchResult? entityBatchResult, + string? completionToken = null, + IEnumerable? operationInfos = null) { if (entityBatchResult == null) { return null; } - var batchResult = new P.EntityBatchResult() + return new P.EntityBatchResult() { EntityState = entityBatchResult.EntityState, FailureDetails = entityBatchResult.FailureDetails.ToProtobuf(), + Actions = { entityBatchResult.Actions?.Select(a => a.ToOperationAction()) ?? [] }, + Results = { entityBatchResult.Results?.Select(a => a.ToOperationResult()) ?? [] }, + CompletionToken = completionToken, + OperationInfos = { operationInfos ?? [] }, }; - - foreach (OperationAction action in entityBatchResult.Actions!) - { - batchResult.Actions.Add(action.ToOperationAction()); - } - - foreach (OperationResult result in entityBatchResult.Results!) - { - batchResult.Results.Add(result.ToOperationResult()); - } - - return batchResult; } /// @@ -728,21 +882,12 @@ internal static T Base64Decode(this MessageParser parser, string encodedMessa } } - static P.OrchestrationStatus ToProtobuf(this OrchestrationStatus status) - { - return (P.OrchestrationStatus)status; - } - - static P.OrchestrationInstance ToProtobuf(this OrchestrationInstance instance) - { - return new P.OrchestrationInstance - { - InstanceId = instance.InstanceId, - ExecutionId = instance.ExecutionId, - }; - } - - static FailureDetails? ToCore(this P.TaskFailureDetails? failureDetails) + /// + /// Converts a grpc to a . + /// + /// The failure details to convert. + /// The converted failure details. + internal static FailureDetails? ToCore(this P.TaskFailureDetails? failureDetails) { if (failureDetails == null) { @@ -757,6 +902,11 @@ static P.OrchestrationInstance ToProtobuf(this OrchestrationInstance instance) failureDetails.IsNonRetriable); } + /// + /// Converts a to a grpc . + /// + /// The failure details to convert. + /// The converted failure details. static P.TaskFailureDetails? ToProtobuf(this FailureDetails? failureDetails) { if (failureDetails == null) @@ -773,4 +923,120 @@ static P.OrchestrationInstance ToProtobuf(this OrchestrationInstance instance) InnerFailure = failureDetails.InnerFailure.ToProtobuf(), }; } + + static P.OrchestrationStatus ToProtobuf(this OrchestrationStatus status) + { + return (P.OrchestrationStatus)status; + } + + static P.OrchestrationInstance ToProtobuf(this OrchestrationInstance instance) + { + return new P.OrchestrationInstance + { + InstanceId = instance.InstanceId, + ExecutionId = instance.ExecutionId, + }; + } + + /// + /// Tracks state required for converting orchestration histories containing entity-related events. + /// + internal class EntityConversionState + { + readonly bool insertMissingEntityUnlocks; + + OrchestrationInstance? instance; + HashSet? entityRequestIds; + Dictionary? unlockObligations; + + /// + /// Initializes a new instance of the class. + /// + /// Whether to insert missing unlock events in to the history + /// when the orchestration completes. + public EntityConversionState(bool insertMissingEntityUnlocks) + { + this.ConvertFromProto = (P.HistoryEvent e) => ProtoUtils.ConvertHistoryEvent(e, this); + this.insertMissingEntityUnlocks = insertMissingEntityUnlocks; + } + + /// + /// Gets a function that converts a history event in protobuf format to a core history event. + /// + public Func ConvertFromProto { get; } + + /// + /// Gets the orchestration instance of this history. + /// + public OrchestrationInstance? CurrentInstance => this.instance; + + /// + /// Gets the set of guids that have been used as entity request ids in this history. + /// + public HashSet EntityRequestIds => this.entityRequestIds ??= new(); + + /// + /// Records the orchestration instance, which may be needed for some conversions. + /// + /// The orchestration instance. + public void SetOrchestrationInstance(OrchestrationInstance instance) + { + this.instance = instance; + } + + /// + /// Adds unlock obligations for all entities that are being locked by this request. + /// + /// The lock request. + public void AddUnlockObligations(P.EntityLockRequestedEvent request) + { + if (!this.insertMissingEntityUnlocks) + { + return; + } + + this.unlockObligations ??= new(); + + foreach (string target in request.LockSet) + { + this.unlockObligations[target] = request.CriticalSectionId; + } + } + + /// + /// Removes an unlock obligation. + /// + /// The target entity. + public void RemoveUnlockObligation(string target) + { + if (!this.insertMissingEntityUnlocks) + { + return; + } + + this.unlockObligations?.Remove(target); + } + + /// + /// Returns the remaining unlock obligations, and clears the list. + /// + /// The unlock obligations. + public IEnumerable<(string Target, string CriticalSectionId)> ResetObligations() + { + if (!this.insertMissingEntityUnlocks) + { + yield break; + } + + if (this.unlockObligations is not null) + { + foreach (var kvp in this.unlockObligations) + { + yield return (kvp.Key, kvp.Value); + } + + this.unlockObligations = null; + } + } + } } diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 2eec714c..63d19fa0 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -3,6 +3,7 @@ using Azure.Core; using Microsoft.DurableTask.Worker.Grpc; +using Microsoft.DurableTask.Worker.Grpc.Internal; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Options; @@ -85,6 +86,12 @@ static void ConfigureSchedulerOptions( .Configure(additionalConfig ?? (_ => { })) .ValidateDataAnnotations(); + builder.Services.AddOptions(builder.Name) + .Configure(options => + { + options.EnableEntitySupport = true; + }); + builder.Services.TryAddEnumerable( ServiceDescriptor.Singleton, ConfigureGrpcChannel>()); builder.UseGrpc(_ => { }); @@ -113,6 +120,7 @@ public void Configure(string? name, GrpcDurableTaskWorkerOptions options) { DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(name ?? Options.DefaultName); options.Channel = source.CreateChannel(); + options.ConfigureForAzureManaged(); } } } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 059d592d..81733a06 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -1,11 +1,13 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Diagnostics.CodeAnalysis; using System.Text; using DurableTask.Core; using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; +using Grpc.Core; using Microsoft.DurableTask.Entities; using Microsoft.DurableTask.Worker.Shims; using Microsoft.Extensions.DependencyInjection; @@ -28,12 +30,14 @@ class Processor readonly GrpcDurableTaskWorker worker; readonly TaskHubSidecarServiceClient client; readonly DurableTaskShimFactory shimFactory; + readonly GrpcDurableTaskWorkerOptions.InternalOptions internalOptions; public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client) { this.worker = worker; this.client = client; this.shimFactory = new DurableTaskShimFactory(this.worker.grpcOptions, this.worker.loggerFactory); + this.internalOptions = this.worker.grpcOptions.Internal; } ILogger Logger => this.worker.logger; @@ -107,8 +111,13 @@ static string GetActionsListForLogging(IReadOnlyList actio async ValueTask BuildRuntimeStateAsync( P.OrchestratorRequest orchestratorRequest, + ProtoUtils.EntityConversionState? entityConversionState, CancellationToken cancellation) { + Func converter = entityConversionState is null + ? ProtoUtils.ConvertHistoryEvent + : entityConversionState.ConvertFromProto; + IEnumerable pastEvents = []; if (orchestratorRequest.RequiresHistoryStreaming) { @@ -125,16 +134,16 @@ async ValueTask BuildRuntimeStateAsync( await foreach (P.HistoryChunk chunk in streamResponse.ResponseStream.ReadAllAsync(cancellation)) { - pastEvents = pastEvents.Concat(chunk.Events.Select(ProtoUtils.ConvertHistoryEvent)); + pastEvents = pastEvents.Concat(chunk.Events.Select(converter)); } } else { // The history was already provided in the work item request - pastEvents = orchestratorRequest.PastEvents.Select(ProtoUtils.ConvertHistoryEvent); + pastEvents = orchestratorRequest.PastEvents.Select(converter); } - IEnumerable newEvents = orchestratorRequest.NewEvents.Select(ProtoUtils.ConvertHistoryEvent); + IEnumerable newEvents = orchestratorRequest.NewEvents.Select(converter); // Reconstruct the orchestration state in a way that correctly distinguishes new events from past events var runtimeState = new OrchestrationRuntimeState(pastEvents.ToList()); @@ -203,7 +212,21 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall stream, Ca { this.RunBackgroundTask( workItem, - () => this.OnRunEntityBatchAsync(workItem.EntityRequest, cancellation)); + () => this.OnRunEntityBatchAsync(workItem.EntityRequest.ToEntityBatchRequest(), cancellation)); + } + else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequestV2) + { + workItem.EntityRequestV2.ToEntityBatchRequest( + out EntityBatchRequest batchRequest, + out List operationInfos); + + this.RunBackgroundTask( + workItem, + () => this.OnRunEntityBatchAsync( + batchRequest, + cancellation, + workItem.CompletionToken, + operationInfos)); } else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.HealthPing) { @@ -250,9 +273,18 @@ async Task OnRunOrchestratorAsync( P.TaskFailureDetails? failureDetails = null; TaskName name = new("(unknown)"); + ProtoUtils.EntityConversionState? entityConversionState = + this.internalOptions.ConvertOrchestrationEntityEvents + ? new(this.internalOptions.InsertEntityUnlocksOnCompletion) + : null; + try { - OrchestrationRuntimeState runtimeState = await this.BuildRuntimeStateAsync(request, cancellationToken); + OrchestrationRuntimeState runtimeState = await this.BuildRuntimeStateAsync( + request, + entityConversionState, + cancellationToken); + name = new TaskName(runtimeState.Name); this.Logger.ReceivedOrchestratorRequest( @@ -278,6 +310,7 @@ async Task OnRunOrchestratorAsync( runtimeState, shim, BehaviorOnContinueAsNew.Carryover, + request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails); result = executor.Execute(); } @@ -305,7 +338,8 @@ async Task OnRunOrchestratorAsync( request.InstanceId, result.CustomStatus, result.Actions, - completionToken); + completionToken, + entityConversionState); } else { @@ -400,14 +434,17 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, await this.client.CompleteActivityTaskAsync(response, cancellationToken: cancellation); } - async Task OnRunEntityBatchAsync(P.EntityBatchRequest request, CancellationToken cancellation) + async Task OnRunEntityBatchAsync( + EntityBatchRequest batchRequest, + CancellationToken cancellation, + string? completionToken = null, + List? operationInfos = null) { - var coreEntityId = DTCore.Entities.EntityId.FromString(request.InstanceId); + var coreEntityId = DTCore.Entities.EntityId.FromString(batchRequest.InstanceId!); EntityId entityId = new(coreEntityId.Name, coreEntityId.Key); TaskName name = new(entityId.Name); - EntityBatchRequest batchRequest = request.ToEntityBatchRequest(); EntityBatchResult? batchResult; try @@ -428,7 +465,7 @@ async Task OnRunEntityBatchAsync(P.EntityBatchRequest request, CancellationToken // so we return a non-retriable error-OperationResult for each operation in the batch. batchResult = new EntityBatchResult() { - Actions = new List(), // no actions + Actions = [], // no actions EntityState = batchRequest.EntityState, // state is unmodified Results = Enumerable.Repeat( new OperationResult() @@ -447,19 +484,19 @@ async Task OnRunEntityBatchAsync(P.EntityBatchRequest request, CancellationToken } catch (Exception frameworkException) { - // return a result with no results, same state, - // and which contains failure details + // return a result with failure details. + // this will cause the batch to be abandoned and retried + // (possibly after a delay and on a different worker). batchResult = new EntityBatchResult() { - Actions = new List(), - EntityState = batchRequest.EntityState, - Results = new List(), FailureDetails = new FailureDetails(frameworkException), }; } - // convert the result to protobuf format and send it back - P.EntityBatchResult response = batchResult.ToEntityBatchResult(); + P.EntityBatchResult response = batchResult.ToEntityBatchResult( + completionToken, + operationInfos?.Take(batchResult.Results?.Count ?? 0)); + await this.client.CompleteEntityTaskAsync(response, cancellationToken: cancellation); } } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs index fd775928..be277a3a 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs @@ -22,4 +22,28 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions /// Gets or sets the gRPC call invoker to use. Will supersede when provided. /// public CallInvoker? CallInvoker { get; set; } + + /// + /// Gets the internal protocol options. These are used to control backend-dependent features. + /// + internal InternalOptions Internal { get; } = new(); + + /// + /// Internal options are not exposed directly, but configurable via . + /// + internal class InternalOptions + { + /// + /// Gets or sets a value indicating whether entity-related events appearing in orchestration histories should be + /// automatically converted back and forth between the old DT Core representation (JSON-encoded external events) + /// and the new protobuf representation (explicit history events), which is used by the DTS scheduler backend. + /// + public bool ConvertOrchestrationEntityEvents { get; set; } + + /// + /// Gets or sets a value indicating whether to automatically add entity + /// unlock events into the history when an orchestration terminates while holding an entity lock. + /// + public bool InsertEntityUnlocksOnCompletion { get; set; } + } } diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs index 0092444f..d1737bf7 100644 --- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs +++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs @@ -116,7 +116,8 @@ public static string LoadAndRun( request.InstanceId, result.CustomStatus, result.Actions, - completionToken: string.Empty /* doesn't apply */); + completionToken: string.Empty, /* doesn't apply */ + entityConversionState: null); byte[] responseBytes = response.ToByteArray(); return Convert.ToBase64String(responseBytes); } diff --git a/src/Worker/Grpc/Internal/InternalOptionsExtensions.cs b/src/Worker/Grpc/Internal/InternalOptionsExtensions.cs new file mode 100644 index 00000000..0739c15c --- /dev/null +++ b/src/Worker/Grpc/Internal/InternalOptionsExtensions.cs @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Microsoft.DurableTask.Worker.Grpc.Internal; + +/// +/// Provides access to configuring internal options for the gRPC worker. +/// +public static class InternalOptionsExtensions +{ + /// + /// Configure the worker to use the default settings for connecting to the Azure Managed Durable Task service. + /// + /// The gRPC worker options. + /// + /// This is an internal API that supports the DurableTask infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new DurableTask release. + /// + public static void ConfigureForAzureManaged(this GrpcDurableTaskWorkerOptions options) + { + options.Internal.ConvertOrchestrationEntityEvents = true; + options.Internal.InsertEntityUnlocksOnCompletion = true; + } +} diff --git a/src/Worker/Grpc/Worker.Grpc.csproj b/src/Worker/Grpc/Worker.Grpc.csproj index c9bbf58e..991fef8f 100644 --- a/src/Worker/Grpc/Worker.Grpc.csproj +++ b/src/Worker/Grpc/Worker.Grpc.csproj @@ -17,5 +17,4 @@ -