Skip to content

Commit

Permalink
backoff: fix enforcing max backoff (#200)
Browse files: browse the repository at this point in the history
* backoff: fix enforcing max backoff

* backoff: log backoff time
  • Loading branch information
andydunstall authored Dec 5, 2024
1 parent 6662723 commit 1da1354
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
7 changes: 6 additions & 1 deletion client/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,19 @@ func (u *Upstream) connect(ctx context.Context, endpointID string) (*yamux.Sessi
return nil, err
}

backoff, _ := backoff.Backoff()
u.logger().Warn(
"connect failed; retrying",
zap.String("endpoint-id", endpointID),
zap.String("url", url),
zap.String("backoff", backoff.String()),
zap.Error(err),
)

if !backoff.Wait(ctx) {
select {
case <-time.After(backoff):
continue
case <-ctx.Done():
return nil, ctx.Err()
}
}
Expand Down
18 changes: 7 additions & 11 deletions pkg/backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package backoff

import (
"context"
"math/rand"
"time"
)
Expand Down Expand Up @@ -30,23 +29,17 @@ func New(retries int, minBackoff time.Duration, maxBackoff time.Duration) *Backo
}
}

// Wait blocks until the next retry. Returns false if the number of retries has
// been reached so the client should stop.
func (b *Backoff) Wait(ctx context.Context) bool {
// Backoff returns how long to back off for, and whether to retry (true) or abort (false).
func (b *Backoff) Backoff() (time.Duration, bool) {
if b.retries != 0 && b.attempts > b.retries {
return false
return 0, false
}
b.attempts++

backoff := b.nextWait()
b.lastBackoff = backoff

select {
case <-time.After(b.lastBackoff):
return true
case <-ctx.Done():
return false
}
return backoff, true
}

func (b *Backoff) nextWait() time.Duration {
Expand All @@ -56,6 +49,9 @@ func (b *Backoff) nextWait() time.Duration {
} else {
backoff = b.lastBackoff * 2
}
if backoff > b.maxBackoff {
backoff = b.maxBackoff
}

jitterMultipler := 1.0 + (rand.Float64() * 0.1)
return time.Duration(float64(backoff) * jitterMultipler)
Expand Down
17 changes: 15 additions & 2 deletions server/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,23 @@ func (g *Gossip) JoinOnStartup(ctx context.Context, addrs []string) ([]string, e
if err == nil {
return nodeIDs, nil
}
g.logger.Warn("failed to join cluster", zap.Error(err))

backoff, retry := backoff.Backoff()
if !retry {
return nil, lastErr
}

g.logger.Warn(
"failed to join cluster; retrying",
zap.String("backoff", backoff.String()),
zap.Error(err),
)
lastErr = err

if !backoff.Wait(ctx) {
select {
case <-time.After(backoff):
continue
case <-ctx.Done():
return nil, lastErr
}
}
Expand Down

0 comments on commit 1da1354

Please sign in to comment.