Skip to content

Commit

Permalink
Update terminate logic to handle scheduled orchestrations
Browse files Browse the repository at this point in the history
  • Loading branch information
cgillum committed Oct 11, 2024
1 parent e477b00 commit 7d58dee
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 38 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v1.4.1 (Unreleased)

### Updates

* Fix issue where scheduled orchestrations aren't immediately terminated ([#178](https://github.com/microsoft/durabletask-mssql/issues/178))

## v1.4.0

### New
Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.SqlServer/DTUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public static bool HasPayload(HistoryEvent e)
return historyEvent.EventType switch
{
EventType.ExecutionStarted => ((ExecutionStartedEvent)historyEvent).Version,
EventType.SubOrchestrationInstanceCreated => ((SubOrchestrationInstanceCreatedEvent)historyEvent).Version,
EventType.TaskScheduled => ((TaskScheduledEvent)historyEvent).Version,
_ => null,
};
Expand Down
85 changes: 55 additions & 30 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -469,45 +469,70 @@ BEGIN

DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub()

DECLARE @existingStatus varchar(30) = (
SELECT TOP 1 existing.[RuntimeStatus]
FROM Instances existing WITH (HOLDLOCK)
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
)
DECLARE @existingStatus varchar(30)
DECLARE @existingLockExpiration datetime2(7)

-- Get the status of an existing orchestration
SELECT TOP 1
@existingStatus = existing.[RuntimeStatus],
@existingLockExpiration = existing.[LockExpiration]
FROM Instances existing WITH (HOLDLOCK)
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID

IF @existingStatus IS NULL
BEGIN
ROLLBACK TRANSACTION;
THROW 50000, 'The instance does not exist.', 1;
END
-- If the instance is already completed, no need to terminate it.
IF @existingStatus IN ('Pending', 'Running')

DECLARE @now datetime2(7) = SYSUTCDATETIME()

IF @existingStatus IN ('Running', 'Pending')
BEGIN
IF NOT EXISTS (
SELECT TOP (1) 1 FROM NewEvents
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [EventType] = 'ExecutionTerminated'
)
-- Create a payload to store the reason, if any
DECLARE @PayloadID uniqueidentifier = NULL
IF @Reason IS NOT NULL
BEGIN
-- Payloads are stored separately from the events
DECLARE @PayloadID uniqueidentifier = NULL
IF @Reason IS NOT NULL
-- Note that we don't use the Reason column for the Reason with terminate events
SET @PayloadID = NEWID()
INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text])
VALUES (@TaskHub, @InstanceID, @PayloadID, @Reason)
END

-- Check the status of the orchestration to determine which termination path to take
IF @existingStatus = 'Pending' AND (@existingLockExpiration IS NULL OR @existingLockExpiration <= @now)
BEGIN
-- The orchestration hasn't started yet - transition it directly to the Terminated state and delete
-- any pending messages
UPDATE Instances SET
[RuntimeStatus] = 'Terminated',
[LastUpdatedTime] = @now,
[CompletedTime] = @now,
[OutputPayloadID] = @PayloadID,
[LockExpiration] = NULL -- release the lock, if any
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID

DELETE FROM NewEvents WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
END
ELSE
BEGIN
-- The orchestration has actually started running in this case
IF NOT EXISTS (
SELECT TOP (1) 1 FROM NewEvents
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [EventType] = 'ExecutionTerminated'
)
BEGIN
-- Note that we don't use the Reason column for the Reason with terminate events
SET @PayloadID = NEWID()
INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text])
VALUES (@TaskHub, @InstanceID, @PayloadID, @Reason)
INSERT INTO NewEvents (
[TaskHub],
[InstanceID],
[EventType],
[PayloadID]
) VALUES (
@TaskHub,
@InstanceID,
'ExecutionTerminated',
@PayloadID)
END

INSERT INTO NewEvents (
[TaskHub],
[InstanceID],
[EventType],
[PayloadID]
) VALUES (
@TaskHub,
@InstanceID,
'ExecutionTerminated',
@PayloadID)
END
END

Expand Down Expand Up @@ -1444,7 +1469,7 @@ BEGIN
-- Instance IDs can be overwritten only if the orchestration is in a terminal state
IF @existingStatus NOT IN ('Failed')
BEGIN
DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewing instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus);
DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewind instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus);
THROW 50001, @msg, 1;
END

