From db42ebf6abfcfd60ba157eed53bc58df8ee17721 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Wed, 27 Jun 2018 00:24:38 +0300
Subject: [PATCH] Add cluster.peers refresh (#383)
* Add cluster.peers refresh
* Fix test
* Change to list current members and join only new ones
* Fixes after review
---
cmd/thanos/flags.go | 4 +-
cmd/thanos/query.go | 1 +
pkg/cluster/cluster.go | 92 +++++++++++++++++++++++++++++++++----
pkg/cluster/cluster_test.go | 1 +
4 files changed, 87 insertions(+), 11 deletions(-)
diff --git a/cmd/thanos/flags.go b/cmd/thanos/flags.go
index 65d210a5b32..eb45e25c6f2 100644
--- a/cmd/thanos/flags.go
+++ b/cmd/thanos/flags.go
@@ -34,6 +34,8 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo
pushPullInterval := cmd.Flag("cluster.pushpull-interval", "Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage.").
Default(cluster.DefaultPushPullInterval.String()).Duration()
+ refreshInterval := cmd.Flag("cluster.refresh-interval", "Interval for membership to refresh cluster.peers state, 0 disables refresh.").Default(cluster.DefaultRefreshInterval.String()).Duration()
+
return grpcBindAddr,
httpBindAddr,
func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (*cluster.Peer, error) {
@@ -66,7 +68,7 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo
level.Info(logger).Log("msg", "QueryAPI address that will be propagated through gossip", "address", advQueryAPIAddress)
}
- return cluster.New(logger, reg, *clusterBindAddr, *clusterAdvertiseAddr, advStoreAPIAddress, advQueryAPIAddress, *peers, waitIfEmpty, *gossipInterval, *pushPullInterval)
+ return cluster.New(logger, reg, *clusterBindAddr, *clusterAdvertiseAddr, advStoreAPIAddress, advQueryAPIAddress, *peers, waitIfEmpty, *gossipInterval, *pushPullInterval, *refreshInterval)
}
}
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index 07db741ba4c..dc6db68e2fa 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -247,6 +247,7 @@ func runQuery(
l.Close()
})
}
+
level.Info(logger).Log("msg", "starting query node")
return nil
}
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 803f6269abb..098d77e9253 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -27,8 +27,10 @@ type Peer struct {
mlist *memberlist.Memberlist
stopc chan struct{}
- cfg *memberlist.Config
- knownPeers []string
+ cfg *memberlist.Config
+ knownPeers []string
+ advertiseAddr string
+ refreshInterval time.Duration
data *data
gossipMsgsReceived prometheus.Counter
@@ -43,6 +45,7 @@ type Peer struct {
const (
DefaultPushPullInterval = 5 * time.Second
DefaultGossipInterval = 5 * time.Second
+ DefaultRefreshInterval = 60 * time.Second
)
// PeerType describes a peer's role in the cluster.
@@ -97,6 +100,7 @@ func New(
waitIfEmpty bool,
pushPullInterval time.Duration,
gossipInterval time.Duration,
+ refreshInterval time.Duration,
) (*Peer, error) {
l = log.With(l, "component", "cluster")
@@ -121,7 +125,7 @@ func New(
level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname")
}
- resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, net.Resolver{}, waitIfEmpty)
+ resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, *net.DefaultResolver, waitIfEmpty)
if err != nil {
return nil, errors.Wrap(err, "resolve peers")
}
@@ -158,13 +162,15 @@ func New(
reg.MustRegister(gossipClusterMembers)
return &Peer{
- logger: l,
- knownPeers: knownPeers,
- cfg: cfg,
- gossipMsgsReceived: gossipMsgsReceived,
- gossipClusterMembers: gossipClusterMembers,
- stopc: make(chan struct{}),
- data: &data{data: map[string]PeerState{}},
+ logger: l,
+ knownPeers: knownPeers,
+ cfg: cfg,
+ refreshInterval: refreshInterval,
+ gossipMsgsReceived: gossipMsgsReceived,
+ gossipClusterMembers: gossipClusterMembers,
+ stopc: make(chan struct{}),
+ data: &data{data: map[string]PeerState{}},
+ advertiseAddr: advertiseAddr,
advertiseStoreAPIAddr: advertiseStoreAPIAddr,
advertiseQueryAPIAddress: advertiseQueryAPIAddress,
}, nil
@@ -205,6 +211,72 @@ func (p *Peer) Join(peerType PeerType, initialMetadata PeerMetadata) error {
Metadata: initialMetadata,
})
+ if p.refreshInterval != 0 {
+ go p.periodicallyRefresh()
+ }
+
+ return nil
+}
+
+// periodicallyRefresh runs Refresh every p.refreshInterval, logging errors, until p.stopc fires.
+func (p *Peer) periodicallyRefresh() {
+	ticker := time.NewTicker(p.refreshInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-p.stopc:
+			return
+		case <-ticker.C: // continue to the refresh below
+		}
+		if err := p.Refresh(); err != nil {
+			level.Error(p.logger).Log("msg", "Refreshing memberlist", "err", err)
+		}
+	}
+}
+
+// Refresh re-resolves p.knownPeers and joins every resolved address that is not
+// among the current memberlist members. It returns nil when p.mlist is nil.
+// A failed join is logged, not returned; only resolution errors are returned.
+func (p *Peer) Refresh() error {
+	p.mlistMtx.Lock()
+	defer p.mlistMtx.Unlock()
+
+	if p.mlist == nil {
+		return nil
+	}
+
+	resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, *net.DefaultResolver, false)
+	if err != nil {
+		// resolvedPeers is not meaningful on error, so report the configured names.
+		return errors.Wrapf(err, "refresh cluster could not resolve peers: %v", p.knownPeers)
+	}
+
+	currMembers := p.mlist.Members()
+	var notConnected []string
+	for _, peer := range resolvedPeers {
+		var isPeerFound bool
+		for _, mem := range currMembers {
+			if mem.Address() == peer {
+				isPeerFound = true
+				break
+			}
+		}
+		if !isPeerFound {
+			notConnected = append(notConnected, peer)
+		}
+	}
+	if len(notConnected) == 0 {
+		level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ","))
+		return nil
+	}
+
+	curr, err := p.mlist.Join(notConnected)
+	if err != nil {
+		level.Error(p.logger).Log("msg", "refresh cluster could not join peers", "peers", strings.Join(notConnected, ","), "err", err)
+		return nil // Best effort: the next refresh tick retries, so do not report success below.
+	}
+	level.Debug(p.logger).Log("msg", "refresh cluster done, peers joined", "peers", strings.Join(notConnected, ","), "joined", curr, "before", len(currMembers), "after", len(p.mlist.Members()))
+
return nil
}
diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go
index 96fe77cfc29..18add964ba5 100644
--- a/pkg/cluster/cluster_test.go
+++ b/pkg/cluster/cluster_test.go
@@ -38,6 +38,7 @@ func joinPeer(num int, knownPeers []string) (peerAddr string, peer *Peer, err er
false,
100*time.Millisecond,
50*time.Millisecond,
+ 30*time.Millisecond,
)
if err != nil {
return "", nil, err