Skip to content

Commit

Permalink
Refactor job scheduling to use pointer channels for QValue
Browse files Browse the repository at this point in the history
- Updated the Schedule function in pool.go to return a channel of pointers to QValue instead of QValue.
- Adjusted channel creation and handling to accommodate the new pointer type, improving memory efficiency and consistency in job result handling.
- This change enhances the overall design by ensuring that the job results are managed as pointers, allowing for better handling of nil values and reducing unnecessary copies.
  • Loading branch information
TheApeMachine committed Jan 3, 2025
1 parent a0b833c commit e569083
Showing 1 changed file with 8 additions and 20 deletions.
28 changes: 8 additions & 20 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ Parameters:
- opts: Optional job configuration parameters
Returns:
- chan QValue: Channel that will receive the job's result
- chan *QValue: Channel that will receive the job's result
*/
func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan QValue {
func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan *QValue {
// Create context with configured timeout
ctx, cancel := context.WithTimeout(q.ctx, q.getSchedulingTimeout())
defer cancel()
Expand All @@ -201,8 +201,8 @@ func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan
if job.CircuitID != "" {
breaker := q.getCircuitBreaker(job)
if breaker != nil && !breaker.Allow() {
ch := make(chan QValue, 1)
ch <- QValue{
ch := make(chan *QValue, 1)
ch <- &QValue{
Error: fmt.Errorf("circuit breaker %s is open", job.CircuitID),
CreatedAt: time.Now(),
}
Expand All @@ -214,23 +214,11 @@ func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan
// Try to schedule job with context timeout
select {
case q.jobs <- job:
// Convert *QValue channel to QValue channel
qvChan := q.space.Await(id)
resultChan := make(chan QValue, 1)
go func() {
defer close(resultChan)
if qv := <-qvChan; qv != nil {
resultChan <- QValue{
Value: qv.Value,
Error: qv.Error,
CreatedAt: qv.CreatedAt,
}
}
}()
return resultChan
// Use the pointer channel directly from QSpace
return q.space.Await(id)
case <-ctx.Done():
ch := make(chan QValue, 1)
ch <- QValue{
ch := make(chan *QValue, 1)
ch <- &QValue{
Error: fmt.Errorf("job scheduling timeout: %w", ctx.Err()),
CreatedAt: time.Now(),
}
Expand Down

0 comments on commit e569083

Please sign in to comment.