diff --git a/pubsub/internal/scheduler/receive_scheduler.go b/pubsub/internal/scheduler/receive_scheduler.go index e2cda6dd6bfa..dafccdd1cee3 100644 --- a/pubsub/internal/scheduler/receive_scheduler.go +++ b/pubsub/internal/scheduler/receive_scheduler.go @@ -15,9 +15,13 @@ package scheduler import ( + "errors" "sync" ) +// ErrReceiveDraining indicates the scheduler has shutdown and is draining. +var ErrReceiveDraining error = errors.New("pubsub: receive scheduler draining") + // ReceiveScheduler is a scheduler which is designed for Pub/Sub's Receive flow. // // Each item is added with a given key. Items added to the empty string key are @@ -67,6 +71,11 @@ func NewReceiveScheduler(workers int) *ReceiveScheduler { // call causes pushback to the pubsub service (less Receive calls on the // long-lived stream), which keeps memory footprint stable. func (s *ReceiveScheduler) Add(key string, item interface{}, handle func(item interface{})) error { + select { + case <-s.done: + return ErrReceiveDraining + default: + } if key == "" { // Spawn a worker. s.workers <- struct{}{} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index b3bb646a41ee..2d316e61ac59 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -1003,6 +1003,13 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes f(ctx2, msg.(*Message)) }); err != nil { wg.Done() + // If there are any errors with scheduling messages, + // nack them so they can be redelivered. + msg.Nack() + // Currently, only this error is returned by the receive scheduler. + if errors.Is(err, scheduler.ErrReceiveDraining) { + return nil + } return err } }