Skip to content

Commit

Permalink
refactor(middleware): SkipIfStillRunning to nooverlapping (#27)
Browse files Browse the repository at this point in the history
* refactor(middleware): refactor `SkipIfStillRunning` to `nooverlapping.New()`

* refactor(middleware): refactor SkipIfStillRunning to nooverlapping.New()
  • Loading branch information
flc1125 authored Oct 28, 2024
1 parent 89fec4b commit da81449
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 1 deletion.
8 changes: 7 additions & 1 deletion internal/logger/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func (b *Buffer) String() string {
return b.buf.String()
}

func (b *Buffer) Reset() {
b.mu.Lock()
defer b.mu.Unlock()
b.buf.Reset()
}

func NewBufferLogger(buffer *Buffer) cron.Logger {
return cron.PrintfLogger(log.New(buffer, "", log.LstdFlags))
return cron.VerbosePrintfLogger(log.New(buffer, "", log.LstdFlags))
}
50 changes: 50 additions & 0 deletions middleware/nooverlapping/nooverlapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package nooverlapping

import (
"context"

"github.com/flc1125/go-cron/v4"
)

type options struct {
logger cron.Logger
}

type Option func(*options)

func WithLogger(logger cron.Logger) Option {
return func(o *options) {
o.logger = logger
}
}

func newOptions(opts ...Option) options {
opt := options{
logger: cron.DefaultLogger,
}
for _, o := range opts {
o(&opt)
}
return opt
}

// New returns a without Overlapping middleware.
// if the job is running, skip the job.
// Based on the old version of SkipIfStillRunning
func New(opts ...Option) cron.Middleware {
o := newOptions(opts...)
return func(job cron.Job) cron.Job {
ch := make(chan struct{}, 1)
ch <- struct{}{}
return cron.JobFunc(func(ctx context.Context) error {
select {
case v := <-ch:
defer func() { ch <- v }()
return job.Run(ctx)
default:
o.logger.Info("job is still running, skip")
return nil
}
})
}
}
183 changes: 183 additions & 0 deletions middleware/nooverlapping/nooverlapping_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package nooverlapping

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/flc1125/go-cron/v4"
"github.com/flc1125/go-cron/v4/internal/logger"
)

var (
ctx = context.Background()
buf = logger.NewBuffer()
noOverlapping = New(WithLogger(logger.NewBufferLogger(buf)))
wg = sync.WaitGroup{}
)

func TestNoOverlapping(t *testing.T) {
buf.Reset()

var (
ch = make(chan struct{}, 100)
wg = sync.WaitGroup{}
job = noOverlapping(cron.JobFunc(func(context.Context) error {
ch <- struct{}{}
time.Sleep(2 * time.Millisecond)
return nil
}))
)

for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, job.Run(ctx))
}()
}
wg.Wait()

assert.True(t, len(ch) < 100)
assert.Contains(t, buf.String(), "job is still running, skip")
}

func TestNoOverlapping_Chain(t *testing.T) {
buf.Reset()

var (
ch = make(chan struct{}, 100)
job = cron.Chain(noOverlapping)(cron.JobFunc(func(context.Context) error {
ch <- struct{}{}
time.Sleep(2 * time.Millisecond)
return nil
}))
)

for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, job.Run(ctx))
}()
}
wg.Wait()

assert.True(t, len(ch) < 100)
assert.Contains(t, buf.String(), "job is still running, skip")
}

func TestNoOverlapping_Cases(t *testing.T) {
t.Run("second run immediate if first done", func(t *testing.T) {
buf.Reset()
ch := make(chan struct{}, 10)
job := noOverlapping(cron.JobFunc(func(context.Context) error {
ch <- struct{}{}
time.Sleep(2 * time.Millisecond)
return nil
}))

wg.Add(3)
go func() {
defer wg.Done()
go func() {
defer wg.Done()
assert.NoError(t, job.Run(ctx))
}()

time.Sleep(10 * time.Millisecond)

go func() {
defer wg.Done()
assert.NoError(t, job.Run(ctx))
}()
}()
wg.Wait()

assert.Len(t, ch, 2)
})

t.Run("second run skipped if first not done", func(t *testing.T) {
buf.Reset()
ch := make(chan struct{}, 10)
job := noOverlapping(cron.JobFunc(func(context.Context) error {
ch <- struct{}{}
time.Sleep(10 * time.Millisecond)
return nil
}))

wg.Add(3)
go func() {
defer wg.Done()
go func() {
defer wg.Done()
assert.NoError(t, job.Run(ctx))
}()

go func() {
defer wg.Done()
assert.NoError(t, job.Run(ctx))
}()
}()
wg.Wait()

assert.Len(t, ch, 1)
assert.Contains(t, buf.String(), "job is still running, skip")
})

t.Run("skip 10 jobs on rapid fire", func(t *testing.T) {
buf.Reset()
ch := make(chan struct{}, 10)
job := noOverlapping(cron.JobFunc(func(context.Context) error {
ch <- struct{}{}
time.Sleep(100 * time.Millisecond)
return nil
}))

wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
assert.NoError(t, job.Run(ctx))
}()
}
wg.Wait()

assert.Len(t, ch, 1)
assert.Contains(t, buf.String(), "job is still running, skip")
})

t.Run("different jobs independent", func(t *testing.T) {
buf.Reset()
ch := make(chan struct{}, 100)
job1 := noOverlapping(cron.JobFunc(func(context.Context) error {
ch <- struct{}{}
time.Sleep(100 * time.Millisecond)
return nil
}))
job2 := noOverlapping(cron.JobFunc(func(context.Context) error {
ch <- struct{}{}
time.Sleep(100 * time.Millisecond)
return nil
}))

for i := 0; i < 10; i++ {
wg.Add(2)
go func() {
defer wg.Done()
assert.NoError(t, job1.Run(ctx))
}()
go func() {
defer wg.Done()
assert.NoError(t, job2.Run(ctx))
}()
}

wg.Wait()
assert.Len(t, ch, 2)
assert.Contains(t, buf.String(), "job is still running, skip")
})
}

0 comments on commit da81449

Please sign in to comment.