runner.go

package ep

import (
	"context"
)

var _ = registerGob(&passThrough{}, &pick{}, &tail{})

// UnknownSize is used when the size cannot be estimated
const UnknownSize = -1

// equals should be implemented by ALL entities in the system to allow
// equality checks
type equals interface {
	// Equals returns true if this equals other, i.e. both resolve to the same
	// result. For example:
	// - runner of 1+2 is not equal to runner of 1+3
	// - batch(size:200) is equal to batch(size:300) as the final rows will be the same
	// - exchange(type:gather, id:8) is equal to exchange(type:gather, id:10), even though the ids are different
	Equals(other interface{}) bool
}

// returns should be implemented by ALL entities in the system to allow
// type checks
// TODO: consider making it public after the interfaces restructure
type returns interface {
	// Returns the constant list of data types that are produced by this entity.
	//
	// NOTE: Violating these declared types (either by producing a mismatching
	// number of Data objects within the produced Datasets, or by returning
	// incorrect types) may result in a panic or, worse, incorrect results
	//
	// NOTE: If you need to annotate the returned data with names for
	// referencing later, use the `As()` helper function
	//
	// NOTE: In some cases you may not know the returned types ahead of time,
	// because they somehow depend on the input types. For such cases, use the
	// Wildcard type.
	Returns() []Type
}

// Runner represents objects that can receive a stream of input datasets,
// manipulate them in some way (filtering, mapping, reduction, expansion, etc.)
// and produce a new stream of formatted values.
// NOTE: Some Runners will run concurrently, thus it's important not to modify
// the input in place. Instead, copy/create a new dataset and use that
type Runner interface {
	equals
	returns // Runner must declare its return types

	// Run the manipulation code. Receive datasets from the `inp` stream, cast
	// and modify them as needed (not in place), and send the results to the
	// `out` stream. Return when `inp` is closed.
	//
	// NOTE: This function will run concurrently with other runners, so it needs
	// to be thread-safe. Make sure to clean up any created resources, including
	// goroutines, files, connections, etc. Closing the provided `inp` and `out`
	// channels is unnecessary as it's handled by the code that triggered this
	// Run() function. But, of course, if this Runner uses other Runners and
	// creates its own input and output channels, it should make sure to close
	// them as needed
	//
	// NOTE: For long-running producing runners (runners that, given a small
	// input, can produce a disproportionately large output, like scans, reading
	// from a file, etc.), you should receive from the context's Done() channel
	// to know to break early in case of cancellation or an error and avoid
	// doing extra work. For most Runners this is not as critical, because their
	// input will just close early.
	//
	// NOTE: By convention, empty Datasets should never be sent to the `out`
	// stream. The behavior of other Runners in case they receive empty
	// Datasets on the `inp` stream is undefined.
	Run(ctx context.Context, inp, out chan Dataset) error
}
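
// The following is an illustrative sketch only (not part of the original
// package): a minimal Runner that forwards its input unchanged while honoring
// context cancellation, following the Run() contract documented above. The
// `echo` name is hypothetical.
type echo struct{}

func (*echo) Equals(other interface{}) bool { _, ok := other.(*echo); return ok }

func (*echo) Returns() []Type { return []Type{Wildcard} }

func (*echo) Run(ctx context.Context, inp, out chan Dataset) error {
	for data := range inp {
		// select on ctx.Done() so a long-running producer can stop early
		select {
		case out <- data:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}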

// RunnerArgs is a Runner that also exposes a list of argument types that it
// must accept as input.
type RunnerArgs interface {
	Runner // it's a Runner

	// Args returns the list of types that the runner must accept as input
	Args() []Type
}

// RunnerPlan is a Runner that also acts as a Runner constructor. This is useful
// for cases when the Runner needs to be somehow configured, or even replaced
// altogether based on input arguments
type RunnerPlan interface {
	Runner // it's a Runner

	// Plan allows the Runner to plan itself given an arbitrary argument. The
	// argument is context-dependent: it can be an AST node, or a composite
	// object containing multiple properties.
	Plan(ctx context.Context, arg interface{}) (Runner, error)
}
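
// Illustrative sketch only (not part of the original package): a hypothetical
// RunnerPlan that replaces itself with a concrete Pick runner when planned
// with a list of column indices, and otherwise falls back to its embedded
// Runner. The `pickPlan` name and the []int argument convention are
// assumptions made for this example.
type pickPlan struct {
	Runner // fallback runner; also provides Equals/Returns/Run via embedding
}

func (p *pickPlan) Plan(_ context.Context, arg interface{}) (Runner, error) {
	if indices, ok := arg.([]int); ok {
		// replace this planner altogether with a concrete runner
		return Pick(indices...), nil
	}
	return p.Runner, nil
}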

// RunnerExec is a Runner that also supports command execution.
type RunnerExec interface {
	Runner // it's a Runner

	// Exec executes a command and returns the last inserted id, the number of
	// rows affected by this command, and an error.
	Exec(context.Context) (lastID int64, rowsAffected int64, err error)
}

// FilterRunner is a Runner that also exposes the ability to choose which
// results to return, and which are irrelevant and can be replaced with a dummy
// placeholder
type FilterRunner interface {
	Runner // it's a Runner

	// Filter modifies the internal Runner to return only the columns whose
	// corresponding 'keep' values are true.
	// The length of 'keep' should match the length of the internal Runner's
	// return types
	Filter(keep []bool)
}

// ScopesRunner is a Runner that also exposes the ability
// to get all scopes involved
type ScopesRunner interface {
	Runner // it's a Runner

	// Scopes returns all internal runners' scopes
	Scopes() StringsSet
}

// PushRunner is a Runner that also exposes the ability
// to push another runner into the internal runner
type PushRunner interface {
	ScopesRunner // it's a Runner

	// Push tries to push a given runner into the internal runner and returns
	// true if it succeeded
	Push(toPush ScopesRunner) bool
}

// ApproxSizer is a Runner that can roughly predict the size of its output
type ApproxSizer interface {
	Runner

	// ApproxSize returns a rough estimate of the size of the output produced
	// by this Runner
	ApproxSize() int
}

// Run runs the given runner and takes care of the channel management involved
// in runner execution. It is safe to use only if the caller created the out
// channel
func Run(ctx context.Context, r Runner, inp, out chan Dataset, cancel context.CancelFunc, err *error) {
	// drain inp in case there are leftovers in the channel.
	// usually this will be a no-op, unless the runner has exited early due to
	// an error or some other logic (irrelevant inp, etc.). in such cases
	// draining allows the preceding runner to be canceled
	defer drain(inp)
	defer close(out)

	*err = r.Run(ctx, inp, out)
	if *err != nil && cancel != nil {
		setError(ctx, *err)
		cancel()
	}
}
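
// Usage sketch only (not part of the original package), showing how a caller
// might wire a runner through Run: the caller owns the `out` channel, shares a
// cancelable context, and reads the error only after `out` is closed. The
// `runToSlice` helper name is hypothetical.
func runToSlice(ctx context.Context, r Runner, inp chan Dataset) ([]Dataset, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	out := make(chan Dataset)
	var err error
	go Run(ctx, r, inp, out, cancel, &err)

	// collect everything until Run closes `out`; only then is `err` safe to read
	var results []Dataset
	for data := range out {
		results = append(results, data)
	}
	return results, err
}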

// PassThrough returns a runner that lets all of its input through as-is
func PassThrough(types ...Type) Runner {
	if len(types) == 0 {
		return passThroughSingleton
	}
	return &passThrough{ReturnTypes: types}
}

// PassThroughWithScopes returns a runner that lets all of its input through
// as-is
func PassThroughWithScopes(scopes StringsSet, types ...Type) Runner {
	if len(types) == 0 {
		panic("scopes without types are not allowed")
	}
	return &passThrough{types, scopes}
}

var passThroughSingleton = &passThrough{}

type passThrough struct {
	ReturnTypes []Type
	scopes      StringsSet
}

func (r *passThrough) Equals(other interface{}) bool {
	o, ok := other.(*passThrough)
	return ok && AreEqualTypes(r.ReturnTypes, o.ReturnTypes)
}

func (r *passThrough) Scopes() StringsSet {
	return r.scopes
}

func (*passThrough) Args() []Type { return []Type{Wildcard} }

func (r *passThrough) Returns() []Type {
	if len(r.ReturnTypes) == 0 {
		return []Type{Wildcard}
	}
	return r.ReturnTypes
}

func (*passThrough) Run(_ context.Context, inp, out chan Dataset) error {
	for data := range inp {
		out <- data
	}
	return nil
}

func (r *passThrough) run(data Dataset) (Dataset, error) {
	return data, nil
}

func (r *passThrough) BatchFunction() BatchFunction {
	return r.run
}

// Pick returns a new runner similar to PassThrough except that it picks and
// returns just the data at the provided indices
func Pick(indices ...int) Runner { return &pick{indices} }

type pick struct{ Indices []int }

func (r *pick) Equals(other interface{}) bool {
	o, ok := other.(*pick)
	if !ok || len(r.Indices) != len(o.Indices) {
		return false
	}
	for i, idx := range r.Indices {
		if idx != o.Indices[i] {
			return false
		}
	}
	return true
}

func (r *pick) Returns() []Type {
	types := make([]Type, len(r.Indices))
	for i, idx := range r.Indices {
		types[i] = Wildcard.At(idx)
	}
	return types
}

func (r *pick) Run(_ context.Context, inp, out chan Dataset) error {
	for data := range inp {
		var result Dataset
		if len(r.Indices) == 0 {
			result = NewDataset(dummy.Data(data.Len()))
		} else {
			res := make([]Data, len(r.Indices))
			for i, idx := range r.Indices {
				res[i] = data.At(idx)
			}
			result = NewDataset(res...)
		}
		out <- result
	}
	return nil
}

// drain is a helper function that drains an inp/out channel
func drain(c chan Dataset) {
	for range c {
	}
}

// Tail returns a runner that splits its input and returns only the last
// len(returnTypes) columns
func Tail(returnTypes []Type) Runner {
	return &tail{returnTypes}
}

type tail struct {
	Types []Type
}

func (r *tail) Equals(other interface{}) bool {
	o, ok := other.(*tail)
	return ok && AreEqualTypes(r.Types, o.Types)
}

func (r *tail) Returns() []Type { return r.Types }

func (r *tail) Run(_ context.Context, inp, out chan Dataset) error {
	tailWidth := len(r.Types)
	for data := range inp {
		_, secondDataset := data.Split(tailWidth)
		out <- secondDataset
	}
	return nil
}