From e569083866e62424a1b68e3d1ae06afe48401fd8 Mon Sep 17 00:00:00 2001 From: Daniel Owen van Dommelen Date: Fri, 3 Jan 2025 05:37:08 +0100 Subject: [PATCH] Refactor job scheduling to use pointer channels for QValue - Updated the Schedule function in pool.go to return a channel of pointers to QValue instead of QValue. - Adjusted channel creation and handling to accommodate the new pointer type, improving memory efficiency and consistency in job result handling. - This change enhances the overall design by ensuring that the job results are managed as pointers, allowing for better handling of nil values and reducing unnecessary copies. --- pool.go | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/pool.go b/pool.go index 2b999c7..14b25c6 100644 --- a/pool.go +++ b/pool.go @@ -173,9 +173,9 @@ Parameters: - opts: Optional job configuration parameters Returns: - - chan QValue: Channel that will receive the job's result + - chan *QValue: Channel that will receive the job's result */ -func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan QValue { +func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan *QValue { // Create context with configured timeout ctx, cancel := context.WithTimeout(q.ctx, q.getSchedulingTimeout()) defer cancel() @@ -201,8 +201,8 @@ func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan if job.CircuitID != "" { breaker := q.getCircuitBreaker(job) if breaker != nil && !breaker.Allow() { - ch := make(chan QValue, 1) - ch <- QValue{ + ch := make(chan *QValue, 1) + ch <- &QValue{ Error: fmt.Errorf("circuit breaker %s is open", job.CircuitID), CreatedAt: time.Now(), } @@ -214,23 +214,11 @@ func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan // Try to schedule job with context timeout select { case q.jobs <- job: - // Convert *QValue channel to QValue channel - qvChan := q.space.Await(id) - resultChan := make(chan QValue, 1) - go func() { - defer close(resultChan) - if qv := <-qvChan; qv != nil { - resultChan <- QValue{ - Value: qv.Value, - Error: qv.Error, - CreatedAt: qv.CreatedAt, - } - } - }() - return resultChan + // Use the pointer channel directly from QSpace + return q.space.Await(id) case <-ctx.Done(): - ch := make(chan QValue, 1) - ch <- QValue{ + ch := make(chan *QValue, 1) + ch <- &QValue{ Error: fmt.Errorf("job scheduling timeout: %w", ctx.Err()), CreatedAt: time.Now(), }