Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow scheduler functions pertaining to a job to be called in any order #453

Merged
merged 1 commit into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 18 additions & 43 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,17 @@ type Scheduler struct {
updateJob bool // so the scheduler knows to create a new job or update the current
waitForInterval bool // defaults jobs to waiting for first interval to start
singletonMode bool // defaults all jobs to use SingletonMode()
jobCreated bool // so the scheduler knows a job was created prior to calling Every or Cron

startBlockingStopChanMutex sync.Mutex
startBlockingStopChan chan struct{} // stops the scheduler

// tracks whether we're in a chain of scheduling methods for a job
// a chain is started with any of the scheduler methods that operate
// upon a job and are ended with one of [ Do(), Update() ] - note that
// Update() calls Do(), so really they all end with Do().
// This allows the caller to begin with any job related scheduler method
// and only with one of [ Every(), EveryRandom(), Cron(), CronWithSeconds(), MonthFirstWeekday() ]
inScheduleChain bool
}

// days in a week
Expand Down Expand Up @@ -497,22 +504,9 @@ func (s *Scheduler) NextRun() (*Job, time.Time) {
// The default unit is Seconds(). Call a different unit in the chain
// if you would like to change that. For example, Minutes(), Hours(), etc.
func (s *Scheduler) EveryRandom(lower, upper int) *Scheduler {
job := s.newJob(0)
if s.updateJob || s.jobCreated {
job = s.getCurrentJob()
}
job := s.getCurrentJob()

job.setRandomInterval(lower, upper)

if s.updateJob || s.jobCreated {
s.setJobs(append(s.Jobs()[:len(s.Jobs())-1], job))
if s.jobCreated {
s.jobCreated = false
}
} else {
s.setJobs(append(s.Jobs(), job))
}

return s
}

Expand All @@ -521,10 +515,7 @@ func (s *Scheduler) EveryRandom(lower, upper int) *Scheduler {
// parses with time.ParseDuration().
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
func (s *Scheduler) Every(interval any) *Scheduler {
job := s.newJob(0)
if s.updateJob || s.jobCreated {
job = s.getCurrentJob()
}
job := s.getCurrentJob()

switch interval := interval.(type) {
case int:
Expand All @@ -547,15 +538,6 @@ func (s *Scheduler) Every(interval any) *Scheduler {
job.error = wrapOrError(job.error, ErrInvalidIntervalType)
}

if s.updateJob || s.jobCreated {
s.setJobs(append(s.Jobs()[:len(s.Jobs())-1], job))
if s.jobCreated {
s.jobCreated = false
}
} else {
s.setJobs(append(s.Jobs(), job))
}

return s
}

Expand Down Expand Up @@ -871,6 +853,7 @@ func (s *Scheduler) stopJobs(jobs []*Job) {

func (s *Scheduler) doCommon(jobFun any, params ...any) (*Job, error) {
job := s.getCurrentJob()
s.inScheduleChain = false

jobUnit := job.getUnit()
jobLastRun := job.LastRun()
Expand Down Expand Up @@ -1207,17 +1190,17 @@ func (s *Scheduler) Sunday() *Scheduler {
}

func (s *Scheduler) getCurrentJob() *Job {

if len(s.Jobs()) == 0 {
s.setJobs([]*Job{s.newJob(0)})
s.jobCreated = true
if !s.inScheduleChain {
s.jobsMutex.Lock()
s.jobs = append(s.jobs, s.newJob(0))
s.jobsMutex.Unlock()
s.inScheduleChain = true
}

s.jobsMutex.RLock()
defer s.jobsMutex.RUnlock()

return s.jobs[len(s.jobs)-1]

}

func (s *Scheduler) now() time.Time {
Expand All @@ -1243,6 +1226,7 @@ func (s *Scheduler) Job(j *Job) *Scheduler {
s.Swap(len(jobs)-1, index)
}
}
s.inScheduleChain = true
s.updateJob = true
return s
}
Expand Down Expand Up @@ -1281,10 +1265,7 @@ func (s *Scheduler) CronWithSeconds(cronExpression string) *Scheduler {
}

func (s *Scheduler) cron(cronExpression string, withSeconds bool) *Scheduler {
job := s.newJob(0)
if s.updateJob || s.jobCreated {
job = s.getCurrentJob()
}
job := s.getCurrentJob()

var withLocation string
if strings.HasPrefix(cronExpression, "TZ=") || strings.HasPrefix(cronExpression, "CRON_TZ=") {
Expand Down Expand Up @@ -1313,12 +1294,6 @@ func (s *Scheduler) cron(cronExpression string, withSeconds bool) *Scheduler {
job.setUnit(crontab)
job.startsImmediately = false

if s.updateJob || s.jobCreated {
s.setJobs(append(s.Jobs()[:len(s.Jobs())-1], job))
s.jobCreated = false
} else {
s.setJobs(append(s.Jobs(), job))
}
return s
}

Expand Down
163 changes: 102 additions & 61 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func TestAt(t *testing.T) {
atTime3 := time.Now().UTC().Add(time.Hour * -1).Round(time.Second)

s := NewScheduler(time.UTC)
job, err := s.Every(1).Week().At(atTime1).At(atTime2).At(atTime3).Do(func() {})
job, err := s.Week().At(atTime1).At(atTime2).At(atTime3).Every(1).Do(func() {})
require.NoError(t, err)
s.StartAsync()

Expand Down Expand Up @@ -512,11 +512,11 @@ func TestScheduler_Remove(t *testing.T) {
t.Run("remove from non-running", func(t *testing.T) {
s := NewScheduler(time.UTC)
s.TagsUnique()
_, err := s.Every(1).Minute().Tag("tag1").Do(task)
_, err := s.Minute().Tag("tag1").Every(1).Do(task)
require.NoError(t, err)
_, err = s.Every(1).Minute().Do(taskWithParams, 1, "hello")
require.NoError(t, err)
_, err = s.Every(1).Minute().Do(task)
_, err = s.Minute().Every(1).Do(task)
require.NoError(t, err)

assert.Equal(t, 3, s.Len(), "Incorrect number of jobs")
Expand Down Expand Up @@ -1732,17 +1732,17 @@ func TestScheduler_Job(t *testing.T) {

j1, err := s.Every("1s").Do(func() {})
require.NoError(t, err)
assert.Equal(t, j1, s.getCurrentJob())
assert.Equal(t, j1, s.jobs[len(s.jobs)-1])

j2, err := s.Every("1s").Do(func() {})
require.NoError(t, err)
assert.Equal(t, j2, s.getCurrentJob())
assert.Equal(t, j2, s.jobs[len(s.jobs)-1])

s.Job(j1)
assert.Equal(t, j1, s.getCurrentJob())
assert.Equal(t, j1, s.jobs[len(s.jobs)-1])

s.Job(j2)
assert.Equal(t, j2, s.getCurrentJob())
assert.Equal(t, j2, s.jobs[len(s.jobs)-1])
}

func TestScheduler_Update(t *testing.T) {
Expand Down Expand Up @@ -2038,10 +2038,18 @@ func TestScheduler_WaitForSchedules(t *testing.T) {
var counterMutex sync.RWMutex
counter := 0

_, err := s.Every("1s").Do(func() { counterMutex.Lock(); defer counterMutex.Unlock(); counter++ })
_, err := s.Every("1s").Do(func() {
counterMutex.Lock()
defer counterMutex.Unlock()
counter++
})
require.NoError(t, err)

_, err = s.CronWithSeconds("*/1 * * * * *").Do(func() { counterMutex.Lock(); defer counterMutex.Unlock(); counter++ })
_, err = s.CronWithSeconds("*/1 * * * * *").Do(func() {
counterMutex.Lock()
defer counterMutex.Unlock()
counter++
})
require.NoError(t, err)
s.StartAsync()

Expand Down Expand Up @@ -2433,65 +2441,98 @@ func TestScheduler_DoWithJobDetails(t *testing.T) {
})
}

func TestScheduler_GetAllTags(t *testing.T) {
t.Run("tags unique", func(t *testing.T) {
testCases := []struct {
description string
tags []string
expected []string
}{
{"no tags", []string{}, nil},
{"one tag", []string{"tag1"}, []string{"tag1"}},
{"two tags", []string{"tag1", "tag2"}, []string{"tag1", "tag2"}},
{"two tags with duplicates", []string{"tag1", "tag2", "tag1"}, []string{"tag1", "tag2"}},
}
func TestScheduler_GetAllTags_Unique(t *testing.T) {
testCases := []struct {
description string
tags []string
expected []string
expectedError error
}{
{"unique: no tags", []string{}, nil, nil},
{"unique: one tag", []string{"tag1"}, []string{"tag1"}, nil},
{"unique: two tags", []string{"tag1", "tag2"}, []string{"tag1", "tag2"}, nil},
{"unique: two tags with duplicates", []string{"tag1", "tag2", "tag1"}, nil, ErrTagsUnique("tag1")},
}

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
s := NewScheduler(time.UTC)
s.TagsUnique()
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
s := NewScheduler(time.UTC)
s.TagsUnique()

for _, tag := range tc.tags {
_, err := s.Tag(tag).Every("100ms").Do(func() {})
require.NoError(t, err)
}
_, err := s.Tag(tc.tags...).Every("100ms").Do(func() {})
if tc.expectedError == nil {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tc.expectedError.Error())
}

tags := s.GetAllTags()
sort.Strings(tc.expected)
sort.Strings(tags)
tags := s.GetAllTags()
sort.Strings(tc.expected)
sort.Strings(tags)

assert.Equal(t, tc.expected, tags)
})
}
})
assert.Equal(t, tc.expected, tags)
})
}
}

t.Run("tags not unique", func(t *testing.T) {
testCases := []struct {
description string
tags []string
expected []string
}{
{"no tags", []string{}, nil},
{"one tag", []string{"tag1"}, []string{"tag1"}},
{"two tags", []string{"tag1", "tag2"}, []string{"tag1", "tag2"}},
{"two tags with duplicates", []string{"tag1", "tag2", "tag1"}, []string{"tag1", "tag2", "tag1"}},
}
func TestScheduler_GetAllTags_NotUnique(t *testing.T) {
testCases := []struct {
description string
tags []string
expected []string
}{
{"no tags", []string{}, nil},
{"one tag", []string{"tag1"}, []string{"tag1"}},
{"two tags", []string{"tag1", "tag2"}, []string{"tag1", "tag2"}},
{"two tags with duplicates", []string{"tag1", "tag2", "tag1"}, []string{"tag1", "tag2", "tag1"}},
}

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
s := NewScheduler(time.UTC)
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
s := NewScheduler(time.UTC)

for _, tag := range tc.tags {
_, err := s.Tag(tag).Every("100ms").Do(func() {})
require.NoError(t, err)
}
_, err := s.Tag(tc.tags...).Every("100ms").Do(func() {})
require.NoError(t, err)

tags := s.GetAllTags()
sort.Strings(tc.expected)
sort.Strings(tags)
tags := s.GetAllTags()
sort.Strings(tc.expected)
sort.Strings(tags)

assert.Equal(t, tc.expected, tags)
})
}
}

func TestScheduler_ChainOrder(t *testing.T) {
s := NewScheduler(time.UTC)

func1 := func() { panic("func 1 not implemented") }
func2 := func() { panic("func 2 not implemented") }
func3 := func() { panic("func 3 not implemented") }

funcs := []any{func1, func2, func3}

_, err := s.Tag("1").SingletonMode().Milliseconds().EveryRandom(100, 200).Do(func1)
require.NoError(t, err)

_, err = s.Monday().Every(4).Tag("2").Do(func2)
require.NoError(t, err)

_, err = s.Months(1).Tag("3").Every(1).SingletonMode().At("1:00").Do(func3)
require.NoError(t, err)

require.Len(t, s.jobs, 3)
for i, j := range s.jobs {
assert.Equal(t, fmt.Sprint(funcs[i]), fmt.Sprint(j.function))
}

err = s.RemoveByTag("2")
require.NoError(t, err)

require.Len(t, s.jobs, 2)
funcs = append(funcs[:1], funcs[2])
for i, j := range s.jobs {
assert.Equal(t, fmt.Sprint(funcs[i]), fmt.Sprint(j.function))
}

assert.Equal(t, tc.expected, tags)
})
}
})
}