diff --git a/README.md b/README.md index 7e49421d..2364b765 100644 --- a/README.md +++ b/README.md @@ -3,22 +3,29 @@ [![CI State](https://github.com/go-co-op/gocron/actions/workflows/go_test.yml/badge.svg?branch=main&event=push)](https://github.com/go-co-op/gocron/actions) ![Go Report Card](https://goreportcard.com/badge/github.com/go-co-op/gocron) [![Go Doc](https://godoc.org/github.com/go-co-op/gocron?status.svg)](https://pkg.go.dev/github.com/go-co-op/gocron) -gocron is a job scheduling package which lets you run Go functions at pre-determined intervals using a simple, human-friendly syntax. +gocron is a job scheduling package which lets you run Go functions at pre-determined intervals +using a simple, human-friendly syntax. -gocron is a Golang scheduler implementation similar to the Ruby module [clockwork](https://github.com/tomykaira/clockwork) and the Python job scheduling package [schedule](https://github.com/dbader/schedule). +gocron is a Golang scheduler implementation similar to the Ruby module +[clockwork](https://github.com/tomykaira/clockwork) and the Python job scheduling package [schedule](https://github.com/dbader/schedule). See also these two great articles that were used for design input: - [Rethinking Cron](http://adam.herokuapp.com/past/2010/4/13/rethinking_cron/) - [Replace Cron with Clockwork](http://adam.herokuapp.com/past/2010/6/30/replace_cron_with_clockwork/) -If you want to chat, you can find us at Slack! [](https://gophers.slack.com/archives/CQ7T0T1FW) +If you want to chat, you can find us at Slack! +[](https://gophers.slack.com/archives/CQ7T0T1FW) ## Concepts -- **Scheduler**: The scheduler tracks all the jobs assigned to it and makes sure they are passed to the executor when ready to be run. The scheduler is able to manage overall aspects of job behavior like limiting how many jobs are running at one time. -- **Job**: The job is simply aware of the task (go function) it's provided and is therefore only able to perform actions related to that task like preventing itself from overruning a previous task that is taking a long time. -- **Executor**: The executor, as it's name suggests, is simply responsible for calling the task (go function) that the job hands to it when sent by the scheduler. +- **Scheduler**: The scheduler tracks all the jobs assigned to it and makes sure they are passed to the executor when + ready to be run. The scheduler is able to manage overall aspects of job behavior like limiting how many jobs + are running at one time. +- **Job**: The job is simply aware of the task (go function) it's provided and is therefore only able to perform + actions related to that task like preventing itself from overruning a previous task that is taking a long time. +- **Executor**: The executor, as it's name suggests, is simply responsible for calling the task (go function) that + the job hands to it when sent by the scheduler. ## Examples diff --git a/executor.go b/executor.go index 144db6ed..06d7a41d 100644 --- a/executor.go +++ b/executor.go @@ -3,8 +3,7 @@ package gocron import ( "context" "sync" - - "golang.org/x/sync/semaphore" + "sync/atomic" ) const ( @@ -25,23 +24,29 @@ const ( ) type executor struct { - jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler - ctx context.Context // used to tell the executor to stop - cancel context.CancelFunc // used to tell the executor to stop - wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop - jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish - singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete - limitMode limitMode // when SetMaxConcurrentJobs() is set upon the scheduler - maxRunningJobs *semaphore.Weighted + jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler + ctx context.Context // used to tell the executor to stop + cancel context.CancelFunc // used to tell the executor to stop + wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop + jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish + singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete + + limitMode limitMode // when SetMaxConcurrentJobs() is set upon the scheduler + limitModeMaxRunningJobs int // stores the maximum number of concurrently running jobs + limitModeFuncRunning *atomic.Bool // tracks whether the function for handling limited run jobs is running + limitModeQueue []jobFunction // queues the limited jobs for running when able per limit mode + limitModeQueueMu *sync.Mutex // mutex for the queue + limitModeRunningJobs *atomic.Int64 // tracks the count of running jobs to check against the max } func newExecutor() executor { e := executor{ - jobFunctions: make(chan jobFunction, 1), - singletonWgs: &sync.Map{}, - wg: &sync.WaitGroup{}, + jobFunctions: make(chan jobFunction, 1), + singletonWgs: &sync.Map{}, + limitModeFuncRunning: &atomic.Bool{}, + limitModeQueueMu: &sync.Mutex{}, + limitModeRunningJobs: &atomic.Int64{}, } - e.wg.Add(1) return e } @@ -73,14 +78,71 @@ func (jf *jobFunction) singletonRunner() { } } +func (e *executor) limitModeRunner() { + for { + select { + case <-e.ctx.Done(): + e.limitModeQueueMu.Lock() + e.limitModeQueue = nil + e.limitModeQueueMu.Unlock() + e.limitModeFuncRunning.Store(false) + return + default: + e.limitModeQueueMu.Lock() + if e.limitModeQueue != nil && len(e.limitModeQueue) > 0 && e.limitModeRunningJobs.Load() < int64(e.limitModeMaxRunningJobs) { + jf := e.limitModeQueue[0] + e.limitModeQueue = e.limitModeQueue[1:] + e.limitModeQueueMu.Unlock() + + e.limitModeRunningJobs.Store(e.limitModeRunningJobs.Load() + 1) + + select { + case <-jf.ctx.Done(): + continue + default: + e.jobsWg.Add(1) + go func() { + runJob(jf) + e.jobsWg.Done() + e.limitModeRunningJobs.Store(e.limitModeRunningJobs.Load() - 1) + }() + } + } else { + e.limitModeQueueMu.Unlock() + } + } + } +} + func (e *executor) start() { + e.wg = &sync.WaitGroup{} + e.wg.Add(1) + + stopCtx, cancel := context.WithCancel(context.Background()) + e.ctx = stopCtx + e.cancel = cancel + + e.jobsWg = &sync.WaitGroup{} + + go e.run() +} + +func (e *executor) run() { for { select { case f := <-e.jobFunctions: + e.jobsWg.Add(1) go func() { defer e.jobsWg.Done() + if e.limitModeMaxRunningJobs > 0 { + if !e.limitModeFuncRunning.Load() { + go e.limitModeRunner() + e.limitModeFuncRunning.Store(true) + } + } + panicHandlerMutex.RLock() defer panicHandlerMutex.RUnlock() @@ -92,28 +154,20 @@ func (e *executor) start() { }() } - if e.maxRunningJobs != nil { - if !e.maxRunningJobs.TryAcquire(1) { - - switch e.limitMode { - case RescheduleMode: - return - case WaitMode: - select { - case <-e.ctx.Done(): - return - case <-f.ctx.Done(): - return - default: - } - - if err := e.maxRunningJobs.Acquire(f.ctx, 1); err != nil { - break - } + if e.limitModeMaxRunningJobs > 0 { + switch e.limitMode { + case RescheduleMode: + e.limitModeQueueMu.Lock() + if e.limitModeQueue == nil || len(e.limitModeQueue) < e.limitModeMaxRunningJobs { + e.limitModeQueue = append(e.limitModeQueue, f) } + e.limitModeQueueMu.Unlock() + case WaitMode: + e.limitModeQueueMu.Lock() + e.limitModeQueue = append(e.limitModeQueue, f) + e.limitModeQueueMu.Unlock() } - - defer e.maxRunningJobs.Release(1) + return } switch f.runConfig.mode { diff --git a/executor_test.go b/executor_test.go index 668f4958..c3aea3e1 100644 --- a/executor_test.go +++ b/executor_test.go @@ -1,7 +1,6 @@ package gocron import ( - "context" "sync" "sync/atomic" "testing" @@ -11,14 +10,10 @@ import ( func Test_ExecutorExecute(t *testing.T) { e := newExecutor() - stopCtx, cancel := context.WithCancel(context.Background()) - e.ctx = stopCtx - e.cancel = cancel - e.jobsWg = &sync.WaitGroup{} wg := &sync.WaitGroup{} wg.Add(1) - go e.start() + e.start() e.jobFunctions <- jobFunction{ name: "test_fn", @@ -46,14 +41,10 @@ func Test_ExecutorPanicHandling(t *testing.T) { SetPanicHandler(handler) e := newExecutor() - stopCtx, cancel := context.WithCancel(context.Background()) - e.ctx = stopCtx - e.cancel = cancel - e.jobsWg = &sync.WaitGroup{} wg := &sync.WaitGroup{} wg.Add(1) - go e.start() + e.start() e.jobFunctions <- jobFunction{ name: "test_fn", diff --git a/job.go b/job.go index 05edcd55..8678f035 100644 --- a/job.go +++ b/job.go @@ -451,6 +451,7 @@ func (j *Job) stop() { } if j.cancel != nil { j.cancel() + j.ctx, j.cancel = context.WithCancel(context.Background()) } } diff --git a/scheduler.go b/scheduler.go index 41c47756..f59d0999 100644 --- a/scheduler.go +++ b/scheduler.go @@ -10,7 +10,6 @@ import ( "time" "github.com/robfig/cron/v3" - "golang.org/x/sync/semaphore" ) type limitMode int8 @@ -70,7 +69,7 @@ func NewScheduler(loc *time.Location) *Scheduler { // SetMaxConcurrentJobs limits how many jobs can be running at the same time. // This is useful when running resource intensive jobs and a precise start time is not critical. func (s *Scheduler) SetMaxConcurrentJobs(n int, mode limitMode) { - s.executor.maxRunningJobs = semaphore.NewWeighted(int64(n)) + s.executor.limitModeMaxRunningJobs = n s.executor.limitMode = mode } @@ -93,12 +92,7 @@ func (s *Scheduler) StartAsync() { // start starts the scheduler, scheduling and running jobs func (s *Scheduler) start() { - stopCtx, cancel := context.WithCancel(context.Background()) - s.executor.ctx = stopCtx - s.executor.cancel = cancel - s.executor.jobsWg = &sync.WaitGroup{} - - go s.executor.start() + s.executor.start() s.setRunning(true) s.runJobs(s.Jobs()) } @@ -595,7 +589,7 @@ func (s *Scheduler) runContinuous(job *Job) { } nr := next.dateTime.Sub(s.now()) if nr < 0 { - time.Sleep(absDuration(nr)) + job.setLastRun(s.now()) shouldRun, next := s.scheduleNextRun(job) if !shouldRun { return @@ -1252,7 +1246,6 @@ func (s *Scheduler) Update() (*Job, error) { } s.updateJob = false job.stop() - job.ctx, job.cancel = context.WithCancel(context.Background()) job.setStartsImmediately(false) if job.runWithDetails { diff --git a/scheduler_test.go b/scheduler_test.go index 722a27cb..caed039d 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -308,11 +308,19 @@ func TestAt(t *testing.T) { }) t.Run("Week() and multiple At() times, all in past", func(t *testing.T) { - atTime1 := time.Now().UTC().Add(time.Hour * -3).Round(time.Second) - atTime2 := time.Now().UTC().Add(time.Hour * -2).Round(time.Second) - atTime3 := time.Now().UTC().Add(time.Hour * -1).Round(time.Second) + tm := time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC) + + ft := fakeTime{onNow: func(l *time.Location) time.Time { + return tm + }} + + atTime1 := tm.Add(time.Hour * -6).Round(time.Second) + atTime2 := tm.Add(time.Hour * -5).Round(time.Second) + atTime3 := tm.Add(time.Hour * -4).Round(time.Second) s := NewScheduler(time.UTC) + s.time = ft + job, err := s.Week().At(atTime1).At(atTime2).At(atTime3).Every(1).Do(func() {}) require.NoError(t, err) s.StartAsync() @@ -1577,10 +1585,10 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { f func() }{ // Expecting a total of 4 job runs: - // 0s - jobs 1 & 3 run, job 2 hits the limit and is skipped - // 1s - job 1 hits the limit and is skipped - // 2s - job 1 & 2 run - // 3s - job 1 hits the limit and is skipped + // 0ms - 2 jobs are run, the 3rd job hits the limit and is skipped + // 100ms - job 1 hits the limit and is skipped + // 200ms - job 1 & 2 run + // 300ms - jobs 1 & 3 hit the limit and are skipped { "reschedule mode", 2, RescheduleMode, 4, false, func() { @@ -1590,10 +1598,10 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { }, // Expecting a total of 8 job runs. The exact order of jobs may vary, for example: - // 0s - jobs 2 & 3 run, job 1 hits the limit and waits - // 1s - job 1 runs twice, the blocked run and the regularly scheduled run - // 2s - jobs 1 & 3 run - // 3s - jobs 2 & 3 run, job 1 hits the limit and waits + // 0ms - jobs 2 & 3 run, job 1 hits the limit and waits + // 100ms - job 1 runs twice, the blocked run and the regularly scheduled run + // 200ms - jobs 1 & 3 run + // 300ms - jobs 2 & 3 run, job 1 hits the limit and waits { "wait mode", 2, WaitMode, 8, false, func() { @@ -1602,7 +1610,7 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { }, }, - // Same as above - this confirms the same behavior when jobs are removed rather than the scheduler being stopped + //// Same as above - this confirms the same behavior when jobs are removed rather than the scheduler being stopped { "wait mode - with job removal", 2, WaitMode, 8, true, func() { @@ -1643,7 +1651,6 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { s.RemoveByReference(j1) s.RemoveByReference(j2) s.RemoveByReference(j3) - defer s.Stop() } else { s.Stop() } @@ -1660,7 +1667,12 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { } } - assert.Equal(t, tc.expectedRuns, counter) + assert.GreaterOrEqual(t, counter, tc.expectedRuns-2) + assert.LessOrEqual(t, counter, tc.expectedRuns+2) + + if tc.removeJobs { + s.Stop() + } }) } }