From e0d49584d155d5ea8d56042ed7286087d67894e2 Mon Sep 17 00:00:00 2001 From: Dmitry Shalashov Date: Wed, 10 Aug 2022 03:03:16 +0300 Subject: [PATCH 1/3] refactor: use BLPOP instead of LPOP+sleep --- README.md | 1 + poller.go | 22 ++++++---------------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index c4d5790..b8750d7 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,7 @@ func init() { ExitOnComplete: false, Concurrency: 2, Namespace: "resque:", + // timeout for BLPOP in seconds IntervalFloat: 5.0, } goworker.SetSettings(settings) diff --git a/poller.go b/poller.go index 006d58b..367db3c 100644 --- a/poller.go +++ b/poller.go @@ -26,11 +26,11 @@ func newPoller(queues []string, isStrict bool) (*poller, error) { }, nil } -func (p *poller) getJob(c *redis.Client) (*Job, error) { +func (p *poller) getJob(c *redis.Client, interval time.Duration) (*Job, error) { for _, queue := range p.queues(p.isStrict) { logger.Debugf("Checking %s", queue) - result, err := c.LPop(ctx, fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue)).Result() + results, err := c.BLPop(ctx, interval, fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue)).Result() if err != nil { // no jobs for now, continue on another queue if err == redis.Nil { @@ -38,6 +38,7 @@ func (p *poller) getJob(c *redis.Client) (*Job, error) { } return nil, err } + result := results[0] if result != "" { logger.Debugf("Found job on %s", queue) @@ -94,7 +95,7 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, er case <-quit: return default: - job, err := p.getJob(client) + job, err := p.getJob(client, interval) if err != nil { err = errors.WithStack(err) _ = logger.Criticalf("Error on %v getting job from %v: %+v", p, p.Queues, err) @@ -127,19 +128,8 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, er return } - } else { - if workerSettings.ExitOnComplete { - return - } - logger.Debugf("Sleeping for %v", interval) - logger.Debugf("Waiting for %v", p.Queues) - - timeout := time.After(interval) - select { - case <-quit: - return - case <-timeout: - } + } else if workerSettings.ExitOnComplete { + return } } } From c000ccb20c39e0e7cd4cef6fd6e4dc04bff9b89c Mon Sep 17 00:00:00 2001 From: Dmitry Shalashov Date: Wed, 10 Aug 2022 03:23:51 +0300 Subject: [PATCH 2/3] fix: fixed the tests (in more than one way) and code itself due to changed POP semantics --- poller.go | 14 +++++++++++--- worker_test.go | 4 ++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/poller.go b/poller.go index 367db3c..74a1c10 100644 --- a/poller.go +++ b/poller.go @@ -10,6 +10,10 @@ import ( "github.com/pkg/errors" ) +var ( + errorUnexpectedJob = errors.New("code expects exactly two items - queue name and task json") +) + type poller struct { process isStrict bool @@ -38,13 +42,17 @@ func (p *poller) getJob(c *redis.Client, interval time.Duration) (*Job, error) { } return nil, err } - result := results[0] - if result != "" { + if len(results) > 0 { + if len(results) != 2 { + return nil, errorUnexpectedJob + } + // at 0 index we have a queue name + task := results[1] logger.Debugf("Found job on %s", queue) job := &Job{Queue: queue} - decoder := json.NewDecoder(bytes.NewReader([]byte(result))) + decoder := json.NewDecoder(bytes.NewReader([]byte(task))) if workerSettings.UseNumber { decoder.UseNumber() } diff --git a/worker_test.go b/worker_test.go index 1c13193..f53cec7 100644 --- a/worker_test.go +++ b/worker_test.go @@ -86,9 +86,9 @@ func TestEnqueue(t *testing.T) { t.Errorf("(Enqueue) Failed on work %s", err) } if !reflect.DeepEqual(actualArgs, expectedArgs) { - t.Errorf("(Enqueue) Expected %v, actual %v", actualArgs, expectedArgs) + t.Errorf("(Enqueue) Expected %v, actual %v", expectedArgs, actualArgs) } if !reflect.DeepEqual(actualQueueName, queueName) { - t.Errorf("(Enqueue) Expected %v, actual %v", actualQueueName, queueName) + t.Errorf("(Enqueue) Expected %v, actual %v", queueName, actualQueueName) } } From d77773943467a94d942f9b5797b1b5addb36495b Mon Sep 17 00:00:00 2001 From: Dmitry Shalashov Date: Wed, 10 Aug 2022 03:25:46 +0300 Subject: [PATCH 3/3] docs: added this change to README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index b8750d7..589040c 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ This is a fork of a [bennmans' library goworker](https://github.com/benmanns/gow - [xescugs again #2](https://github.com/cycloidio/goworker/pull/8) to prune all workers not mentioned in a heartbeat list - [here](https://github.com/skaurus/goworker/pull/17) I added an option to pass a ready Redis client to the lib. If it is not a go-redis v9 instance, you have no guarantees that it will work as expected. - [and here](https://github.com/skaurus/goworker/pull/17) I added an option to pass a context.Context to the lib. By default it will create a new one via context.Background(). This context is used in all Redis calls. +- [and here](https://github.com/skaurus/goworker/pull/20) I replaced LPOP + sleep with a BLPOP Also [this PR](https://github.com/cycloidio/goworker/pull/9) might be of interest for some, but I did not merged it.