Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use BLPOP instead of LPOP + sleep #20

Merged
merged 3 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ This is a fork of a [bennmans' library goworker](https://github.com/benmanns/gow
- [xescugs again #2](https://github.com/cycloidio/goworker/pull/8) to prune all workers not mentioned in a heartbeat list
- [here](https://github.com/skaurus/goworker/pull/17) I added an option to pass a ready Redis client to the lib. If it is not a go-redis v9 instance, you have no guarantees that it will work as expected.
- [and here](https://github.com/skaurus/goworker/pull/17) I added an option to pass a context.Context to the lib. By default it will create a new one via context.Background(). This context is used in all Redis calls.
- [and here](https://github.com/skaurus/goworker/pull/20) I replaced LPOP + sleep with a BLPOP

Also [this PR](https://github.com/cycloidio/goworker/pull/9) might be of interest for some, but I did not merged it.

Expand Down Expand Up @@ -148,6 +149,7 @@ func init() {
ExitOnComplete: false,
Concurrency: 2,
Namespace: "resque:",
// timeout for BLPOP in seconds
IntervalFloat: 5.0,
}
goworker.SetSettings(settings)
Expand Down
34 changes: 16 additions & 18 deletions poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"github.com/pkg/errors"
)

var (
errorUnexpectedJob = errors.New("code expects exactly two items - queue name and task json")
)

type poller struct {
process
isStrict bool
Expand All @@ -26,24 +30,29 @@ func newPoller(queues []string, isStrict bool) (*poller, error) {
}, nil
}

func (p *poller) getJob(c *redis.Client) (*Job, error) {
func (p *poller) getJob(c *redis.Client, interval time.Duration) (*Job, error) {
for _, queue := range p.queues(p.isStrict) {
logger.Debugf("Checking %s", queue)

result, err := c.LPop(ctx, fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue)).Result()
results, err := c.BLPop(ctx, interval, fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue)).Result()
if err != nil {
// no jobs for now, continue on another queue
if err == redis.Nil {
continue
}
return nil, err
}
if result != "" {
if len(results) > 0 {
if len(results) != 2 {
return nil, errorUnexpectedJob
}
// at 0 index we have a queue name
task := results[1]
logger.Debugf("Found job on %s", queue)

job := &Job{Queue: queue}

decoder := json.NewDecoder(bytes.NewReader([]byte(result)))
decoder := json.NewDecoder(bytes.NewReader([]byte(task)))
if workerSettings.UseNumber {
decoder.UseNumber()
}
Expand Down Expand Up @@ -94,7 +103,7 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, er
case <-quit:
return
default:
job, err := p.getJob(client)
job, err := p.getJob(client, interval)
if err != nil {
err = errors.WithStack(err)
_ = logger.Criticalf("Error on %v getting job from %v: %+v", p, p.Queues, err)
Expand Down Expand Up @@ -127,19 +136,8 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, er

return
}
} else {
if workerSettings.ExitOnComplete {
return
}
logger.Debugf("Sleeping for %v", interval)
logger.Debugf("Waiting for %v", p.Queues)

timeout := time.After(interval)
select {
case <-quit:
return
case <-timeout:
}
} else if workerSettings.ExitOnComplete {
return
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ func TestEnqueue(t *testing.T) {
t.Errorf("(Enqueue) Failed on work %s", err)
}
if !reflect.DeepEqual(actualArgs, expectedArgs) {
t.Errorf("(Enqueue) Expected %v, actual %v", actualArgs, expectedArgs)
t.Errorf("(Enqueue) Expected %v, actual %v", expectedArgs, actualArgs)
}
if !reflect.DeepEqual(actualQueueName, queueName) {
t.Errorf("(Enqueue) Expected %v, actual %v", actualQueueName, queueName)
t.Errorf("(Enqueue) Expected %v, actual %v", queueName, actualQueueName)
}
}