From 4713645dd7fc66a3dee9caa1b6c02b52555e683c Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Fri, 22 Jun 2018 08:53:47 +0300
Subject: [PATCH] Change to list current members and join only new ones
---
pkg/cluster/cluster.go | 47 ++++++++++++++++++++++++++++++------------
1 file changed, 34 insertions(+), 13 deletions(-)
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 787fb64dc64..a2988763b04 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -29,6 +29,7 @@ type Peer struct {
cfg *memberlist.Config
knownPeers []string
+ advertiseAddr string
refreshInterval time.Duration
data *data
@@ -124,7 +125,7 @@ func New(
level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname")
}
- resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, net.Resolver{}, waitIfEmpty)
+ resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, *net.DefaultResolver, waitIfEmpty)
if err != nil {
return nil, errors.Wrap(err, "resolve peers")
}
@@ -161,14 +162,15 @@ func New(
reg.MustRegister(gossipClusterMembers)
return &Peer{
- logger: l,
- knownPeers: knownPeers,
- cfg: cfg,
- refreshInterval: refreshInterval,
- gossipMsgsReceived: gossipMsgsReceived,
- gossipClusterMembers: gossipClusterMembers,
- stopc: make(chan struct{}),
- data: &data{data: map[string]PeerState{}},
+ logger: l,
+ knownPeers: knownPeers,
+ cfg: cfg,
+ refreshInterval: refreshInterval,
+ gossipMsgsReceived: gossipMsgsReceived,
+ gossipClusterMembers: gossipClusterMembers,
+ stopc: make(chan struct{}),
+ data: &data{data: map[string]PeerState{}},
+ advertiseAddr: advertiseAddr,
advertiseStoreAPIAddr: advertiseStoreAPIAddr,
advertiseQueryAPIAddress: advertiseQueryAPIAddress,
}, nil
@@ -241,13 +243,32 @@ func (p *Peer) Refresh() error {
return nil
}
- prev := p.mlist.NumMembers()
- curr, err := p.mlist.Join(p.knownPeers)
+ resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, *net.DefaultResolver, false)
if err != nil {
- return errors.Wrap(err, "could not refresh memberlist cluster")
+ return errors.Wrapf(err, "refresh cluster could not resolve peers: %v", resolvedPeers)
}
- level.Debug(p.logger).Log("msg", "refreshed cluster", "before", prev, "after", curr)
+ currMembers := p.mlist.Members()
+ for _, peer := range resolvedPeers {
+ var isPeerFound bool
+
+ for _, mem := range currMembers {
+ if mem.Address() == peer {
+ isPeerFound = true
+ break
+ }
+ }
+
+ if !isPeerFound {
+ curr, err := p.mlist.Join([]string{peer})
+ if err != nil {
+ return errors.Wrapf(err, "refresh cluster could not join peer: %s", peer)
+ }
+ level.Debug(p.logger).Log("msg", "refresh cluster peer joined", "peer", peer, "before", len(currMembers), "after", curr)
+ }
+ }
+
+ level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ","))
return nil
}