Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Owen van Dommelen committed Jan 25, 2025
1 parent 205b5c0 commit e879134
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 6 deletions.
Binary file added .DS_Store
Binary file not shown.
4 changes: 2 additions & 2 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan
if breaker != nil && !breaker.Allow() {
ch := make(chan *QValue, 1)
ch <- &QValue{
Error: errnie.Error(fmt.Errorf("circuit breaker %s is open", job.CircuitID)),
Error: fmt.Errorf("circuit breaker %s is open", job.CircuitID),
CreatedAt: time.Now(),
}
close(ch)
Expand All @@ -220,7 +220,7 @@ func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan
case <-ctx.Done():
ch := make(chan *QValue, 1)
ch <- &QValue{
Error: errnie.Error(fmt.Errorf("job scheduling timeout: %w", ctx.Err())),
Error: fmt.Errorf("job scheduling timeout: %w", ctx.Err()),
CreatedAt: time.Now(),
}
close(ch)
Expand Down
7 changes: 3 additions & 4 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (w *Worker) run() {
// Handle result
if err != nil {
w.pool.metrics.RecordJobFailure()
errnie.Error(fmt.Errorf("Job %s failed: %v", job.ID, err))
// Store error result
w.pool.space.StoreError(job.ID, err, job.TTL)
} else {
Expand Down Expand Up @@ -106,7 +105,7 @@ func (w *Worker) processJobWithTimeout(ctx context.Context, job Job) (any, error
select {
case <-ctx.Done():
w.pool.metrics.RecordJobFailure()
return nil, errnie.Error(fmt.Errorf("job %s timed out", job.ID))
return nil, fmt.Errorf("job %s timed out", job.ID)
case <-done:
w.pool.metrics.RecordJobExecution(startTime, err == nil)
return result, err
Expand Down Expand Up @@ -171,7 +170,7 @@ func (w *Worker) checkSingleDependency(depID string, retryPolicy *RetryPolicy) e
}
w.pool.space.mu.Unlock()

return errnie.Error(fmt.Errorf("dependency %s failed after %d attempts", depID, maxAttempts))
return fmt.Errorf("dependency %s failed after %d attempts", depID, maxAttempts)
}

// recordFailure records a failure for a specific circuit breaker
Expand All @@ -192,5 +191,5 @@ func (w *Worker) recordFailure(circuitID string) {
// Add this method to the Worker struct
func (w *Worker) handleJobTimeout(job Job) error {
w.pool.metrics.RecordJobFailure()
return errnie.Error(fmt.Errorf("job %s timed out", job.ID))
return fmt.Errorf("job %s timed out", job.ID)
}

0 comments on commit e879134

Please sign in to comment.