Skip to content

Commit

Permalink
pubsub: send ack requests quickly
Browse files Browse the repository at this point in the history
See discussions in #684 (comment) .
Load testing shows no performance regression.

Change-Id: I45761540d2b248292b52118894d37712c0bfa654
Reviewed-on: https://code-review.googlesource.com/14472
Reviewed-by: kokoro <[email protected]>
Reviewed-by: Jonathan Amsterdam <[email protected]>
  • Loading branch information
pongad committed Jul 5, 2017
1 parent 558b56d commit 3d09da3
Showing 1 changed file with 6 additions and 10 deletions.
16 changes: 6 additions & 10 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,8 @@ func newPollingMessageIterator(ctx context.Context, s service, subName string, p
keepAlivePeriod := po.ackDeadline - 5*time.Second
kaTicker := time.NewTicker(keepAlivePeriod) // Stopped in it.Stop

// TODO: make ackTicker more configurable. Something less than
// kaTicker is a reasonable default (there's no point extending
// messages when they could be acked instead).
ackTicker := time.NewTicker(keepAlivePeriod / 2) // Stopped in it.Stop
// Ack promptly so users don't lose work if client crashes.
ackTicker := time.NewTicker(100 * time.Millisecond) // Stopped in it.Stop
ka := &keepAlive{
s: s,
Ctx: ctx,
Expand All @@ -116,7 +114,7 @@ func newPollingMessageIterator(ctx context.Context, s service, subName string, p
// redelivered.
_ = s.modifyAckDeadline(ctx, subName, 0, ackIDs.([]string))
})
nacker.DelayThreshold = keepAlivePeriod / 10 // nack promptly
nacker.DelayThreshold = 100 * time.Millisecond // nack promptly
nacker.BundleCountThreshold = 10

pull := newPuller(s, subName, ctx, po.maxPrefetch, ka.Add, ka.Remove)
Expand Down Expand Up @@ -240,11 +238,9 @@ func newStreamingMessageIterator(ctx context.Context, sp *streamingPuller, po *p
keepAlivePeriod := po.ackDeadline - 5*time.Second
kaTicker := time.NewTicker(keepAlivePeriod)

// TODO: make ackTicker more configurable. Something less than
// kaTicker is a reasonable default (there's no point extending
// messages when they could be acked instead).
ackTicker := time.NewTicker(keepAlivePeriod / 2)
nackTicker := time.NewTicker(keepAlivePeriod / 10)
// Ack promptly so users don't lose work if client crashes.
ackTicker := time.NewTicker(100 * time.Millisecond)
nackTicker := time.NewTicker(100 * time.Millisecond)
it := &streamingMessageIterator{
ctx: ctx,
sp: sp,
Expand Down

0 comments on commit 3d09da3

Please sign in to comment.