Expand Down
8 changes: 7 additions & 1 deletion src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,14 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
IList<TaskMessage> orchestratorMessages,
IList<TaskMessage> timerMessages,
TaskMessage continuedAsNewMessage,
OrchestrationState orchestrationState)
OrchestrationState? orchestrationState)
{
if (orchestrationState is null || !newRuntimeState.IsValid)
{
// The work item was invalid. We can't do anything with it so we ignore it.
return;
}

ExtendedOrchestrationWorkItem currentWorkItem = (ExtendedOrchestrationWorkItem)workItem;

this.traceHelper.CheckpointStarting(orchestrationState);
Expand Down
2 changes: 1 addition & 1 deletion src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>0</PatchVersion>
<PatchVersion>1</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName
schemaName);
Assert.Equal(1, currentSchemaVersion.Major);
Assert.Equal(4, currentSchemaVersion.Minor);
Assert.Equal(0, currentSchemaVersion.Patch);
Assert.Equal(1, currentSchemaVersion.Patch);
}

sealed class TestDatabase : IDisposable
Expand Down
24 changes: 24 additions & 0 deletions test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -862,5 +862,29 @@ await Assert.ThrowsAnyAsync<OperationCanceledException>(
// Now the orchestration should complete immediately
await instance.WaitForCompletion(timeout: TimeSpan.FromSeconds(3), expectedOutput: EventCount);
}

[Fact]
public async Task TerminateScheduledOrchestration()
{
string orchestrationName = "ScheduledOrchestration";

// Does nothing except return the original input
TestInstance<object> instance = await this.testService.RunOrchestration(
input: (object)null,
orchestrationName,
version: null,
instanceId: null,
scheduledStartTime: DateTime.UtcNow.AddSeconds(30),
implementation: (ctx, input) => Task.FromResult("done"));

// Terminate the orchestration before it starts
await instance.TerminateAsync("Bye!");

await instance.WaitForCompletion(
expectedStatus: OrchestrationStatus.Terminated,
expectedOutput: "Bye!");

LogAssert.NoWarningsOrErrors(this.testService.LogProvider);
}
}
}
69 changes: 64 additions & 5 deletions test/DurableTask.SqlServer.Tests/Utils/TestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,32 @@ public Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
activities);
}

public Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
TInput input,
string orchestrationName,
string version,
string instanceId,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
{
return this.RunOrchestration(
input,
orchestrationName,
version,
instanceId,
scheduledStartTime: null,
implementation,
onEvent,
activities);
}

public async Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
TInput input,
string orchestrationName,
string version,
string instanceId,
DateTime? scheduledStartTime,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
Expand All @@ -147,19 +168,43 @@ public async Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
inputGenerator: i => input,
orchestrationName: orchestrationName,
version: version,
scheduledStartTime: scheduledStartTime,
implementation,
onEvent,
activities);

return instances[0];
}

public Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput, TInput>(
int count,
Func<int, string> instanceIdGenerator,
Func<int, TInput> inputGenerator,
string orchestrationName,
string version,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
{
return this.RunOrchestrations(
count,
instanceIdGenerator,
inputGenerator,
orchestrationName,
version,
scheduledStartTime: null,
implementation,
onEvent,
activities);
}

public async Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput, TInput>(
int count,
Func<int, string> instanceIdGenerator,
Func<int, TInput> inputGenerator,
string orchestrationName,
string version,
DateTime? scheduledStartTime,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
Expand All @@ -178,11 +223,25 @@ public async Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput
TInput input = inputGenerator != null ? inputGenerator(i) : default;

DateTime utcNow = DateTime.UtcNow;
OrchestrationInstance instance = await this.client.CreateOrchestrationInstanceAsync(
orchestrationName,
version,
instanceId,
input);

OrchestrationInstance instance;
if (scheduledStartTime.HasValue)
{
instance = await this.client.CreateScheduledOrchestrationInstanceAsync(
orchestrationName,
version,
instanceId,
input,
startAt: scheduledStartTime.Value);
}
else
{
instance = await this.client.CreateOrchestrationInstanceAsync(
orchestrationName,
version,
instanceId,
input);
}

return new TestInstance<TInput>(
this.client,
Expand Down

0 comments on commit 7d58dee

Please sign in to comment.