-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultiple.go
48 lines (43 loc) · 1.05 KB
/
multiple.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package flow
import (
"context"
"fmt"
"github.com/bytepowered/assert-go"
)
type MultipleRunner struct {
inputRef Input
pipelines []*Pipeline
}
func NewMultipleRunner(input Input) *MultipleRunner {
assert.MustNotNil(input, "Input is nil")
return &MultipleRunner{
inputRef: input,
}
}
func (r *MultipleRunner) AddPipeline(v *Pipeline) *MultipleRunner {
assert.MustNotNil(v, "Pipeline is nil")
r.pipelines = append(r.pipelines, v)
return r
}
func (r *MultipleRunner) Serve(ctx context.Context) error {
assert.MustTrue(len(r.pipelines) > 0, "MultipleRunner requires pipelines")
type deliverer struct {
fun DeliverFunc
id string
}
var delivers []deliverer
for _, pipe := range r.pipelines {
delivers = append(delivers, deliverer{
fun: pipe.buildDeliverFunc(ctx),
id: pipe.Id,
})
}
return r.inputRef.OnRead(ctx, func(msg Message) error {
for _, deliver := range delivers {
if err := deliver.fun(msg); err != nil {
return fmt.Errorf("MultipleRunner deliver error, pipeline: %s, %w", deliver.id, err)
}
}
return nil
})
}