-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunner.go
95 lines (83 loc) · 2.08 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Copyright 2021 John Papandriopoulos. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package concurrent
import (
"runtime"
"sync"
"sync/atomic"
)
// Runner runs jobs with a specified maximum limit on concurrency.
type Runner struct {
sync.Mutex
width int
nsuccess uint32
nfailures uint32
ch chan struct{}
errors []error
wg sync.WaitGroup
}
// NewRunner returns a new Runner that executes jobs concurrently with at most
// n jobs running at any one time.
func NewRunner(n int) *Runner {
if n == 0 {
n = runtime.NumCPU()
}
return &Runner{
width: n,
ch: make(chan struct{}, n),
}
}
// Errors returns all of the errors reported by jobs.
// The order is given by job completion, not submission.
func (jr *Runner) Errors() []error {
jr.Lock()
defer jr.Unlock()
return append(([]error)(nil), jr.errors...) // copy
}
// Failures returns the number of errors reported by jobs so far.
// This is useful for callers to fail-fast and stop submitting
// jobs if an error has been reported.
func (jr *Runner) Failures() int {
return int(atomic.LoadUint32(&jr.nfailures))
}
// RunErr runs a job that reports an error if it fails.
func (jr *Runner) RunErr(job func() error) {
if jr.width == 1 {
if err := job(); err != nil {
jr.errors = append(jr.errors, err)
jr.nfailures++
} else {
jr.nsuccess++
}
return
}
// Block if we exceed width
jr.ch <- struct{}{}
jr.wg.Add(1)
// Run job
go func() {
if err := job(); err != nil {
atomic.AddUint32(&jr.nfailures, 1)
jr.Lock()
jr.errors = append(jr.errors, err)
jr.Unlock()
} else {
atomic.AddUint32(&jr.nsuccess, 1)
}
// Job is done
jr.wg.Done()
<-jr.ch
}()
}
// Run is like RunErr, but the job func does not need to return an error.
func (jr *Runner) Run(job func()) {
jr.RunErr(func() error { job(); return nil })
}
// Finish waits for all executing jobs to complete, and returns the number
// of successful jobs completed.
func (jr *Runner) Finish() int {
jr.wg.Wait()
close(jr.ch)
return int(jr.nsuccess)
}