Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact: rewriting scheduler logic #54

Merged
merged 15 commits into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Job struct {
err error // error related to Job
lastRun time.Time // datetime of last run
nextRun time.Time // datetime of next run
startDay time.Weekday // Specific day of the week to start on
scheduledWeekday *time.Weekday // Specific day of the week to start on
dayOfTheMonth int // Specific day of the month to run the job
funcs map[string]interface{} // Map for the function task store
fparams map[string][]interface{} // Map for function and params of function
Expand All @@ -26,12 +26,10 @@ type Job struct {

// NewJob creates a new Job with the provided interval
func NewJob(interval uint64) *Job {
th := newTimeWrapper()
return &Job{
interval: interval,
lastRun: th.Unix(0, 0),
nextRun: th.Unix(0, 0),
startDay: time.Sunday,
lastRun: time.Time{},
nextRun: time.Time{},
funcs: make(map[string]interface{}),
fparams: make(map[string][]interface{}),
tags: []string{},
Expand All @@ -43,6 +41,10 @@ func (j *Job) run() {
callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc])
}

func (j Job) neverRan() bool {
return j.lastRun.IsZero()
}

// Err returns an error if one ocurred while creating the Job
func (j *Job) Err() error {
return j.err
Expand Down Expand Up @@ -110,7 +112,7 @@ func (j *Job) ScheduledAtTime() string {
// will return an error if the Job is not scheduled weekly
func (j *Job) Weekday() (time.Weekday, error) {
if j.unit == weeks {
return j.startDay, nil
return *j.scheduledWeekday, nil
}
return time.Sunday, ErrNotScheduledWeekday
}
127 changes: 89 additions & 38 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (s *Scheduler) StartAsync() chan struct{} {
}
s.running = true

s.scheduleAllJobs()
ticker := s.time.NewTicker(1 * time.Second)
go func() {
for {
Expand Down Expand Up @@ -88,38 +89,97 @@ func (s *Scheduler) ChangeLocation(newLocation *time.Location) {
func (s *Scheduler) scheduleNextRun(j *Job) error {
now := s.time.Now(s.loc)

var delta time.Time
if j.neverRan() {
if !j.nextRun.IsZero() { // scheduled for future run, wait to run at least once
return nil
}
delta = now
} else {
delta = j.lastRun
}

switch j.unit {
case seconds, minutes, hours:
j.nextRun = j.lastRun.Add(j.periodDuration)
if j.neverRan() && j.atTime != 0 && s.shouldRunToday(delta, j) { // ugly. in order to avoid this we could prohibit setting .At() and allowing only .StartAt() when dealing with Duration types
j.nextRun = s.roundToMidnight(delta.Add(j.periodDuration)).Add(j.atTime)
return nil
}
j.nextRun = delta.Add(j.periodDuration)
case days:
j.nextRun = s.roundToMidnight(j.lastRun)
j.nextRun = j.nextRun.Add(j.atTime).Add(j.periodDuration)
if s.shouldRunToday(now, j) {
j.nextRun = s.roundToMidnight(delta).Add(j.atTime)
return nil
}
delta = s.roundToMidnight(delta)
j.nextRun = delta.AddDate(0, 0, int(j.interval)).Add(j.atTime)
case weeks:
j.nextRun = s.roundToMidnight(j.lastRun)
dayDiff := int(j.startDay)
dayDiff -= int(j.nextRun.Weekday())
if dayDiff != 0 {
j.nextRun = j.nextRun.Add(time.Duration(dayDiff) * 24 * time.Hour)
days := int(j.interval) * 7
if j.scheduledWeekday != nil { // Every().Monday(), for example
days = s.calculateWeekday(now, j)
}
j.nextRun = j.nextRun.Add(j.atTime)
delta = s.roundToMidnight(delta)
j.nextRun = delta.AddDate(0, 0, days).Add(j.atTime)
case months:
increment := j.lastRun.Month() + time.Month(j.interval)
nextMonth := increment % 12
year := j.lastRun.Year() + int(increment/12)
j.nextRun = time.Date(year, nextMonth, j.dayOfTheMonth, 0, 0, 0, 0, s.loc).Add(j.atTime)
delta = s.roundToMidnight(delta)
j.nextRun = delta.AddDate(0, int(j.interval), 0).Add(j.atTime)
}

// advance to next possible Schedule
for j.nextRun.Before(now) || j.nextRun.Before(j.lastRun) {
j.nextRun = j.nextRun.Add(j.periodDuration)
return nil
}

func (s *Scheduler) calculateWeekday(now time.Time, j *Job) int {
remainingDaysToWeekday := remainingDaysToWeekday(now.Weekday(), *j.scheduledWeekday)
if j.neverRan() || j.startsImmediately {
if j.startsImmediately {
j.startsImmediately = false
}
return s.calculateFirstWeekday(now, remainingDaysToWeekday, j)
}

return nil
return int(j.interval) * 7
}

func (s *Scheduler) calculateFirstWeekday(now time.Time, daysToWeekday int, j *Job) int {
if daysToWeekday < 0 { // negative means next weekday is on next week
return daysToWeekday + int(j.interval)*7
}

// following path means weekday is somewhere within this week

if j.interval > 1 { // more than 1 week apart
return daysToWeekday + int(j.interval-1)*7 // skip a week since there's already an upcoming requested weekday
}

// interval is one; should run within this week
if daysToWeekday > 0 {
return daysToWeekday
}

// today or in 7 days
atJobTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, s.loc).Add(j.atTime)
if now.Before(atJobTime) {
return 0
}
return 7
}

func (s *Scheduler) shouldRunToday(now time.Time, job *Job) bool {
atTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, s.loc).Add(job.atTime)
return now.Before(atTime)
}

func remainingDaysToWeekday(from time.Weekday, to time.Weekday) int {
daysUntilScheduledDay := int(to) - int(from)
if daysUntilScheduledDay < 0 {
daysUntilScheduledDay += 7
}
return daysUntilScheduledDay
}

// roundToMidnight truncate time to midnight
func (s *Scheduler) roundToMidnight(t time.Time) time.Time {
return s.time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, s.loc)
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, s.loc)
}

// Get the current runnable Jobs, which shouldRun is True
Expand Down Expand Up @@ -154,8 +214,7 @@ func (s *Scheduler) Every(interval uint64) *Scheduler {

// RunPending runs all the Jobs that are scheduled to run.
func (s *Scheduler) RunPending() {
runnableJobs := s.runnableJobs()
for _, job := range runnableJobs {
for _, job := range s.runnableJobs() {
s.runAndReschedule(job) // we should handle this error somehow
}
}
Expand Down Expand Up @@ -306,23 +365,6 @@ func (s *Scheduler) Do(jobFun interface{}, params ...interface{}) (*Job, error)
}
}

if !j.startsImmediately {

if j.lastRun == s.time.Unix(0, 0) {
Copy link
Member Author

@Streppel Streppel Aug 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was a workaround that had impact on the job scheduling logic and got removed

j.lastRun = s.time.Now(s.loc)

if j.atTime != 0 {
j.lastRun = j.lastRun.Add(-j.periodDuration)
}
}

if j.nextRun == s.time.Unix(0, 0) {
if err := s.scheduleNextRun(j); err != nil {
return nil, err
}
}
}

return j, nil
}

Expand Down Expand Up @@ -362,6 +404,9 @@ func (s *Scheduler) StartImmediately() *Scheduler {

// shouldRun returns true if the Job should be run now
func (s *Scheduler) shouldRun(j *Job) bool {
if j.neverRan() && j.startsImmediately {
return true
}
return s.time.Now(s.loc).Unix() >= j.nextRun.Unix()
}

Expand Down Expand Up @@ -447,7 +492,7 @@ func (s *Scheduler) Months(dayOfTheMonth int) *Scheduler {

// Weekday sets the start with a specific weekday weekday
func (s *Scheduler) Weekday(startDay time.Weekday) *Scheduler {
s.getCurrentJob().startDay = startDay
s.getCurrentJob().scheduledWeekday = &startDay
s.setUnit(weeks)
return s
}
Expand Down Expand Up @@ -496,3 +541,9 @@ func (s *Scheduler) Lock() *Scheduler {
s.getCurrentJob().lock = true
return s
}

func (s *Scheduler) scheduleAllJobs() {
for _, j := range s.jobs {
s.scheduleNextRun(j)
}
}
Loading