Skip to content

Commit

Permalink
fix(pubsub): nack messages properly with error from receive scheduler (
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex authored Apr 25, 2022
1 parent 3a1a55a commit 80edea4
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pubsub/internal/scheduler/receive_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
package scheduler

import (
"errors"
"sync"
)

// ErrReceiveDraining indicates the scheduler has shutdown and is draining.
var ErrReceiveDraining error = errors.New("pubsub: receive scheduler draining")

// ReceiveScheduler is a scheduler which is designed for Pub/Sub's Receive flow.
//
// Each item is added with a given key. Items added to the empty string key are
Expand Down Expand Up @@ -67,6 +71,11 @@ func NewReceiveScheduler(workers int) *ReceiveScheduler {
// call causes pushback to the pubsub service (less Receive calls on the
// long-lived stream), which keeps memory footprint stable.
func (s *ReceiveScheduler) Add(key string, item interface{}, handle func(item interface{})) error {
select {
case <-s.done:
return ErrReceiveDraining
default:
}
if key == "" {
// Spawn a worker.
s.workers <- struct{}{}
Expand Down
7 changes: 7 additions & 0 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,13 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
f(ctx2, msg.(*Message))
}); err != nil {
wg.Done()
// If there are any errors with scheduling messages,
// nack them so they can be redelivered.
msg.Nack()
// Currently, only this error is returned by the receive scheduler.
if errors.Is(err, scheduler.ErrReceiveDraining) {
return nil
}
return err
}
}
Expand Down

0 comments on commit 80edea4

Please sign in to comment.