-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.go
146 lines (124 loc) · 2.72 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package pzip
import (
"context"
"errors"
"sync"
"sync/atomic"
)
const (
// OPENED represents that the pool is opened.
OPENED = iota + 1
// CLOSED represents that the pool is closed.
CLOSED
)
var (
ErrWorkerClosed = errors.New("this worker has been closed")
ErrWorkerNotOpened = errors.New("this worker has not been opened")
)
type Executor[T any] func(params *T) error
type FailFastWorker[T any] struct {
wg sync.WaitGroup
ctx context.Context
cancel func(err error)
// state is used to notice the pool to closed itself.
state int32
parallelism int
capacity int
tasks chan *T
err error
errOnce sync.Once
executor Executor[T]
}
func NewFailFastWorker[T any](executor Executor[T], parallelism int, Capacity int) *FailFastWorker[T] {
return &FailFastWorker[T]{
tasks: make(chan *T, Capacity),
executor: executor,
parallelism: parallelism,
capacity: Capacity,
}
}
func (fw *FailFastWorker[T]) reset(ctx context.Context) {
atomic.StoreInt32(&fw.state, OPENED)
fw.tasks = make(chan *T, fw.capacity)
fw.ctx, fw.cancel = context.WithCancelCause(ctx)
fw.err = nil
fw.errOnce = sync.Once{}
}
func (fw *FailFastWorker[T]) Start(ctx context.Context) {
fw.reset(ctx)
for i := 0; i < fw.parallelism; i++ {
fw.wg.Add(1)
go func() {
defer fw.wg.Done()
if err := fw.exec(); err != nil {
fw.errOnce.Do(func() {
fw.err = err
if fw.cancel != nil {
fw.cancel(fw.err)
}
})
}
}()
}
}
func (fw *FailFastWorker[T]) exec() error {
for {
select {
case <-fw.ctx.Done():
return fw.ctx.Err()
case t, ok := <-fw.tasks:
if !ok {
return nil
}
if err := fw.executor(t); err != nil {
return err
}
}
}
}
// Len returns the number of tasks that are waiting to be processed
func (fw *FailFastWorker[T]) Len() int {
return len(fw.tasks)
}
func (fw *FailFastWorker[T]) Wait() error {
if fw.IsClosed() {
return ErrWorkerClosed
}
if !fw.IsOpened() {
return ErrWorkerNotOpened
}
// close
close(fw.tasks)
fw.wg.Wait()
atomic.StoreInt32(&fw.state, CLOSED)
return fw.err
}
// IsClosed indicates whether the worker is closed.
func (fw *FailFastWorker[T]) IsClosed() bool {
return atomic.LoadInt32(&fw.state) == CLOSED
}
// IsOpened indicates whether the worker is opened.
func (fw *FailFastWorker[T]) IsOpened() bool {
return atomic.LoadInt32(&fw.state) == OPENED
}
func (fw *FailFastWorker[T]) Submit(task *T) error {
if fw.err != nil {
return fw.err
}
if !fw.IsOpened() {
return ErrWorkerNotOpened
}
if fw.IsClosed() {
return ErrWorkerClosed
}
select {
case fw.tasks <- task:
// Task submitted successfully
case <-fw.ctx.Done():
if fw.err != nil {
return fw.err
}
return fw.ctx.Err()
}
return nil
}