Implement support for suspend/resume #195

Merged: 5 commits, Nov 3, 2023
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,17 @@

(Add new notes here)

## v1.2.1

### New

* Support suspend/resume of orchestrations

### Updates

* `SqlOrchestrationService.WaitForOrchestrationAsync` no longer throws `TimeoutException` on timeout - only `OperationCanceledException` (previously it could throw either, depending on timing)
* Fix default DateTime values to have DateTimeKind of UTC (instead of Unspecified)

## v1.2.0

### New
1 change: 0 additions & 1 deletion src/DurableTask.SqlServer/Scripts/logic.sql
@@ -630,7 +630,6 @@ BEGIN
E.[InstanceID] = I.[InstanceID]
WHERE
I.TaskHub = @TaskHub AND
- I.[RuntimeStatus] NOT IN ('Suspended') AND

Review comment (Member, Author): This filter was added long before suspend/resume was actually implemented in DurableTask.Core, on the assumption that suspended orchestrations would never be loaded. In the final design, however, suspended orchestrations still need to be loaded, so this line had to be removed.

(I.[LockExpiration] IS NULL OR I.[LockExpiration] < @now) AND
(E.[VisibleTime] IS NULL OR E.[VisibleTime] < @now)

15 changes: 2 additions & 13 deletions src/DurableTask.SqlServer/SqlOrchestrationService.cs
@@ -205,6 +205,7 @@ public override Task DeleteAsync(bool deleteInstanceStore)
currentStatus = SqlUtils.GetRuntimeStatus(reader);
isRunning =
currentStatus == OrchestrationStatus.Running ||
currentStatus == OrchestrationStatus.Suspended ||
currentStatus == OrchestrationStatus.Pending;
}
else
@@ -570,19 +571,7 @@ public override async Task<OrchestrationState> WaitForOrchestrationAsync(
return state;
}

- try
- {
- await Task.Delay(TimeSpan.FromSeconds(1), combinedCts.Token);
- }
- catch (TaskCanceledException)
- {
- if (timeoutCts.Token.IsCancellationRequested)
- {
- throw new TimeoutException($"A caller-specified timeout of {timeout} has expired, but instance '{instanceId}' is still in an {state?.OrchestrationStatus.ToString() ?? "unknown"} state.");
- }
-
- throw;
- }
+ await Task.Delay(TimeSpan.FromSeconds(1), combinedCts.Token);

Review comment (Member, Author): The code above was deleted so that a timeout always surfaces as OperationCanceledException (or its subclass TaskCanceledException). Previously the method sometimes threw TimeoutException and sometimes OperationCanceledException, depending on which line of code was running when the cancellation token was signaled.
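
For callers, the practical effect is that one `catch (OperationCanceledException)` covers both a timeout and an explicit cancellation. The following is only a minimal sketch: `PollUntilDoneAsync` is a hypothetical stand-in for the polling loop (it is not the library's implementation), and the 50 ms and 200 ms intervals are arbitrary values chosen for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the polling loop; NOT the library's implementation.
static async Task<string> PollUntilDoneAsync(
    Func<bool> isDone,
    TimeSpan timeout,
    CancellationToken cancellation = default)
{
    using var timeoutCts = new CancellationTokenSource(timeout);
    using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(
        timeoutCts.Token, cancellation);

    while (true)
    {
        // Throws OperationCanceledException if either token has fired.
        combinedCts.Token.ThrowIfCancellationRequested();

        if (isDone())
        {
            return "done";
        }

        // Task.Delay throws TaskCanceledException (which derives from
        // OperationCanceledException) when the token fires during the wait.
        await Task.Delay(TimeSpan.FromMilliseconds(50), combinedCts.Token);
    }
}

string outcome;
try
{
    // The condition never becomes true, so the 200 ms timeout always fires.
    outcome = await PollUntilDoneAsync(() => false, TimeSpan.FromMilliseconds(200));
}
catch (OperationCanceledException)
{
    // Single handler: no separate TimeoutException branch is needed.
    outcome = "timed out or canceled";
}

Console.WriteLine(outcome);
```

Because no exception is translated inside the loop, the caller sees the same exception type regardless of whether the token fires during the delay or during the cancellation check.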

}
}

29 changes: 20 additions & 9 deletions src/DurableTask.SqlServer/SqlUtils.cs
@@ -207,6 +207,12 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch
TimerId = GetTaskId(reader),
};
break;
case EventType.ExecutionSuspended:
historyEvent = new ExecutionSuspendedEvent(eventId, GetPayloadText(reader));
break;
case EventType.ExecutionResumed:
historyEvent = new ExecutionResumedEvent(eventId, GetPayloadText(reader));
break;
default:
throw new InvalidOperationException($"Don't know how to interpret '{eventType}'.");
}
@@ -247,10 +253,10 @@ public static OrchestrationState GetOrchestrationState(this DbDataReader reader)

var state = new OrchestrationState
{
- CompletedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("CompletedTime")) ?? default,
- CreatedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("CreatedTime")) ?? default,
+ CompletedTime = GetUtcDateTime(reader, "CompletedTime"),
+ CreatedTime = GetUtcDateTime(reader, "CreatedTime"),
Input = reader.GetStringOrNull(reader.GetOrdinal("InputText")),
- LastUpdatedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("LastUpdatedTime")) ?? default,
+ LastUpdatedTime = GetUtcDateTime(reader, "LastUpdatedTime"),
Name = GetName(reader),
Version = GetVersion(reader),
OrchestrationInstance = new OrchestrationInstance
@@ -411,23 +417,28 @@ internal static string GetInstanceId(DbDataReader reader)

static DateTime GetVisibleTime(DbDataReader reader)
{
- int ordinal = reader.GetOrdinal("VisibleTime");
- return GetUtcDateTime(reader, ordinal);
+ return GetUtcDateTime(reader, "VisibleTime");
}

static DateTime GetTimestamp(DbDataReader reader)
{
- int ordinal = reader.GetOrdinal("Timestamp");
- return GetUtcDateTime(reader, ordinal);
+ return GetUtcDateTime(reader, "Timestamp");
}

- static DateTime? GetUtcDateTimeOrNull(this DbDataReader reader, int columnIndex)
+ static DateTime GetUtcDateTime(DbDataReader reader, string columnName)
{
- return reader.IsDBNull(columnIndex) ? (DateTime?)null : GetUtcDateTime(reader, columnIndex);
+ int ordinal = reader.GetOrdinal(columnName);
+ return GetUtcDateTime(reader, ordinal);
}

static DateTime GetUtcDateTime(DbDataReader reader, int ordinal)
{
+ if (reader.IsDBNull(ordinal))
+ {
+ // Note that some serializers (like protobuf) won't accept non-UTC DateTime objects.
+ return DateTime.SpecifyKind(default, DateTimeKind.Utc);
+ }
+
// The SQL client always assumes DateTimeKind.Unspecified. We need to modify the result so that it knows it is UTC.
return DateTime.SpecifyKind(reader.GetDateTime(ordinal), DateTimeKind.Utc);
}
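
The null-handling change above matters because a bare `default` `DateTime` carries `DateTimeKind.Unspecified`, while the old code path returned exactly that for NULL columns. A short standalone sketch of the difference (the variable names are illustrative only and do not appear in the PR):

```csharp
using System;

// A bare default DateTime has Kind == Unspecified.
DateTime unspecifiedDefault = default;

// SpecifyKind keeps the same ticks but tags the value as UTC.
DateTime utcDefault = DateTime.SpecifyKind(default, DateTimeKind.Utc);

Console.WriteLine(unspecifiedDefault.Kind);          // Unspecified
Console.WriteLine(utcDefault.Kind);                  // Utc

// DateTime equality compares ticks only, so the two values still compare equal.
Console.WriteLine(unspecifiedDefault == utcDefault); // True
```

Since equality ignores `Kind`, existing comparisons against `default` keep working after the change; only consumers that inspect `Kind` see a difference.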
2 changes: 1 addition & 1 deletion src/common.props
@@ -17,7 +17,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>2</MinorVersion>
- <PatchVersion>0</PatchVersion>
+ <PatchVersion>1</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>
@@ -504,7 +504,7 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName
schemaName);
Assert.Equal(1, currentSchemaVersion.Major);
Assert.Equal(2, currentSchemaVersion.Minor);
- Assert.Equal(0, currentSchemaVersion.Patch);
+ Assert.Equal(1, currentSchemaVersion.Patch);
}

sealed class TestDatabase : IDisposable
57 changes: 57 additions & 0 deletions test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs
@@ -768,5 +768,62 @@ public async Task TraceContextFlowCorrectly()
Assert.True(activitySpan.Duration > delay);
Assert.True(activitySpan.Duration < delay * 2);
}

[Fact]
public async Task SuspendAndResumeInstance()
{
TaskCompletionSource<int> tcs = null;

const int EventCount = 5;
string orchestrationName = "SuspendResumeOrchestration";

TestInstance<string> instance = await this.testService.RunOrchestration<int, string>(
null,
orchestrationName,
implementation: async (ctx, _) =>
{
tcs = new TaskCompletionSource<int>();

int i;
for (i = 0; i < EventCount; i++)
{
await tcs.Task;
tcs = new TaskCompletionSource<int>();
}

return i;
},
onEvent: (ctx, name, value) =>
{
Assert.Equal("Event" + value, name);
tcs.TrySetResult(int.Parse(value));
});

// Wait for the orchestration to finish starting
await instance.WaitForStart();

// Suspend the orchestration so that it won't process any new events
await instance.SuspendAsync();

// Raise the events, which should get buffered but not consumed
for (int i = 0; i < EventCount; i++)
{
await instance.RaiseEventAsync($"Event{i}", i);
}

// Make sure that the orchestration *doesn't* complete
await Assert.ThrowsAnyAsync<OperationCanceledException>(
() => instance.WaitForCompletion(TimeSpan.FromSeconds(3), doNotAdjustTimeout: true));

// Confirm that the orchestration is in a suspended state
OrchestrationState state = await instance.GetStateAsync();
Assert.Equal(OrchestrationStatus.Suspended, state.OrchestrationStatus);

// Resume the orchestration
await instance.ResumeAsync();

// Now the orchestration should complete immediately
await instance.WaitForCompletion(timeout: TimeSpan.FromSeconds(3), expectedOutput: EventCount);
}
}
}
19 changes: 17 additions & 2 deletions test/DurableTask.SqlServer.Tests/Utils/TestInstance.cs
@@ -71,9 +71,13 @@ public async Task<OrchestrationState> WaitForCompletion(
OrchestrationStatus expectedStatus = OrchestrationStatus.Completed,
object expectedOutput = null,
string expectedOutputRegex = null,
- bool continuedAsNew = false)
+ bool continuedAsNew = false,
+ bool doNotAdjustTimeout = false)
{
- AdjustTimeout(ref timeout);
+ if (!doNotAdjustTimeout)

Review comment (Member, Author): I needed to add this; otherwise we can't write test code that waits for timeouts (as the new test does) when a debugger is attached.

+ {
+ AdjustTimeout(ref timeout);
+ }

OrchestrationState state = await this.client.WaitForOrchestrationAsync(this.GetInstanceForAnyExecution(), timeout);
Assert.NotNull(state);
@@ -158,6 +162,17 @@ internal async Task RestartAsync(TInput newInput, OrchestrationStatus[] dedupeSt
this.instance.ExecutionId = newInstance.ExecutionId;
}


internal Task SuspendAsync(string reason = null)
{
return this.client.SuspendInstanceAsync(this.instance, reason);
}

internal Task ResumeAsync(string reason = null)
{
return this.client.ResumeInstanceAsync(this.instance, reason);
}

static void AdjustTimeout(ref TimeSpan timeout)
{
timeout = timeout.AdjustForDebugging();