-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcronalt.go
188 lines (149 loc) · 4.36 KB
/
cronalt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package cronalt
import (
"context"
"fmt"
"sync"
"time"
"github.com/ahmedalhulaibi/cronalt/job"
)
type empty struct{}
type clock interface {
Now() time.Time
}
type waitGroup interface {
Add(delta int)
Wait()
Done()
}
type Scheduler struct {
// jobPool is a Semaphore to limit the number of concurrent jobs running at once
jobPool chan empty
jobs jobStore
wg waitGroup
log logger
clock clock
}
var (
ErrMaxConcurrentJobsZero error = fmt.Errorf("maxConcurrentJobs must be greater than zero")
)
func NewScheduler(maxConcurrentJobs int, opts ...SchedulerOption) (*Scheduler, error) {
if maxConcurrentJobs <= 0 {
return nil, ErrMaxConcurrentJobsZero
}
var wg sync.WaitGroup
s := &Scheduler{
jobPool: make(chan empty, maxConcurrentJobs),
jobs: job.NewStore(),
log: noopLogger{},
clock: timeProvider{},
wg: &wg,
}
for _, opt := range opts {
s = opt(s)
}
return s, nil
}
type SchedulerOption func(s *Scheduler) *Scheduler
// WithLogger returns a SchedulerOption to inject a logger
func WithLogger(l logger) SchedulerOption {
return func(s *Scheduler) *Scheduler {
s.log = l
return s
}
}
// WithClock returns a SchedulerOption to inject a clock, default is time.Now
func WithClock(t clock) SchedulerOption {
return func(s *Scheduler) *Scheduler {
s.clock = t
return s
}
}
// WithWaitGroup returns a SchedulerOption to inject a waitgroup, default is sync.WaitGroup
// This can be used to inject your own instance of sync.WaitGroup
func WithWaitGroup(wg waitGroup) SchedulerOption {
return func(s *Scheduler) *Scheduler {
s.wg = wg
return s
}
}
// WithJobStore returns a SchedulerOption to inject a jobStore, default is job.store
func WithJobStore(js jobStore) SchedulerOption {
return func(s *Scheduler) *Scheduler {
s.jobs = js
return s
}
}
// Schedule registers a job and uses job function name as the job name
func (s *Scheduler) Schedule(jt job.Timer, j job.Job) error {
return s.jobs.Add(jobCfg{timer: jt, j: j})
}
// Start starts all the scheduled jobs in their own go routine and blocks indefinitely or until context is cancelled
func (s *Scheduler) Start(ctx context.Context) {
for _, pendingJob := range s.jobs.GetAll() {
s.wg.Add(1)
s.log.Info(ctx, "cronalt.Scheduler starting job", KeyVal{"job", pendingJob.Job().Name()})
go s.run(ctx, pendingJob)
}
s.wg.Wait()
}
func (s *Scheduler) run(ctx context.Context, runJobCfg job.Config) {
defer s.wg.Done()
jobName := runJobCfg.Job().Name()
timer := time.NewTimer(s.getTimeUntilNextRun(ctx, s.clock.Now(), runJobCfg))
for {
select {
case <-ctx.Done():
s.log.Info(ctx, "cronalt.Scheduler halted", KeyVal{"job", jobName})
return
case now := <-timer.C:
s.log.Info(ctx, "cronalt.Scheduler queued", KeyVal{"job", jobName})
// Acquire lock on job pool semaphore
s.jobPool <- empty{}
s.log.Info(ctx, "cronalt.Scheduler running", KeyVal{"job", jobName})
if err := call(ctx, runJobCfg.Job(), s.log); err != nil {
s.log.Error(
ctx,
"cronalt.Scheduler job completed with error",
KeyVal{"job", jobName},
KeyVal{"error", err.Error()},
)
}
s.log.Info(ctx, "cronalt.Scheduler completed", KeyVal{"job", jobName})
// Release lock on job pool semaphore
<-s.jobPool
// Use timer.Reset since we know the timer is expired and we can reset it
timer.Reset(s.getTimeUntilNextRun(ctx, now, runJobCfg))
}
}
}
func (s *Scheduler) getTimeUntilNextRun(ctx context.Context, prevTime time.Time, runJobCfg job.Config) time.Duration {
now := s.clock.Now()
nextExpectedRun := runJobCfg.Timer().Next(prevTime)
s.log.Info(
ctx,
"cronalt.Scheduler next run",
KeyVal{"job", runJobCfg.Job().Name()},
KeyVal{"next_run", nextExpectedRun.Format(time.RFC3339)},
)
return timeUntilNextRun(nextExpectedRun, now)
}
func timeUntilNextRun(nextExpectedRun, now time.Time) time.Duration {
if nextExpectedRun.After(now) || nextExpectedRun.Equal(now) {
return nextExpectedRun.Sub(now)
}
return 0
}
func call(ctx context.Context, job job.Job, log logger) error {
defer recoverJob(ctx, job, log)
return job.Runner()(ctx)
}
func recoverJob(ctx context.Context, job job.Job, log logger) {
if r := recover(); r != nil {
keys := make([]KeyVal, 1, 2)
keys[0] = KeyVal{"job", job.Name()}
if err, ok := r.(error); ok {
keys = append(keys, KeyVal{"error", err.Error()})
}
log.Error(ctx, "cronalt.Scheduler recovered", keys...)
}
}