Skip to content

Commit

Permalink
feat(eventListener): add BeforeJobRunsSkipIfBeforeFuncErrors()
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Morelly authored and FalcoSuessgott committed Jan 8, 2025
1 parent abfd9e5 commit 898762c
Showing 5 changed files with 42 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/file_formatting.yml
Original file line number Diff line number Diff line change
@@ -16,4 +16,4 @@ jobs:
uses: actions/checkout@v4
- name: verify example_test.go
run: |
grep "^func " example_test.go | sort -c
grep "^func [a-z-A-Z]" example_test.go | sort -c
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ fmt:
@go list -f {{.Dir}} ./... | xargs -I{} gofmt -w -s {}

lint:
@grep "^func " example_test.go | sort -c
@grep "^func [a-zA-Z]" example_test.go | sort -c
@golangci-lint run

test:
14 changes: 13 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
@@ -389,16 +389,28 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
}
defer func() { _ = lock.Unlock(j.ctx) }()
}

_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

err := callJobFuncWithParams(j.beforeJobRunsSkipIfBeforeFuncErrors, j.id, j.name)
if err != nil {
e.sendOutForRescheduling(&jIn)

select {
case e.jobsOutCompleted <- j.id:
case <-e.ctx.Done():
}

return
}

e.sendOutForRescheduling(&jIn)
select {
case e.jobsOutCompleted <- j.id:
case <-e.ctx.Done():
}

startTime := time.Now()
var err error
if j.afterJobRunsWithPanic != nil {
err = e.callJobWithRecover(j)
} else {
26 changes: 20 additions & 6 deletions job.go
Original file line number Diff line number Diff line change
@@ -40,12 +40,13 @@ type internalJob struct {
startImmediately bool
stopTime time.Time
// event listeners
afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string)
afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error)
afterJobRunsWithPanic func(jobID uuid.UUID, jobName string, recoverData any)
afterLockError func(jobID uuid.UUID, jobName string, err error)
disabledLocker bool
afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRunsSkipIfBeforeFuncErrors func(jobID uuid.UUID, jobName string) error
afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error)
afterJobRunsWithPanic func(jobID uuid.UUID, jobName string, recoverData any)
afterLockError func(jobID uuid.UUID, jobName string, err error)
disabledLocker bool

locker Locker
}
@@ -724,6 +725,19 @@ func BeforeJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) Even
}
}

// BeforeJobRunsSkipIfBeforeFuncErrors is used to listen for when a job is about to run and
// then runs the provided function. If the provided function returns an error, the job will be
// rescheduled and the current run will be skipped.
func BeforeJobRunsSkipIfBeforeFuncErrors(eventListenerFunc func(jobID uuid.UUID, jobName string) error) EventListener {
return func(j *internalJob) error {
if eventListenerFunc == nil {
return ErrEventListenerFuncNil
}
j.beforeJobRunsSkipIfBeforeFuncErrors = eventListenerFunc
return nil
}
}

// AfterJobRuns is used to listen for when a job has run
// without an error, and then run the provided function.
func AfterJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) EventListener {
7 changes: 7 additions & 0 deletions job_test.go
Original file line number Diff line number Diff line change
@@ -482,6 +482,13 @@ func TestWithEventListeners(t *testing.T) {
},
ErrEventListenerFuncNil,
},
{
"nil before job runs error listener",
[]EventListener{
BeforeJobRunsSkipIfBeforeFuncErrors(nil),
},
ErrEventListenerFuncNil,
},
}

for _, tt := range tests {

0 comments on commit 898762c

Please sign in to comment.