From 52a280f57c5200069dfd9cf052ea084bee64aa46 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Thu, 21 Jun 2018 16:17:42 +0300
Subject: [PATCH 1/4] Add cluster.peers refresh
---
cmd/thanos/flags.go | 4 +++-
cmd/thanos/query.go | 1 +
pkg/cluster/cluster.go | 47 ++++++++++++++++++++++++++++++++++++++++--
3 files changed, 49 insertions(+), 3 deletions(-)
diff --git a/cmd/thanos/flags.go b/cmd/thanos/flags.go
index 65d210a5b3..eb45e25c6f 100644
--- a/cmd/thanos/flags.go
+++ b/cmd/thanos/flags.go
@@ -34,6 +34,8 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo
pushPullInterval := cmd.Flag("cluster.pushpull-interval", "Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage.").
Default(cluster.DefaultPushPullInterval.String()).Duration()
+ refreshInterval := cmd.Flag("cluster.refresh-interval", "Interval for membership to refresh cluster.peers state, 0 disables refresh.").Default(cluster.DefaultRefreshInterval.String()).Duration()
+
return grpcBindAddr,
httpBindAddr,
func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (*cluster.Peer, error) {
@@ -66,7 +68,7 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo
level.Info(logger).Log("msg", "QueryAPI address that will be propagated through gossip", "address", advQueryAPIAddress)
}
- return cluster.New(logger, reg, *clusterBindAddr, *clusterAdvertiseAddr, advStoreAPIAddress, advQueryAPIAddress, *peers, waitIfEmpty, *gossipInterval, *pushPullInterval)
+ return cluster.New(logger, reg, *clusterBindAddr, *clusterAdvertiseAddr, advStoreAPIAddress, advQueryAPIAddress, *peers, waitIfEmpty, *gossipInterval, *pushPullInterval, *refreshInterval)
}
}
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index 07db741ba4..dc6db68e2f 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -247,6 +247,7 @@ func runQuery(
l.Close()
})
}
+
level.Info(logger).Log("msg", "starting query node")
return nil
}
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 803f6269ab..787fb64dc6 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -27,8 +27,9 @@ type Peer struct {
mlist *memberlist.Memberlist
stopc chan struct{}
- cfg *memberlist.Config
- knownPeers []string
+ cfg *memberlist.Config
+ knownPeers []string
+ refreshInterval time.Duration
data *data
gossipMsgsReceived prometheus.Counter
@@ -43,6 +44,7 @@ type Peer struct {
const (
DefaultPushPullInterval = 5 * time.Second
DefaultGossipInterval = 5 * time.Second
+ DefaultRefreshInterval = 30 * time.Second
)
// PeerType describes a peer's role in the cluster.
@@ -97,6 +99,7 @@ func New(
waitIfEmpty bool,
pushPullInterval time.Duration,
gossipInterval time.Duration,
+ refreshInterval time.Duration,
) (*Peer, error) {
l = log.With(l, "component", "cluster")
@@ -161,6 +164,7 @@ func New(
logger: l,
knownPeers: knownPeers,
cfg: cfg,
+ refreshInterval: refreshInterval,
gossipMsgsReceived: gossipMsgsReceived,
gossipClusterMembers: gossipClusterMembers,
stopc: make(chan struct{}),
@@ -205,6 +209,45 @@ func (p *Peer) Join(peerType PeerType, initialMetadata PeerMetadata) error {
Metadata: initialMetadata,
})
+ if p.refreshInterval != 0 {
+ go p.periodicallyRefresh()
+ }
+
+ return nil
+}
+
+func (p *Peer) periodicallyRefresh() {
+ tick := time.NewTicker(p.refreshInterval)
+ defer tick.Stop()
+
+ for {
+ select {
+ case <-p.stopc:
+ return
+ case <-tick.C:
+ if err := p.Refresh(); err != nil {
+ level.Error(p.logger).Log("msg", "Refreshing memberlist", "err", err)
+ }
+ }
+ }
+}
+
+// Refresh renews membership cluster, this will refresh DNS names and join newly added members
+func (p *Peer) Refresh() error {
+ p.mlistMtx.Lock()
+ defer p.mlistMtx.Unlock()
+
+ if p.mlist == nil {
+ return nil
+ }
+
+ prev := p.mlist.NumMembers()
+ curr, err := p.mlist.Join(p.knownPeers)
+ if err != nil {
+ return errors.Wrap(err, "could not refresh memberlist cluster")
+ }
+ level.Debug(p.logger).Log("msg", "refreshed cluster", "before", prev, "after", curr)
+
return nil
}
From 14e590666860cc3fa8020e95f971de1cc0454bce Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Thu, 21 Jun 2018 16:34:59 +0300
Subject: [PATCH 2/4] Fix test
---
pkg/cluster/cluster_test.go | 1 +
1 file changed, 1 insertion(+)
diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go
index 96fe77cfc2..18add964ba 100644
--- a/pkg/cluster/cluster_test.go
+++ b/pkg/cluster/cluster_test.go
@@ -38,6 +38,7 @@ func joinPeer(num int, knownPeers []string) (peerAddr string, peer *Peer, err er
false,
100*time.Millisecond,
50*time.Millisecond,
+ 30*time.Millisecond,
)
if err != nil {
return "", nil, err
From 272b2007997a8910422ea61c1eb2239f483e3b3b Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Fri, 22 Jun 2018 08:53:47 +0300
Subject: [PATCH 3/4] Change to list current members and join only new ones
---
pkg/cluster/cluster.go | 48 ++++++++++++++++++++++++++++++------------
1 file changed, 35 insertions(+), 13 deletions(-)
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 787fb64dc6..fd12dcbd2a 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -29,6 +29,7 @@ type Peer struct {
cfg *memberlist.Config
knownPeers []string
+ advertiseAddr string
refreshInterval time.Duration
data *data
@@ -124,7 +125,7 @@ func New(
level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname")
}
- resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, net.Resolver{}, waitIfEmpty)
+ resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, *net.DefaultResolver, waitIfEmpty)
if err != nil {
return nil, errors.Wrap(err, "resolve peers")
}
@@ -161,14 +162,15 @@ func New(
reg.MustRegister(gossipClusterMembers)
return &Peer{
- logger: l,
- knownPeers: knownPeers,
- cfg: cfg,
- refreshInterval: refreshInterval,
- gossipMsgsReceived: gossipMsgsReceived,
- gossipClusterMembers: gossipClusterMembers,
- stopc: make(chan struct{}),
- data: &data{data: map[string]PeerState{}},
+ logger: l,
+ knownPeers: knownPeers,
+ cfg: cfg,
+ refreshInterval: refreshInterval,
+ gossipMsgsReceived: gossipMsgsReceived,
+ gossipClusterMembers: gossipClusterMembers,
+ stopc: make(chan struct{}),
+ data: &data{data: map[string]PeerState{}},
+ advertiseAddr: advertiseAddr,
advertiseStoreAPIAddr: advertiseStoreAPIAddr,
advertiseQueryAPIAddress: advertiseQueryAPIAddress,
}, nil
@@ -241,13 +243,33 @@ func (p *Peer) Refresh() error {
return nil
}
- prev := p.mlist.NumMembers()
- curr, err := p.mlist.Join(p.knownPeers)
+ resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, *net.DefaultResolver, false)
if err != nil {
- return errors.Wrap(err, "could not refresh memberlist cluster")
+ return errors.Wrapf(err, "refresh cluster could not resolve peers: %v", resolvedPeers)
}
- level.Debug(p.logger).Log("msg", "refreshed cluster", "before", prev, "after", curr)
+ currMembers := p.mlist.Members()
+ for _, peer := range resolvedPeers {
+ var isPeerFound bool
+
+ for _, mem := range currMembers {
+ if mem.Address() == peer {
+ isPeerFound = true
+ break
+ }
+ }
+
+ if !isPeerFound {
+ curr, err := p.mlist.Join([]string{peer})
+ if err != nil {
+ level.Debug(p.logger).Log("msg", "refresh cluster could not join peer", "peer", peer)
+ }
+
+ level.Debug(p.logger).Log("msg", "refresh cluster peer joined", "peer", peer, "before", len(currMembers), "after", curr)
+ }
+ }
+
+ level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ","))
return nil
}
From 04d93ccd4b6d2afb46611dec78ec67407351ddfc Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Mon, 25 Jun 2018 08:47:40 +0300
Subject: [PATCH 4/4] Fixes after review
---
pkg/cluster/cluster.go | 23 +++++++++++++++--------
1 file changed, 15 insertions(+), 8 deletions(-)
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index fd12dcbd2a..098d77e925 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -45,7 +45,7 @@ type Peer struct {
const (
DefaultPushPullInterval = 5 * time.Second
DefaultGossipInterval = 5 * time.Second
- DefaultRefreshInterval = 30 * time.Second
+ DefaultRefreshInterval = 60 * time.Second
)
// PeerType describes a peer's role in the cluster.
@@ -249,6 +249,7 @@ func (p *Peer) Refresh() error {
}
currMembers := p.mlist.Members()
+ var notConnected []string
for _, peer := range resolvedPeers {
var isPeerFound bool
@@ -260,16 +261,22 @@ func (p *Peer) Refresh() error {
}
if !isPeerFound {
- curr, err := p.mlist.Join([]string{peer})
- if err != nil {
- level.Debug(p.logger).Log("msg", "refresh cluster could not join peer", "peer", peer)
- }
-
- level.Debug(p.logger).Log("msg", "refresh cluster peer joined", "peer", peer, "before", len(currMembers), "after", curr)
+ notConnected = append(notConnected, peer)
}
}
- level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ","))
+ if len(notConnected) == 0 {
+ level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ","))
+ return nil
+ }
+
+ curr, err := p.mlist.Join(notConnected)
+ if err != nil {
+ level.Error(p.logger).Log("msg", "refresh cluster could not join peers", "peers", strings.Join(notConnected, ","))
+ }
+
+ level.Debug(p.logger).Log("msg", "refresh cluster done, peers joined", "peers", strings.Join(notConnected, ","), "before", len(currMembers), "after", curr)
+
return nil
}