diff --git a/internal/logger/buffer.go b/internal/logger/buffer.go index 7aec897..da7b7a8 100644 --- a/internal/logger/buffer.go +++ b/internal/logger/buffer.go @@ -29,6 +29,12 @@ func (b *Buffer) String() string { return b.buf.String() } +func (b *Buffer) Reset() { + b.mu.Lock() + defer b.mu.Unlock() + b.buf.Reset() +} + func NewBufferLogger(buffer *Buffer) cron.Logger { - return cron.PrintfLogger(log.New(buffer, "", log.LstdFlags)) + return cron.VerbosePrintfLogger(log.New(buffer, "", log.LstdFlags)) } diff --git a/middleware/nooverlapping/nooverlapping.go b/middleware/nooverlapping/nooverlapping.go new file mode 100644 index 0000000..dc303e2 --- /dev/null +++ b/middleware/nooverlapping/nooverlapping.go @@ -0,0 +1,50 @@ +package nooverlapping + +import ( + "context" + + "github.com/flc1125/go-cron/v4" +) + +type options struct { + logger cron.Logger +} + +type Option func(*options) + +func WithLogger(logger cron.Logger) Option { + return func(o *options) { + o.logger = logger + } +} + +func newOptions(opts ...Option) options { + opt := options{ + logger: cron.DefaultLogger, + } + for _, o := range opts { + o(&opt) + } + return opt +} + +// New returns a without Overlapping middleware. +// if the job is running, skip the job. +// Based on the old version of SkipIfStillRunning +func New(opts ...Option) cron.Middleware { + o := newOptions(opts...) + return func(job cron.Job) cron.Job { + ch := make(chan struct{}, 1) + ch <- struct{}{} + return cron.JobFunc(func(ctx context.Context) error { + select { + case v := <-ch: + defer func() { ch <- v }() + return job.Run(ctx) + default: + o.logger.Info("job is still running, skip") + return nil + } + }) + } +} diff --git a/middleware/nooverlapping/nooverlapping_test.go b/middleware/nooverlapping/nooverlapping_test.go new file mode 100644 index 0000000..7bce27f --- /dev/null +++ b/middleware/nooverlapping/nooverlapping_test.go @@ -0,0 +1,183 @@ +package nooverlapping + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/flc1125/go-cron/v4" + "github.com/flc1125/go-cron/v4/internal/logger" +) + +var ( + ctx = context.Background() + buf = logger.NewBuffer() + noOverlapping = New(WithLogger(logger.NewBufferLogger(buf))) + wg = sync.WaitGroup{} +) + +func TestNoOverlapping(t *testing.T) { + buf.Reset() + + var ( + ch = make(chan struct{}, 100) + wg = sync.WaitGroup{} + job = noOverlapping(cron.JobFunc(func(context.Context) error { + ch <- struct{}{} + time.Sleep(2 * time.Millisecond) + return nil + })) + ) + + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, job.Run(ctx)) + }() + } + wg.Wait() + + assert.True(t, len(ch) < 100) + assert.Contains(t, buf.String(), "job is still running, skip") +} + +func TestNoOverlapping_Chain(t *testing.T) { + buf.Reset() + + var ( + ch = make(chan struct{}, 100) + job = cron.Chain(noOverlapping)(cron.JobFunc(func(context.Context) error { + ch <- struct{}{} + time.Sleep(2 * time.Millisecond) + return nil + })) + ) + + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, job.Run(ctx)) + }() + } + wg.Wait() + + assert.True(t, len(ch) < 100) + assert.Contains(t, buf.String(), "job is still running, skip") +} + +func TestNoOverlapping_Cases(t *testing.T) { + t.Run("second run immediate if first done", func(t *testing.T) { + buf.Reset() + ch := make(chan struct{}, 10) + job := noOverlapping(cron.JobFunc(func(context.Context) error { + ch <- struct{}{} + time.Sleep(2 * time.Millisecond) + return nil + })) + + wg.Add(3) + go func() { + defer wg.Done() + go func() { + defer wg.Done() + assert.NoError(t, job.Run(ctx)) + }() + + time.Sleep(10 * time.Millisecond) + + go func() { + defer wg.Done() + assert.NoError(t, job.Run(ctx)) + }() + }() + wg.Wait() + + assert.Len(t, ch, 2) + }) + + t.Run("second run skipped if first not done", func(t *testing.T) { + buf.Reset() + ch := make(chan struct{}, 10) + job := noOverlapping(cron.JobFunc(func(context.Context) error { + ch <- struct{}{} + time.Sleep(10 * time.Millisecond) + return nil + })) + + wg.Add(3) + go func() { + defer wg.Done() + go func() { + defer wg.Done() + assert.NoError(t, job.Run(ctx)) + }() + + go func() { + defer wg.Done() + assert.NoError(t, job.Run(ctx)) + }() + }() + wg.Wait() + + assert.Len(t, ch, 1) + assert.Contains(t, buf.String(), "job is still running, skip") + }) + + t.Run("skip 10 jobs on rapid fire", func(t *testing.T) { + buf.Reset() + ch := make(chan struct{}, 10) + job := noOverlapping(cron.JobFunc(func(context.Context) error { + ch <- struct{}{} + time.Sleep(100 * time.Millisecond) + return nil + })) + + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + assert.NoError(t, job.Run(ctx)) + }() + } + wg.Wait() + + assert.Len(t, ch, 1) + assert.Contains(t, buf.String(), "job is still running, skip") + }) + + t.Run("different jobs independent", func(t *testing.T) { + buf.Reset() + ch := make(chan struct{}, 100) + job1 := noOverlapping(cron.JobFunc(func(context.Context) error { + ch <- struct{}{} + time.Sleep(100 * time.Millisecond) + return nil + })) + job2 := noOverlapping(cron.JobFunc(func(context.Context) error { + ch <- struct{}{} + time.Sleep(100 * time.Millisecond) + return nil + })) + + for i := 0; i < 10; i++ { + wg.Add(2) + go func() { + defer wg.Done() + assert.NoError(t, job1.Run(ctx)) + }() + go func() { + defer wg.Done() + assert.NoError(t, job2.Run(ctx)) + }() + } + + wg.Wait() + assert.Len(t, ch, 2) + assert.Contains(t, buf.String(), "job is still running, skip") + }) +}