gossip: add join retries #12

Merged 1 commit on May 8, 2024
4 changes: 4 additions & 0 deletions docs/configure.md
@@ -133,6 +133,10 @@ cluster:
# so the initial set of configured members only needs to be a subset of nodes.
join: []

# Whether the server node should abort if it is configured with more than one
# node to join (excluding itself) but fails to join any members.
abort_if_join_fails: true

auth:
# Secret key to authenticate HMAC endpoint connection JWTs.
token_hmac_secret_key: ""
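For context, a sketch of how the new option might sit alongside the existing cluster settings in the server config file; the join addresses below are hypothetical placeholders:

```yaml
cluster:
  # Addresses of existing members to join on startup (hypothetical values).
  join:
    - 10.0.0.1
    - 10.0.0.2

  # Whether to abort if the node is configured with members to join but
  # fails to join any of them.
  abort_if_join_fails: true
```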
59 changes: 59 additions & 0 deletions pkg/backoff/backoff.go
@@ -0,0 +1,59 @@
package backoff

import (
"context"
"math/rand"
"time"
)

// Backoff implements exponential backoff with jitter.
type Backoff struct {
// retries is the maximum number of retries.
retries int
minBackoff time.Duration
maxBackoff time.Duration

// attempts is the number of attempts so far.
attempts int
lastBackoff time.Duration
}

func New(retries int, minBackoff time.Duration, maxBackoff time.Duration) *Backoff {
return &Backoff{
retries: retries,
minBackoff: minBackoff,
maxBackoff: maxBackoff,
attempts: 0,
}
}

// Wait blocks until the next retry. Returns false if the number of retries
// has been reached or the context is cancelled, so the caller should stop.
func (b *Backoff) Wait(ctx context.Context) bool {
if b.attempts > b.retries {
return false
}
b.attempts++

backoff := b.nextWait()
b.lastBackoff = backoff

select {
case <-time.After(b.lastBackoff):
return true
case <-ctx.Done():
return false
}
}

func (b *Backoff) nextWait() time.Duration {
var backoff time.Duration
if b.lastBackoff == 0 {
backoff = b.minBackoff
} else {
backoff = b.lastBackoff * 2
}
// Cap the backoff at the configured maximum.
if backoff > b.maxBackoff {
backoff = b.maxBackoff
}

jitterMultiplier := 1.0 + (rand.Float64() * 0.1)
return time.Duration(float64(backoff) * jitterMultiplier)
}
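A minimal usage sketch of the new backoff package, assuming the import path github.com/andydunstall/pico/pkg/backoff used elsewhere in this PR; doSomething is a hypothetical operation that may fail transiently:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/andydunstall/pico/pkg/backoff"
)

// doSomething is a hypothetical operation that may fail transiently.
func doSomething() error {
	return errors.New("not ready")
}

func main() {
	// Up to 5 retries, starting at 1 second and doubling (with jitter) up to
	// a 1 minute cap.
	b := backoff.New(5, time.Second, time.Minute)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	for {
		// Wait blocks before the next attempt, and returns false once the
		// retry limit is reached or the context is cancelled.
		if !b.Wait(ctx) {
			fmt.Println("giving up")
			return
		}
		if err := doSomething(); err != nil {
			fmt.Println("attempt failed:", err)
			continue
		}
		fmt.Println("succeeded")
		return
	}
}
```

This is essentially the retry loop that JoinOnStartup in server/gossip/gossip.go uses below.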
11 changes: 10 additions & 1 deletion server/config/config.go
@@ -72,6 +72,8 @@ type ClusterConfig struct {

// Join contains a list of addresses of members in the cluster to join.
Join []string `json:"join" yaml:"join"`

AbortIfJoinFails bool `json:"abort_if_join_fails" yaml:"abort_if_join_fails"`
}

func (c *ClusterConfig) Validate() error {
@@ -256,7 +258,6 @@ the given prefix.
For example, you could use the node or pod name as a prefix, then add a unique
identifier to ensure the node ID is unique across restarts.`,
)

fs.StringSliceVar(
&c.Cluster.Join,
"cluster.join",
@@ -275,6 +276,14 @@ port is given, the gossip port of this node is used.
Note each node propagates membership information to the other known nodes,
so the initial set of configured members only needs to be a subset of nodes.`,
)
fs.BoolVar(
&c.Cluster.AbortIfJoinFails,
"cluster.abort-if-join-fails",
true,
`
Whether the server node should abort if it is configured with more than one
node to join (excluding itself) but fails to join any members.`,
)

c.Auth.RegisterFlags(fs)

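A standalone sketch of how these flags could be registered and parsed, assuming the flag set is spf13/pflag (which provides StringSliceVar); the flag names mirror the registration above, everything else is illustrative:

```go
package main

import (
	"fmt"
	"os"

	"github.com/spf13/pflag"
)

func main() {
	fs := pflag.NewFlagSet("server", pflag.ExitOnError)

	var join []string
	fs.StringSliceVar(&join, "cluster.join", nil,
		"addresses of cluster members to join")

	abortIfJoinFails := fs.Bool("cluster.abort-if-join-fails", true,
		"abort if the configured members cannot be joined")

	// e.g. --cluster.join=10.0.0.1,10.0.0.2 --cluster.abort-if-join-fails=false
	fs.Parse(os.Args[1:])

	fmt.Println("join:", join)
	fmt.Println("abort-if-join-fails:", *abortIfJoinFails)
}
```

In the actual config the parsed values end up on ClusterConfig.Join and ClusterConfig.AbortIfJoinFails as registered above.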
32 changes: 29 additions & 3 deletions server/gossip/gossip.go
@@ -4,10 +4,13 @@ import (
"context"
"fmt"
"net"
"time"

"github.com/andydunstall/kite"
"github.com/andydunstall/pico/pkg/backoff"
"github.com/andydunstall/pico/pkg/log"
"github.com/andydunstall/pico/server/netmap"
"go.uber.org/zap"
)

// Gossip is responsible for maintaining this node's local NetworkMap
@@ -61,12 +64,35 @@ func NewGossip(
}, nil
}

// Join attempts to join an existing cluster by syncronising with the members
// at the given addresses.
func (g *Gossip) Join(addrs []string) ([]string, error) {
// JoinOnBoot attempts to join an existing cluster by synchronising with the
// members at the given addresses.
//
// This will only attempt to join once and won't retry.
func (g *Gossip) JoinOnBoot(addrs []string) ([]string, error) {
return g.gossiper.Join(addrs)
}

// JoinOnStartup attempts to join an existing cluster by synchronising with the
// members at the given addresses.
//
// This will retry 5 times (with backoff).
func (g *Gossip) JoinOnStartup(ctx context.Context, addrs []string) ([]string, error) {
backoff := backoff.New(5, time.Second, time.Minute)
var lastErr error
for {
if !backoff.Wait(ctx) {
return nil, lastErr
}

nodeIDs, err := g.gossiper.Join(addrs)
if err == nil {
return nodeIDs, nil
}
g.logger.Warn("failed to join cluster", zap.Error(err))
lastErr = err
}
}

// Leave notifies the known members that this node is leaving the cluster.
//
// This will attempt to sync with up to 3 nodes to ensure the leave status is
68 changes: 48 additions & 20 deletions server/server.go
@@ -186,12 +186,15 @@ func (s *Server) Run(ctx context.Context) error {
defer gossiper.Close()
adminServer.AddStatus("/gossip", gossip.NewStatus(gossiper))

// Attempt to join an existing cluster. Note if 'join' is a domain that
// doesn't map to any entries (except ourselves), then join will succeed
// since it means we're the first member.
nodeIDs, err := gossiper.Join(s.conf.Cluster.Join)
// Attempt to join an existing cluster.
//
// Note when running on Kubernetes, if this is the first member, the service
// DNS record won't resolve since the node isn't yet ready, so the join may
// fail. Therefore we attempt to join but continue booting if the join fails,
// then retry the join (with backoff) once booted.
nodeIDs, err := gossiper.JoinOnBoot(s.conf.Cluster.Join)
if err != nil {
return fmt.Errorf("join cluster: %w", err)
s.logger.Warn("failed to join cluster", zap.Error(err))
}
if len(nodeIDs) > 0 {
s.logger.Info(
@@ -228,21 +231,6 @@ func (s *Server) Run(ctx context.Context) error {
case <-ctx.Done():
case <-shutdownCtx.Done():
}

leaveCtx, cancel := context.WithTimeout(
context.Background(),
s.conf.Server.GracefulShutdownTimeout,
)
defer cancel()

// Leave as soon as we receive the shutdown signal to avoid receiving
// forward proxy requests.
if err := gossiper.Leave(leaveCtx); err != nil {
s.logger.Warn("failed to gracefully leave cluster", zap.Error(err))
} else {
s.logger.Info("left cluster")
}

return nil
}, func(error) {
shutdownCancel()
@@ -308,6 +296,46 @@ func (s *Server) Run(ctx context.Context) error {
s.logger.Info("admin server shut down")
})

// Gossip.
gossipCtx, gossipCancel := context.WithCancel(ctx)
group.Add(func() error {
if len(nodeIDs) == 0 {
nodeIDs, err = gossiper.JoinOnStartup(gossipCtx, s.conf.Cluster.Join)
if err != nil {
if s.conf.Cluster.AbortIfJoinFails {
return fmt.Errorf("join on startup: %w", err)
}
s.logger.Warn("failed to join cluster", zap.Error(err))
}
if len(nodeIDs) > 0 {
s.logger.Info(
"joined cluster",
zap.Strings("node-ids", nodeIDs),
)
}
}

<-gossipCtx.Done()

leaveCtx, cancel := context.WithTimeout(
context.Background(),
s.conf.Server.GracefulShutdownTimeout,
)
defer cancel()

// Leave as soon as we receive the shutdown signal to avoid receiving
// forward proxy requests.
if err := gossiper.Leave(leaveCtx); err != nil {
s.logger.Warn("failed to gracefully leave cluster", zap.Error(err))
} else {
s.logger.Info("left cluster")
}

return nil
}, func(error) {
gossipCancel()
})

if err := group.Run(); err != nil {
return err
}