Skip to content

Commit

Permalink
refactor(jobwrapper): refactor JobWrapper to Middleware and adjus…
Browse files Browse the repository at this point in the history
…t use of `Chain`
  • Loading branch information
flc1125 committed Oct 26, 2024
1 parent a35487a commit 41ff78e
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 67 deletions.
42 changes: 9 additions & 33 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
"time"
)

// Middleware is a chainable behavior modifier for jobs.
type Middleware func(Job) Job

// Chain is a helper function to compose Middlewares. It returns a Middleware that
// applies the Middlewares in order.
//
// Chain(m1, m2, m3) => m1(m2(m3(job)))
func Chain(m ...Middleware) Middleware {
return func(next Job) Job {
for i := len(m) - 1; i >= 0; i-- {
Expand All @@ -19,38 +24,9 @@ func Chain(m ...Middleware) Middleware {
}
}

// JobWrapper decorates the given Job with some behavior.
type JobWrapper func(Job) Job

// Chain is a sequence of JobWrappers that decorates submitted jobs with
// cross-cutting behaviors like logging or synchronization.
type Chain struct {
wrappers []JobWrapper
}

// NewChain returns a Chain consisting of the given JobWrappers.
func NewChain(c ...JobWrapper) Chain {
return Chain{c}
}

// Then decorates the given job with all JobWrappers in the chain.
//
// This:
//
// NewChain(m1, m2, m3).Then(job)
//
// is equivalent to:
//
// m1(m2(m3(job)))
func (c Chain) Then(j Job) Job {
for i := range c.wrappers {
j = c.wrappers[len(c.wrappers)-i-1](j)
}
return j
}

// Recover panics in wrapped jobs and log them with the provided logger.
func Recover(logger Logger) JobWrapper {
// Deprecated: recovery.New()
func Recover(logger Logger) Middleware {
return func(j Job) Job {
return JobFunc(func(ctx context.Context) error {
defer func() {
Expand All @@ -73,7 +49,7 @@ func Recover(logger Logger) JobWrapper {
// DelayIfStillRunning serializes jobs, delaying subsequent runs until the
// previous one is complete. Jobs running after a delay of more than a minute
// have the delay logged at Info.
func DelayIfStillRunning(logger Logger) JobWrapper {
func DelayIfStillRunning(logger Logger) Middleware {
return func(j Job) Job {
var mu sync.Mutex
return JobFunc(func(ctx context.Context) error {
Expand All @@ -90,7 +66,7 @@ func DelayIfStillRunning(logger Logger) JobWrapper {

// SkipIfStillRunning skips an invocation of the Job if a previous invocation is
// still running. It logs skips to the given logger at Info level.
func SkipIfStillRunning(logger Logger) JobWrapper {
func SkipIfStillRunning(logger Logger) Middleware {
return func(j Job) Job {
ch := make(chan struct{}, 1)
ch <- struct{}{}
Expand Down
32 changes: 15 additions & 17 deletions chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func appendingJob(slice *[]int, value int) Job {
})
}

func appendingWrapper(slice *[]int, value int) JobWrapper {
func appendingWrapper(slice *[]int, value int) Middleware {
return func(j Job) Job {
return JobFunc(func(ctx context.Context) error {
appendingJob(slice, value).Run(ctx) //nolint:errcheck
Expand All @@ -37,7 +37,7 @@ func TestChain(t *testing.T) {
append3 = appendingWrapper(&nums, 3)
append4 = appendingJob(&nums, 4)
)
NewChain(append1, append2, append3).Then(append4).Run(context.Background()) //nolint:errcheck
Chain(append1, append2, append3)(append4).Run(context.Background()) //nolint:errcheck
if !reflect.DeepEqual(nums, []int{1, 2, 3, 4}) {
t.Error("unexpected order of calls:", nums)
}
Expand All @@ -54,19 +54,17 @@ func TestChainRecover(t *testing.T) {
t.Errorf("panic expected, but none received")
}
}()
NewChain().Then(panickingJob).
Chain()(panickingJob).
Run(context.Background()) //nolint:errcheck
})

t.Run("Recovering JobWrapper recovers", func(*testing.T) {
NewChain(Recover(PrintfLogger(log.New(io.Discard, "", 0)))).
Then(panickingJob).
Chain(Recover(PrintfLogger(log.New(io.Discard, "", 0))))(panickingJob).
Run(context.Background()) //nolint:errcheck
})

t.Run("composed with the *IfStillRunning wrappers", func(*testing.T) {
NewChain(Recover(PrintfLogger(log.New(io.Discard, "", 0)))).
Then(panickingJob).
Chain(Recover(PrintfLogger(log.New(io.Discard, "", 0))))(panickingJob).
Run(context.Background()) //nolint:errcheck
})
}
Expand Down Expand Up @@ -104,7 +102,7 @@ func (j *countJob) Done() int {
func TestChainDelayIfStillRunning(t *testing.T) {
t.Run("runs immediately", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
wrappedJob := Chain(DelayIfStillRunning(DiscardLogger))(&j)
go wrappedJob.Run(context.Background()) //nolint:errcheck
time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
if c := j.Done(); c != 1 {
Expand All @@ -114,7 +112,7 @@ func TestChainDelayIfStillRunning(t *testing.T) {

t.Run("second run immediate if first done", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
wrappedJob := Chain(DelayIfStillRunning(DiscardLogger))(&j)
go func() {
go wrappedJob.Run(context.Background()) //nolint:errcheck
time.Sleep(time.Millisecond)
Expand All @@ -129,7 +127,7 @@ func TestChainDelayIfStillRunning(t *testing.T) {
t.Run("second run delayed if first not done", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
wrappedJob := Chain(DelayIfStillRunning(DiscardLogger))(&j)
go func() {
go wrappedJob.Run(context.Background()) //nolint:errcheck
time.Sleep(time.Millisecond)
Expand All @@ -156,7 +154,7 @@ func TestChainDelayIfStillRunning(t *testing.T) {
func TestChainSkipIfStillRunning(t *testing.T) {
t.Run("runs immediately", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
wrappedJob := Chain(SkipIfStillRunning(DiscardLogger))(&j)
go wrappedJob.Run(context.Background()) //nolint:errcheck
time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
if c := j.Done(); c != 1 {
Expand All @@ -166,7 +164,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {

t.Run("second run immediate if first done", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
wrappedJob := Chain(SkipIfStillRunning(DiscardLogger))(&j)
go func() {
go wrappedJob.Run(context.Background()) //nolint:errcheck
time.Sleep(time.Millisecond)
Expand All @@ -181,7 +179,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {
t.Run("second run skipped if first not done", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
wrappedJob := Chain(SkipIfStillRunning(DiscardLogger))(&j)
go func() {
go wrappedJob.Run(context.Background()) //nolint:errcheck
time.Sleep(time.Millisecond)
Expand All @@ -207,7 +205,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {
t.Run("skip 10 jobs on rapid fire", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
wrappedJob := Chain(SkipIfStillRunning(DiscardLogger))(&j)
for i := 0; i < 11; i++ {
go wrappedJob.Run(context.Background()) //nolint:errcheck
}
Expand All @@ -222,9 +220,9 @@ func TestChainSkipIfStillRunning(t *testing.T) {
var j1, j2 countJob
j1.delay = 10 * time.Millisecond
j2.delay = 10 * time.Millisecond
chain := NewChain(SkipIfStillRunning(DiscardLogger))
wrappedJob1 := chain.Then(&j1)
wrappedJob2 := chain.Then(&j2)
chain := Chain(SkipIfStillRunning(DiscardLogger))
wrappedJob1 := chain(&j1)
wrappedJob2 := chain(&j2)
for i := 0; i < 11; i++ {
go wrappedJob1.Run(context.Background()) //nolint:errcheck
go wrappedJob2.Run(context.Background()) //nolint:errcheck
Expand Down
10 changes: 5 additions & 5 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
ctx context.Context
entries []*Entry
chain Chain
ctx context.Context
entries []*Entry
middlewares []Middleware
// chain Chain
stop chan struct{}
add chan *Entry
remove chan EntryID
Expand Down Expand Up @@ -110,7 +111,6 @@ func New(opts ...Option) *Cron {
c := &Cron{
ctx: context.Background(),
entries: nil,
chain: NewChain(),
add: make(chan *Entry),
stop: make(chan struct{}),
snapshot: make(chan chan []Entry),
Expand Down Expand Up @@ -154,7 +154,7 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
entry := &Entry{
ID: c.nextID,
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
WrappedJob: Chain(c.middlewares...)(cmd),
Job: cmd,
}
if !c.running {
Expand Down
6 changes: 3 additions & 3 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newBufLogger(sw *syncWriter) Logger {
func TestFuncPanicRecovery(t *testing.T) {
var buf syncWriter
cron := New(WithParser(secondParser),
WithChain(Recover(newBufLogger(&buf))))
WithMiddleware(Recover(newBufLogger(&buf))))
cron.Start()
defer cron.Stop()
cron.AddFunc("* * * * * ?", func(context.Context) error { //nolint:errcheck
Expand All @@ -66,7 +66,7 @@ func TestJobPanicRecovery(t *testing.T) {

var buf syncWriter
cron := New(WithParser(secondParser),
WithChain(Recover(newBufLogger(&buf))))
WithMiddleware(Recover(newBufLogger(&buf))))
cron.Start()
defer cron.Stop()
cron.AddJob("* * * * * ?", job) //nolint:errcheck
Expand Down Expand Up @@ -756,5 +756,5 @@ func stop(cron *Cron) chan bool {

// newWithSeconds returns a Cron with the seconds field enabled.
func newWithSeconds() *Cron {
return New(WithParser(secondParser), WithChain())
return New(WithParser(secondParser), WithMiddleware())
}
36 changes: 31 additions & 5 deletions middleware/recovery/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,48 @@ import (
"github.com/flc1125/go-cron/v4"
)

func New(logger cron.Logger) cron.Middleware {
const size = 64 << 10

type options struct {
logger cron.Logger
}

type Option func(*options)

func newOptions(opts ...Option) *options {
opt := &options{
logger: cron.DefaultLogger,
}
for _, o := range opts {
o(opt)
}
return opt

Check warning on line 26 in middleware/recovery/recovery.go

View check run for this annotation

Codecov / codecov/patch

middleware/recovery/recovery.go#L19-L26

Added lines #L19 - L26 were not covered by tests
}

func WithLogger(logger cron.Logger) Option {
return func(o *options) {
o.logger = logger
}

Check warning on line 32 in middleware/recovery/recovery.go

View check run for this annotation

Codecov / codecov/patch

middleware/recovery/recovery.go#L29-L32

Added lines #L29 - L32 were not covered by tests
}

// New returns a new recovery middleware.
// It recovers from any panics and logs the panic with the provided logger.
func New(opts ...Option) cron.Middleware {
o := newOptions(opts...)
return func(next cron.Job) cron.Job {
return cron.JobFunc(func(ctx context.Context) {
return cron.JobFunc(func(ctx context.Context) error {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err, ok := r.(error)
if !ok {
err = fmt.Errorf("%v", r)
}
logger.Error(err, "panic", "stack", "...\n"+string(buf))
o.logger.Error(err, "panic", "stack", "...\n"+string(buf))

Check warning on line 49 in middleware/recovery/recovery.go

View check run for this annotation

Codecov / codecov/patch

middleware/recovery/recovery.go#L37-L49

Added lines #L37 - L49 were not covered by tests
}
}()
next.Run(ctx)
return next.Run(ctx)

Check warning on line 52 in middleware/recovery/recovery.go

View check run for this annotation

Codecov / codecov/patch

middleware/recovery/recovery.go#L52

Added line #L52 was not covered by tests
})
}
}
15 changes: 11 additions & 4 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ func WithParser(p ScheduleParser) Option {
}
}

// WithChain specifies Job wrappers to apply to all jobs added to this cron.
// Refer to the Chain* functions in this package for provided wrappers.
func WithChain(wrappers ...JobWrapper) Option {
// // WithChain specifies Job wrappers to apply to all jobs added to this cron.
// // Refer to the Chain* functions in this package for provided wrappers.
// func WithChain(wrappers ...JobWrapper) Option {
// return func(c *Cron) {
// c.chain = NewChain(wrappers...)
// }
// }

// WithMiddleware specifies Middleware to apply to all jobs added to this cron.
func WithMiddleware(middlewares ...Middleware) Option {
return func(c *Cron) {
c.chain = NewChain(wrappers...)
c.middlewares = middlewares
}
}

Expand Down

0 comments on commit 41ff78e

Please sign in to comment.