-
Notifications
You must be signed in to change notification settings - Fork 175
/
Copy pathclient.go
253 lines (210 loc) · 8.41 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workflow
import (
"context"
"errors"
"fmt"
"time"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/backend"
durabletaskclient "github.com/dapr/durabletask-go/client"
dapr "github.com/dapr/go-sdk/client"
)
// Client is the workflow client. It holds the gRPC connection taken from a
// dapr.Client together with the durabletask TaskHubGrpcClient created on that
// connection; the Client methods delegate their work to taskHubClient.
type Client struct {
// conn is the gRPC connection the task hub client was built with; Close closes it.
conn *grpc.ClientConn
// taskHubClient issues the orchestration calls made by the Client methods.
taskHubClient *durabletaskclient.TaskHubGrpcClient
}
// WorkflowIDReusePolicy is the workflow-level form of
// api.OrchestrationIdReusePolicy; WithReuseIDPolicy converts it field by field.
type WorkflowIDReusePolicy struct {
// OperationStatus is the list of workflow statuses the policy refers to.
// NOTE(review): presumably Action is applied only when an existing instance is
// in one of these statuses — confirm against durabletask-go.
OperationStatus []Status
// Action is the reuse action, e.g. one of the ReuseIDAction* constants.
Action CreateWorkflowAction
}
// CreateWorkflowAction is an alias for api.CreateOrchestrationAction, so the two
// names denote the same type.
type CreateWorkflowAction = api.CreateOrchestrationAction
// Workflow-named equivalents of the api.REUSE_ID_ACTION_* constants, intended
// for use as WorkflowIDReusePolicy.Action.
// NOTE(review): the names suggest error / ignore / terminate handling of an
// existing instance with the same ID — confirm the exact semantics in
// durabletask-go.
const (
ReuseIDActionError CreateWorkflowAction = api.REUSE_ID_ACTION_ERROR
ReuseIDActionIgnore CreateWorkflowAction = api.REUSE_ID_ACTION_IGNORE
ReuseIDActionTerminate CreateWorkflowAction = api.REUSE_ID_ACTION_TERMINATE
)
// WithInstanceID returns a scheduling option that assigns the given ID,
// converted to an api.InstanceID, to the new workflow instance.
func WithInstanceID(id string) api.NewOrchestrationOptions {
	instanceID := api.InstanceID(id)
	return api.WithInstanceID(instanceID)
}
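// TODO: Implement WithOrchestrationIdReusePolicy.
// NOTE(review): WithReuseIDPolicy in this file already wraps
// api.WithOrchestrationIdReusePolicy — confirm whether this TODO is stale.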
// WithInput returns a scheduling option that supplies an input to the new
// workflow. The value is handed to api.WithInput unchanged.
func WithInput(input any) api.NewOrchestrationOptions {
	option := api.WithInput(input)
	return option
}
// WithRawInput returns a scheduling option that supplies a raw string input to
// the new workflow. The string is wrapped with wrapperspb.String before being
// passed to api.WithRawInput.
func WithRawInput(input string) api.NewOrchestrationOptions {
	rawInput := wrapperspb.String(input)
	return api.WithRawInput(rawInput)
}
// WithStartTime returns a scheduling option that sets the start time of the new
// workflow. The value is handed to api.WithStartTime unchanged.
//
// The parameter is named startTime rather than time so that it does not shadow
// the imported time package inside the function body.
func WithStartTime(startTime time.Time) api.NewOrchestrationOptions {
	return api.WithStartTime(startTime)
}
// WithReuseIDPolicy returns a scheduling option carrying an instance-ID reuse
// policy. The policy's statuses are passed through convertStatusSlice and its
// action is copied as-is into an api.OrchestrationIdReusePolicy.
func WithReuseIDPolicy(policy WorkflowIDReusePolicy) api.NewOrchestrationOptions {
	reusePolicy := &api.OrchestrationIdReusePolicy{
		Action:          policy.Action,
		OperationStatus: convertStatusSlice(policy.OperationStatus),
	}
	return api.WithOrchestrationIdReusePolicy(reusePolicy)
}
// WithFetchPayloads returns a metadata-fetch option that controls whether
// payloads are returned along with the workflow metadata. The flag is handed to
// api.WithFetchPayloads unchanged.
func WithFetchPayloads(fetchPayloads bool) api.FetchOrchestrationMetadataOptions {
	option := api.WithFetchPayloads(fetchPayloads)
	return option
}
// WithEventPayload returns a raise-event option that attaches a payload to the
// event sent to a workflow. The value is handed to api.WithEventPayload unchanged.
func WithEventPayload(data any) api.RaiseEventOptions {
	option := api.WithEventPayload(data)
	return option
}
// WithRawEventData returns a raise-event option that attaches a raw string
// payload to the event sent to a workflow. The string is wrapped with
// wrapperspb.String before being passed to api.WithRawEventData.
func WithRawEventData(data string) api.RaiseEventOptions {
	rawData := wrapperspb.String(data)
	return api.WithRawEventData(rawData)
}
// WithOutput returns a terminate option that defines the output recorded when a
// workflow is terminated. The value is handed to api.WithOutput unchanged.
func WithOutput(data any) api.TerminateOptions {
	option := api.WithOutput(data)
	return option
}
// WithRawOutput returns a terminate option that defines a raw string output for
// a terminated workflow. The string is wrapped with wrapperspb.String before
// being passed to api.WithRawOutput.
func WithRawOutput(data string) api.TerminateOptions {
	rawOutput := wrapperspb.String(data)
	return api.WithRawOutput(rawOutput)
}
// WithRecursiveTerminate returns a terminate option that configures whether all
// sub-workflows created by the target workflow are terminated as well.
func WithRecursiveTerminate(recursive bool) api.TerminateOptions {
	option := api.WithRecursiveTerminate(recursive)
	return option
}
// WithRecursivePurge returns a purge option that configures whether all
// sub-workflows created by the target workflow are purged as well.
func WithRecursivePurge(recursive bool) api.PurgeOptions {
	option := api.WithRecursivePurge(recursive)
	return option
}
// clientOption is a functional option for NewClient: it mutates the supplied
// clientOptions and may return an error, in which case NewClient fails.
type clientOption func(*clientOptions) error
// clientOptions collects the settings that clientOption functions can set before
// NewClient builds the Client.
type clientOptions struct {
// daprClient is the dapr.Client whose gRPC connection is used; when nil,
// NewClient creates one with dapr.NewClient.
daprClient dapr.Client
}
// WithDaprClient returns a client option that makes NewClient use the supplied
// dapr.Client instead of creating its own. The returned option never fails.
func WithDaprClient(input dapr.Client) clientOption {
	var apply clientOption = func(target *clientOptions) error {
		target.daprClient = input
		return nil
	}
	return apply
}
// TODO: Implement mocks
// NewClient builds a workflow Client.
//
// Each option is applied in order to a fresh clientOptions; the first option
// that returns an error aborts construction. When no dapr.Client was supplied
// via WithDaprClient, one is created with dapr.NewClient. The Client is then
// built on the dapr client's gRPC connection using the backend default logger.
//
// On failure the returned error wraps the underlying cause, so callers can
// inspect it with errors.Is / errors.As. The returned *Client is then an empty,
// unusable value that is kept non-nil only for backward compatibility; callers
// must check the error before using it.
func NewClient(opts ...clientOption) (*Client, error) {
	options := new(clientOptions)
	for _, configure := range opts {
		if err := configure(options); err != nil {
			return &Client{}, fmt.Errorf("failed to load options: %w", err)
		}
	}

	// Prefer the caller-supplied dapr client; only create one when none was given.
	daprClient := options.daprClient
	if daprClient == nil {
		var err error
		daprClient, err = dapr.NewClient()
		if err != nil {
			return &Client{}, fmt.Errorf("failed to initialise dapr.Client: %w", err)
		}
	}

	conn := daprClient.GrpcClientConn()
	return &Client{
		conn:          conn,
		taskHubClient: durabletaskclient.NewTaskHubGrpcClient(conn, backend.DefaultLogger()),
	}, nil
}
// ScheduleNewWorkflow schedules a new instance of the named workflow and returns
// its instance ID as a string together with any error from the task hub client.
// An empty workflow name is rejected before any call is made.
func (c *Client) ScheduleNewWorkflow(ctx context.Context, workflow string, opts ...api.NewOrchestrationOptions) (id string, err error) {
	if len(workflow) == 0 {
		return "", errors.New("no workflow specified")
	}

	instanceID, scheduleErr := c.taskHubClient.ScheduleNewOrchestration(ctx, workflow, opts...)
	return string(instanceID), scheduleErr
}
// FetchWorkflowMetadata fetches the metadata of the workflow instance with the
// given ID and returns it converted via convertMetadata. An empty ID is rejected
// before any call is made; errors from the task hub client are returned as-is.
func (c *Client) FetchWorkflowMetadata(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*Metadata, error) {
	if len(id) == 0 {
		return nil, errors.New("no workflow id specified")
	}

	instanceID := api.InstanceID(id)
	raw, err := c.taskHubClient.FetchOrchestrationMetadata(ctx, instanceID, opts...)
	if err != nil {
		return nil, err
	}
	return convertMetadata(raw), nil
}
// WaitForWorkflowStart waits for the workflow instance with the given ID to
// start (delegating to WaitForOrchestrationStart) and returns its metadata
// converted via convertMetadata. An empty ID is rejected before any call is
// made; errors from the task hub client are returned as-is.
func (c *Client) WaitForWorkflowStart(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*Metadata, error) {
	if len(id) == 0 {
		return nil, errors.New("no workflow id specified")
	}

	instanceID := api.InstanceID(id)
	raw, err := c.taskHubClient.WaitForOrchestrationStart(ctx, instanceID, opts...)
	if err != nil {
		return nil, err
	}
	return convertMetadata(raw), nil
}
// WaitForWorkflowCompletion blocks until the workflow instance with the given ID
// completes (delegating to WaitForOrchestrationCompletion) and returns its
// metadata converted via convertMetadata. An empty ID is rejected before any
// call is made; errors from the task hub client are returned as-is.
func (c *Client) WaitForWorkflowCompletion(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*Metadata, error) {
	if len(id) == 0 {
		return nil, errors.New("no workflow id specified")
	}

	instanceID := api.InstanceID(id)
	raw, err := c.taskHubClient.WaitForOrchestrationCompletion(ctx, instanceID, opts...)
	if err != nil {
		return nil, err
	}
	return convertMetadata(raw), nil
}
// TerminateWorkflow terminates the workflow instance with the given ID and
// returns the task hub client's error, if any. An empty ID is rejected before
// any call is made.
func (c *Client) TerminateWorkflow(ctx context.Context, id string, opts ...api.TerminateOptions) error {
	if len(id) == 0 {
		return errors.New("no workflow id specified")
	}

	instanceID := api.InstanceID(id)
	return c.taskHubClient.TerminateOrchestration(ctx, instanceID, opts...)
}
// RaiseEvent raises the named event on the workflow instance with the given ID
// and returns the task hub client's error, if any. The ID is validated first,
// then the event name; an empty value for either is rejected before any call is
// made.
func (c *Client) RaiseEvent(ctx context.Context, id, eventName string, opts ...api.RaiseEventOptions) error {
	switch {
	case id == "":
		return errors.New("no workflow id specified")
	case eventName == "":
		return errors.New("no event name specified")
	}

	instanceID := api.InstanceID(id)
	return c.taskHubClient.RaiseEvent(ctx, instanceID, eventName, opts...)
}
// SuspendWorkflow suspends the workflow instance with the given ID, passing the
// reason along, and returns the task hub client's error, if any. An empty ID is
// rejected before any call is made.
func (c *Client) SuspendWorkflow(ctx context.Context, id, reason string) error {
	if len(id) == 0 {
		return errors.New("no workflow id specified")
	}

	instanceID := api.InstanceID(id)
	return c.taskHubClient.SuspendOrchestration(ctx, instanceID, reason)
}
// ResumeWorkflow resumes the suspended workflow instance with the given ID,
// passing the reason along, and returns the task hub client's error, if any. An
// empty ID is rejected before any call is made.
func (c *Client) ResumeWorkflow(ctx context.Context, id, reason string) error {
	if len(id) == 0 {
		return errors.New("no workflow id specified")
	}

	instanceID := api.InstanceID(id)
	return c.taskHubClient.ResumeOrchestration(ctx, instanceID, reason)
}
// PurgeWorkflow purges the state of the workflow instance with the given ID and
// returns the task hub client's error, if any. An empty ID is rejected before
// any call is made.
// NOTE: The workflow must be in a terminated or completed state.
func (c *Client) PurgeWorkflow(ctx context.Context, id string, opts ...api.PurgeOptions) error {
	if len(id) == 0 {
		return errors.New("no workflow id specified")
	}

	instanceID := api.InstanceID(id)
	return c.taskHubClient.PurgeOrchestrationState(ctx, instanceID, opts...)
}
// Close closes the gRPC connection held by the Client.
//
// It is a no-op on a nil Client or on a Client without a connection, such as the
// empty value NewClient returns alongside an error, so a deferred Close cannot
// dereference a nil connection.
func (c *Client) Close() {
	if c == nil || c.conn == nil {
		return
	}
	// Close has no error return, so a failure to close the connection is
	// deliberately discarded here.
	_ = c.conn.Close()
}