-
Notifications
You must be signed in to change notification settings - Fork 345
/
Copy pathDaprWorkflowClient.cs
295 lines (283 loc) · 15.5 KB
/
DaprWorkflowClient.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Client;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
namespace Dapr.Workflow
{
/// <summary>
/// Defines client operations for managing Dapr Workflow instances.
/// </summary>
/// <remarks>
/// This is an alternative to the general purpose Dapr client. It uses a gRPC connection to send
/// commands directly to the workflow engine, bypassing the Dapr API layer.
/// </remarks>
public class DaprWorkflowClient : IAsyncDisposable
{
readonly DurableTaskClient innerClient;
/// <summary>
/// Initializes a new instance of the <see cref="DaprWorkflowClient"/> class.
/// </summary>
/// <param name="innerClient">The Durable Task client used to communicate with the Dapr sidecar.</param>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="innerClient"/> is <c>null</c>.</exception>
public DaprWorkflowClient(DurableTaskClient innerClient)
{
this.innerClient = innerClient ?? throw new ArgumentNullException(nameof(innerClient));
}
/// <summary>
/// Schedules a new workflow instance for execution.
/// </summary>
/// <param name="name">The name of the workflow to schedule.</param>
/// <param name="instanceId">
/// The unique ID of the workflow instance to schedule. If not specified, a new GUID value is used.
/// </param>
/// <param name="startTime">
/// The time when the workflow instance should start executing. If not specified or if a date-time in the past
/// is specified, the workflow instance will be scheduled immediately.
/// </param>
/// <param name="input">
/// The optional input to pass to the scheduled workflow instance. This must be a serializable value.
/// </param>
public Task<string> ScheduleNewWorkflowAsync(
string name,
string? instanceId = null,
object? input = null,
DateTime? startTime = null)
{
StartOrchestrationOptions options = new(instanceId, startTime);
return this.innerClient.ScheduleNewOrchestrationInstanceAsync(name, input, options);
}
/// <summary>
/// Fetches runtime state for the specified workflow instance.
/// </summary>
/// <param name="instanceId">The unique ID of the workflow instance to fetch.</param>
/// <param name="getInputsAndOutputs">
/// Specify <c>true</c> to fetch the workflow instance's inputs, outputs, and custom status, or <c>false</c> to
/// omit them. Defaults to true.
/// </param>
public async Task<WorkflowState> GetWorkflowStateAsync(string instanceId, bool getInputsAndOutputs = true)
{
OrchestrationMetadata? metadata = await this.innerClient.GetInstancesAsync(
instanceId,
getInputsAndOutputs);
return new WorkflowState(metadata);
}
/// <summary>
/// Waits for a workflow to start running and returns a <see cref="WorkflowState"/> object that contains metadata
/// about the started workflow.
/// </summary>
/// <remarks>
/// <para>
/// A "started" workflow instance is any instance not in the <see cref="WorkflowRuntimeStatus.Pending"/> state.
/// </para><para>
/// This method will return a completed task if the workflow has already started running or has already completed.
/// </para>
/// </remarks>
/// <param name="instanceId">The unique ID of the workflow instance to wait for.</param>
/// <param name="getInputsAndOutputs">
/// Specify <c>true</c> to fetch the workflow instance's inputs, outputs, and custom status, or <c>false</c> to
/// omit them. Setting this value to <c>false</c> can help minimize the network bandwidth, serialization, and memory costs
/// associated with fetching the instance metadata.
/// </param>
/// <param name="cancellation">A <see cref="CancellationToken"/> that can be used to cancel the wait operation.</param>
/// <returns>
/// Returns a <see cref="WorkflowState"/> record that describes the workflow instance and its execution
/// status. If the specified workflow isn't found, the <see cref="WorkflowState.Exists"/> value will be <c>false</c>.
/// </returns>
public async Task<WorkflowState> WaitForWorkflowStartAsync(
string instanceId,
bool getInputsAndOutputs = true,
CancellationToken cancellation = default)
{
OrchestrationMetadata metadata = await this.innerClient.WaitForInstanceStartAsync(
instanceId,
getInputsAndOutputs,
cancellation);
return new WorkflowState(metadata);
}
/// <summary>
/// Waits for a workflow to complete and returns a <see cref="WorkflowState"/>
/// object that contains metadata about the started instance.
/// </summary>
/// <remarks>
/// <para>
/// A "completed" workflow instance is any instance in one of the terminal states. For example, the
/// <see cref="WorkflowRuntimeStatus.Completed"/>, <see cref="WorkflowRuntimeStatus.Failed"/>, or
/// <see cref="WorkflowRuntimeStatus.Terminated"/> states.
/// </para><para>
/// Workflows are long-running and could take hours, days, or months before completing.
/// Workflows can also be eternal, in which case they'll never complete unless terminated.
/// In such cases, this call may block indefinitely, so care must be taken to ensure appropriate timeouts are
/// enforced using the <paramref name="cancellation"/> parameter.
/// </para><para>
/// If a workflow instance is already complete when this method is called, the method will return immediately.
/// </para>
/// </remarks>
/// <inheritdoc cref="WaitForWorkflowStartAsync(string, bool, CancellationToken)"/>
public async Task<WorkflowState> WaitForWorkflowCompletionAsync(
string instanceId,
bool getInputsAndOutputs = true,
CancellationToken cancellation = default)
{
OrchestrationMetadata metadata = await this.innerClient.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs,
cancellation);
return new WorkflowState(metadata);
}
/// <summary>
/// Terminates a running workflow instance and updates its runtime status to
/// <see cref="WorkflowRuntimeStatus.Terminated"/>.
/// </summary>
/// <remarks>
/// <para>
/// This method internally enqueues a "terminate" message in the task hub. When the task hub worker processes
/// this message, it will update the runtime status of the target instance to
/// <see cref="WorkflowRuntimeStatus.Terminated"/>. You can use the
/// <see cref="WaitForWorkflowCompletionAsync(string, bool, CancellationToken)"/> to wait for the instance to reach
/// the terminated state.
/// </para>
/// <para>
/// Terminating a workflow instance has no effect on any in-flight activity function executions
/// or child workflows that were started by the terminated instance. Those actions will continue to run
/// without interruption. However, their results will be discarded. If you want to terminate child-workflows,
/// you must issue separate terminate commands for each child workflow instance individually.
/// </para><para>
/// At the time of writing, there is no way to terminate an in-flight activity execution.
/// </para>
/// </remarks>
/// <param name="instanceId">The ID of the workflow instance to terminate.</param>
/// <param name="output">The optional output to set for the terminated workflow instance.</param>
/// <param name="cancellation">
/// The cancellation token. This only cancels enqueueing the termination request to the backend. Does not abort
/// termination of the workflow once enqueued.
/// </param>
/// <returns>A task that completes when the terminate message is enqueued.</returns>
public Task TerminateWorkflowAsync(
string instanceId,
string? output = null,
CancellationToken cancellation = default)
{
return this.innerClient.TerminateInstanceAsync(instanceId, output, cancellation);
}
/// <summary>
/// Sends an event notification message to a waiting workflow instance.
/// </summary>
/// <remarks>
/// <para>
/// In order to handle the event, the target workflow instance must be waiting for an
/// event named <paramref name="eventName"/> using the
/// <see cref="WorkflowContext.WaitForExternalEventAsync{T}(string, CancellationToken)"/> API.
/// If the target workflow instance is not yet waiting for an event named <paramref name="eventName"/>,
/// then the event will be saved in the workflow instance state and dispatched immediately when the
/// workflow calls <see cref="WorkflowContext.WaitForExternalEventAsync{T}(string, CancellationToken)"/>.
/// This event saving occurs even if the workflow has canceled its wait operation before the event was received.
/// </para><para>
/// Workflows can wait for the same event name multiple times, so sending multiple events with the same name is
/// allowed. Each external event received by an workflow will complete just one task returned by the
/// <see cref="WorkflowContext.WaitForExternalEventAsync{T}(string, CancellationToken)"/> method.
/// </para><para>
/// Raised events for a completed or non-existent workflow instance will be silently discarded.
/// </para>
/// </remarks>
/// <param name="instanceId">The ID of the workflow instance that will handle the event.</param>
/// <param name="eventName">The name of the event. Event names are case-insensitive.</param>
/// <param name="eventPayload">The serializable data payload to include with the event.</param>
/// <param name="cancellation">
/// The cancellation token. This only cancels enqueueing the event to the backend. Does not abort sending the event
/// once enqueued.
/// </param>
/// <returns>A task that completes when the event notification message has been enqueued.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown if <paramref name="instanceId"/> or <paramref name="eventName"/> is null or empty.
/// </exception>
public Task RaiseEventAsync(
string instanceId,
string eventName,
object? eventPayload = null,
CancellationToken cancellation = default)
{
return this.innerClient.RaiseEventAsync(instanceId, eventName, eventPayload, cancellation);
}
/// <summary>
/// Suspends a workflow instance, halting processing of it until
/// <see cref="ResumeWorkflowAsync(string, string, CancellationToken)" /> is used to resume the workflow.
/// </summary>
/// <param name="instanceId">The instance ID of the workflow to suspend.</param>
/// <param name="reason">The optional suspension reason.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the suspend operation. Note, cancelling this token
/// does <b>not</b> resume the workflow if suspend was successful.
/// </param>
/// <returns>A task that completes when the suspend has been committed to the backend.</returns>
public Task SuspendWorkflowAsync(
string instanceId,
string? reason = null,
CancellationToken cancellation = default)
{
return this.innerClient.SuspendInstanceAsync(instanceId, reason, cancellation);
}
/// <summary>
/// Resumes a workflow instance that was suspended via <see cref="SuspendWorkflowAsync(string, string, CancellationToken)" />.
/// </summary>
/// <param name="instanceId">The instance ID of the workflow to resume.</param>
/// <param name="reason">The optional resume reason.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the resume operation. Note, cancelling this token
/// does <b>not</b> re-suspend the workflow if resume was successful.
/// </param>
/// <returns>A task that completes when the resume has been committed to the backend.</returns>
public Task ResumeWorkflowAsync(
string instanceId,
string? reason = null,
CancellationToken cancellation = default)
{
return this.innerClient.ResumeInstanceAsync(instanceId, reason, cancellation);
}
/// <summary>
/// Purges workflow instance state from the workflow state store.
/// </summary>
/// <remarks>
/// <para>
/// This method can be used to permanently delete workflow metadata from the underlying state store,
/// including any stored inputs, outputs, and workflow history records. This is often useful for implementing
/// data retention policies and for keeping storage costs minimal. Only workflow instances in the
/// <see cref="WorkflowRuntimeStatus.Completed"/>, <see cref="WorkflowRuntimeStatus.Failed"/>, or
/// <see cref="WorkflowRuntimeStatus.Terminated"/> state can be purged.
/// </para>
/// </remarks>
/// <param name="instanceId">The unique ID of the workflow instance to purge.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the purge operation.
/// </param>
/// <returns>
/// Returns a task that completes when the purge operation has completed. The value of this task will be
/// <c>true</c> if the workflow state was found and purged successfully; <c>false</c> otherwise.
/// </returns>
public async Task<bool> PurgeInstanceAsync(string instanceId, CancellationToken cancellation = default)
{
PurgeResult result = await this.innerClient.PurgeInstanceAsync(instanceId, cancellation);
return result.PurgedInstanceCount > 0;
}
/// <summary>
/// Disposes any unmanaged resources associated with this client.
/// </summary>
public ValueTask DisposeAsync()
{
return ((IAsyncDisposable)this.innerClient).DisposeAsync();
}
}
}