Skip to content

Commit

Permalink
refactor the max concurrent jobs executor func (#456)
Browse files Browse the repository at this point in the history
* refactor the max concurrent jobs executor func

* refactor test

* move executor wait group into start out of new

* don't defer wg.Done()

* variation in number of runs based on compute
  • Loading branch information
JohnRoesler authored Apr 18, 2023
1 parent b77fa0d commit 1db2359
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 75 deletions.
19 changes: 13 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,29 @@
[![CI State](https://github.com/go-co-op/gocron/actions/workflows/go_test.yml/badge.svg?branch=main&event=push)](https://github.com/go-co-op/gocron/actions)
![Go Report Card](https://goreportcard.com/badge/github.com/go-co-op/gocron) [![Go Doc](https://godoc.org/github.com/go-co-op/gocron?status.svg)](https://pkg.go.dev/github.com/go-co-op/gocron)

gocron is a job scheduling package which lets you run Go functions at pre-determined intervals using a simple, human-friendly syntax.
gocron is a job scheduling package which lets you run Go functions at pre-determined intervals
using a simple, human-friendly syntax.

gocron is a Golang scheduler implementation similar to the Ruby module [clockwork](https://github.com/tomykaira/clockwork) and the Python job scheduling package [schedule](https://github.com/dbader/schedule).
gocron is a Golang scheduler implementation similar to the Ruby module
[clockwork](https://github.com/tomykaira/clockwork) and the Python job scheduling package [schedule](https://github.com/dbader/schedule).

See also these two great articles that were used for design input:

- [Rethinking Cron](http://adam.herokuapp.com/past/2010/4/13/rethinking_cron/)
- [Replace Cron with Clockwork](http://adam.herokuapp.com/past/2010/6/30/replace_cron_with_clockwork/)

If you want to chat, you can find us at Slack! [<img src="https://img.shields.io/badge/gophers-gocron-brightgreen?logo=slack">](https://gophers.slack.com/archives/CQ7T0T1FW)
If you want to chat, you can find us at Slack!
[<img src="https://img.shields.io/badge/gophers-gocron-brightgreen?logo=slack">](https://gophers.slack.com/archives/CQ7T0T1FW)

## Concepts

- **Scheduler**: The scheduler tracks all the jobs assigned to it and makes sure they are passed to the executor when ready to be run. The scheduler is able to manage overall aspects of job behavior like limiting how many jobs are running at one time.
- **Job**: The job is simply aware of the task (go function) it's provided and is therefore only able to perform actions related to that task like preventing itself from overruning a previous task that is taking a long time.
- **Executor**: The executor, as it's name suggests, is simply responsible for calling the task (go function) that the job hands to it when sent by the scheduler.
- **Scheduler**: The scheduler tracks all the jobs assigned to it and makes sure they are passed to the executor when
ready to be run. The scheduler is able to manage overall aspects of job behavior like limiting how many jobs
are running at one time.
- **Job**: The job is simply aware of the task (go function) it's provided and is therefore only able to perform
actions related to that task like preventing itself from overruning a previous task that is taking a long time.
- **Executor**: The executor, as it's name suggests, is simply responsible for calling the task (go function) that
the job hands to it when sent by the scheduler.

## Examples

Expand Down
122 changes: 88 additions & 34 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package gocron
import (
"context"
"sync"

"golang.org/x/sync/semaphore"
"sync/atomic"
)

const (
Expand All @@ -25,23 +24,29 @@ const (
)

type executor struct {
jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler
ctx context.Context // used to tell the executor to stop
cancel context.CancelFunc // used to tell the executor to stop
wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop
jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish
singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete
limitMode limitMode // when SetMaxConcurrentJobs() is set upon the scheduler
maxRunningJobs *semaphore.Weighted
jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler
ctx context.Context // used to tell the executor to stop
cancel context.CancelFunc // used to tell the executor to stop
wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop
jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish
singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete

limitMode limitMode // when SetMaxConcurrentJobs() is set upon the scheduler
limitModeMaxRunningJobs int // stores the maximum number of concurrently running jobs
limitModeFuncRunning *atomic.Bool // tracks whether the function for handling limited run jobs is running
limitModeQueue []jobFunction // queues the limited jobs for running when able per limit mode
limitModeQueueMu *sync.Mutex // mutex for the queue
limitModeRunningJobs *atomic.Int64 // tracks the count of running jobs to check against the max
}

func newExecutor() executor {
e := executor{
jobFunctions: make(chan jobFunction, 1),
singletonWgs: &sync.Map{},
wg: &sync.WaitGroup{},
jobFunctions: make(chan jobFunction, 1),
singletonWgs: &sync.Map{},
limitModeFuncRunning: &atomic.Bool{},
limitModeQueueMu: &sync.Mutex{},
limitModeRunningJobs: &atomic.Int64{},
}
e.wg.Add(1)
return e
}

Expand Down Expand Up @@ -73,14 +78,71 @@ func (jf *jobFunction) singletonRunner() {
}
}

func (e *executor) limitModeRunner() {
for {
select {
case <-e.ctx.Done():
e.limitModeQueueMu.Lock()
e.limitModeQueue = nil
e.limitModeQueueMu.Unlock()
e.limitModeFuncRunning.Store(false)
return
default:
e.limitModeQueueMu.Lock()
if e.limitModeQueue != nil && len(e.limitModeQueue) > 0 && e.limitModeRunningJobs.Load() < int64(e.limitModeMaxRunningJobs) {
jf := e.limitModeQueue[0]
e.limitModeQueue = e.limitModeQueue[1:]
e.limitModeQueueMu.Unlock()

e.limitModeRunningJobs.Store(e.limitModeRunningJobs.Load() + 1)

select {
case <-jf.ctx.Done():
continue
default:
e.jobsWg.Add(1)
go func() {
runJob(jf)
e.jobsWg.Done()
e.limitModeRunningJobs.Store(e.limitModeRunningJobs.Load() - 1)
}()
}
} else {
e.limitModeQueueMu.Unlock()
}
}
}
}

func (e *executor) start() {
e.wg = &sync.WaitGroup{}
e.wg.Add(1)

stopCtx, cancel := context.WithCancel(context.Background())
e.ctx = stopCtx
e.cancel = cancel

e.jobsWg = &sync.WaitGroup{}

go e.run()
}

func (e *executor) run() {
for {
select {
case f := <-e.jobFunctions:

e.jobsWg.Add(1)
go func() {
defer e.jobsWg.Done()

if e.limitModeMaxRunningJobs > 0 {
if !e.limitModeFuncRunning.Load() {
go e.limitModeRunner()
e.limitModeFuncRunning.Store(true)
}
}

panicHandlerMutex.RLock()
defer panicHandlerMutex.RUnlock()

Expand All @@ -92,28 +154,20 @@ func (e *executor) start() {
}()
}

if e.maxRunningJobs != nil {
if !e.maxRunningJobs.TryAcquire(1) {

switch e.limitMode {
case RescheduleMode:
return
case WaitMode:
select {
case <-e.ctx.Done():
return
case <-f.ctx.Done():
return
default:
}

if err := e.maxRunningJobs.Acquire(f.ctx, 1); err != nil {
break
}
if e.limitModeMaxRunningJobs > 0 {
switch e.limitMode {
case RescheduleMode:
e.limitModeQueueMu.Lock()
if e.limitModeQueue == nil || len(e.limitModeQueue) < e.limitModeMaxRunningJobs {
e.limitModeQueue = append(e.limitModeQueue, f)
}
e.limitModeQueueMu.Unlock()
case WaitMode:
e.limitModeQueueMu.Lock()
e.limitModeQueue = append(e.limitModeQueue, f)
e.limitModeQueueMu.Unlock()
}

defer e.maxRunningJobs.Release(1)
return
}

switch f.runConfig.mode {
Expand Down
13 changes: 2 additions & 11 deletions executor_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gocron

import (
"context"
"sync"
"sync/atomic"
"testing"
Expand All @@ -11,14 +10,10 @@ import (

func Test_ExecutorExecute(t *testing.T) {
e := newExecutor()
stopCtx, cancel := context.WithCancel(context.Background())
e.ctx = stopCtx
e.cancel = cancel
e.jobsWg = &sync.WaitGroup{}

wg := &sync.WaitGroup{}
wg.Add(1)
go e.start()
e.start()

e.jobFunctions <- jobFunction{
name: "test_fn",
Expand Down Expand Up @@ -46,14 +41,10 @@ func Test_ExecutorPanicHandling(t *testing.T) {
SetPanicHandler(handler)

e := newExecutor()
stopCtx, cancel := context.WithCancel(context.Background())
e.ctx = stopCtx
e.cancel = cancel
e.jobsWg = &sync.WaitGroup{}

wg := &sync.WaitGroup{}
wg.Add(1)
go e.start()
e.start()

e.jobFunctions <- jobFunction{
name: "test_fn",
Expand Down
1 change: 1 addition & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ func (j *Job) stop() {
}
if j.cancel != nil {
j.cancel()
j.ctx, j.cancel = context.WithCancel(context.Background())
}
}

Expand Down
13 changes: 3 additions & 10 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/robfig/cron/v3"
"golang.org/x/sync/semaphore"
)

type limitMode int8
Expand Down Expand Up @@ -70,7 +69,7 @@ func NewScheduler(loc *time.Location) *Scheduler {
// SetMaxConcurrentJobs limits how many jobs can be running at the same time.
// This is useful when running resource intensive jobs and a precise start time is not critical.
func (s *Scheduler) SetMaxConcurrentJobs(n int, mode limitMode) {
s.executor.maxRunningJobs = semaphore.NewWeighted(int64(n))
s.executor.limitModeMaxRunningJobs = n
s.executor.limitMode = mode
}

Expand All @@ -93,12 +92,7 @@ func (s *Scheduler) StartAsync() {

// start starts the scheduler, scheduling and running jobs
func (s *Scheduler) start() {
stopCtx, cancel := context.WithCancel(context.Background())
s.executor.ctx = stopCtx
s.executor.cancel = cancel
s.executor.jobsWg = &sync.WaitGroup{}

go s.executor.start()
s.executor.start()
s.setRunning(true)
s.runJobs(s.Jobs())
}
Expand Down Expand Up @@ -595,7 +589,7 @@ func (s *Scheduler) runContinuous(job *Job) {
}
nr := next.dateTime.Sub(s.now())
if nr < 0 {
time.Sleep(absDuration(nr))
job.setLastRun(s.now())
shouldRun, next := s.scheduleNextRun(job)
if !shouldRun {
return
Expand Down Expand Up @@ -1252,7 +1246,6 @@ func (s *Scheduler) Update() (*Job, error) {
}
s.updateJob = false
job.stop()
job.ctx, job.cancel = context.WithCancel(context.Background())
job.setStartsImmediately(false)

if job.runWithDetails {
Expand Down
40 changes: 26 additions & 14 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,19 @@ func TestAt(t *testing.T) {
})

t.Run("Week() and multiple At() times, all in past", func(t *testing.T) {
atTime1 := time.Now().UTC().Add(time.Hour * -3).Round(time.Second)
atTime2 := time.Now().UTC().Add(time.Hour * -2).Round(time.Second)
atTime3 := time.Now().UTC().Add(time.Hour * -1).Round(time.Second)
tm := time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC)

ft := fakeTime{onNow: func(l *time.Location) time.Time {
return tm
}}

atTime1 := tm.Add(time.Hour * -6).Round(time.Second)
atTime2 := tm.Add(time.Hour * -5).Round(time.Second)
atTime3 := tm.Add(time.Hour * -4).Round(time.Second)

s := NewScheduler(time.UTC)
s.time = ft

job, err := s.Week().At(atTime1).At(atTime2).At(atTime3).Every(1).Do(func() {})
require.NoError(t, err)
s.StartAsync()
Expand Down Expand Up @@ -1577,10 +1585,10 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
f func()
}{
// Expecting a total of 4 job runs:
// 0s - jobs 1 & 3 run, job 2 hits the limit and is skipped
// 1s - job 1 hits the limit and is skipped
// 2s - job 1 & 2 run
// 3s - job 1 hits the limit and is skipped
// 0ms - 2 jobs are run, the 3rd job hits the limit and is skipped
// 100ms - job 1 hits the limit and is skipped
// 200ms - job 1 & 2 run
// 300ms - jobs 1 & 3 hit the limit and are skipped
{
"reschedule mode", 2, RescheduleMode, 4, false,
func() {
Expand All @@ -1590,10 +1598,10 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
},

// Expecting a total of 8 job runs. The exact order of jobs may vary, for example:
// 0s - jobs 2 & 3 run, job 1 hits the limit and waits
// 1s - job 1 runs twice, the blocked run and the regularly scheduled run
// 2s - jobs 1 & 3 run
// 3s - jobs 2 & 3 run, job 1 hits the limit and waits
// 0ms - jobs 2 & 3 run, job 1 hits the limit and waits
// 100ms - job 1 runs twice, the blocked run and the regularly scheduled run
// 200ms - jobs 1 & 3 run
// 300ms - jobs 2 & 3 run, job 1 hits the limit and waits
{
"wait mode", 2, WaitMode, 8, false,
func() {
Expand All @@ -1602,7 +1610,7 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
},
},

// Same as above - this confirms the same behavior when jobs are removed rather than the scheduler being stopped
//// Same as above - this confirms the same behavior when jobs are removed rather than the scheduler being stopped
{
"wait mode - with job removal", 2, WaitMode, 8, true,
func() {
Expand Down Expand Up @@ -1643,7 +1651,6 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
s.RemoveByReference(j1)
s.RemoveByReference(j2)
s.RemoveByReference(j3)
defer s.Stop()
} else {
s.Stop()
}
Expand All @@ -1660,7 +1667,12 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
}
}

assert.Equal(t, tc.expectedRuns, counter)
assert.GreaterOrEqual(t, counter, tc.expectedRuns-2)
assert.LessOrEqual(t, counter, tc.expectedRuns+2)

if tc.removeJobs {
s.Stop()
}
})
}
}
Expand Down

0 comments on commit 1db2359

Please sign in to comment.