From 52a280f57c5200069dfd9cf052ea084bee64aa46 Mon Sep 17 00:00:00 2001 From: Povilas Versockas Date: Thu, 21 Jun 2018 16:17:42 +0300 Subject: [PATCH 1/4] Add cluster.peers refresh --- cmd/thanos/flags.go | 4 +++- cmd/thanos/query.go | 1 + pkg/cluster/cluster.go | 47 ++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/cmd/thanos/flags.go b/cmd/thanos/flags.go index 65d210a5b3..eb45e25c6f 100644 --- a/cmd/thanos/flags.go +++ b/cmd/thanos/flags.go @@ -34,6 +34,8 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo pushPullInterval := cmd.Flag("cluster.pushpull-interval", "Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage."). Default(cluster.DefaultPushPullInterval.String()).Duration() + refreshInterval := cmd.Flag("cluster.refresh-interval", "Interval for membership to refresh cluster.peers state, 0 disables refresh.").Default(cluster.DefaultRefreshInterval.String()).Duration() + return grpcBindAddr, httpBindAddr, func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (*cluster.Peer, error) { @@ -66,7 +68,7 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo level.Info(logger).Log("msg", "QueryAPI address that will be propagated through gossip", "address", advQueryAPIAddress) } - return cluster.New(logger, reg, *clusterBindAddr, *clusterAdvertiseAddr, advStoreAPIAddress, advQueryAPIAddress, *peers, waitIfEmpty, *gossipInterval, *pushPullInterval) + return cluster.New(logger, reg, *clusterBindAddr, *clusterAdvertiseAddr, advStoreAPIAddress, advQueryAPIAddress, *peers, waitIfEmpty, *gossipInterval, *pushPullInterval, *refreshInterval) } } diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 07db741ba4..dc6db68e2f 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -247,6 +247,7 @@ func runQuery( l.Close() }) } + level.Info(logger).Log("msg", "starting query node") return nil } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 803f6269ab..787fb64dc6 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -27,8 +27,9 @@ type Peer struct { mlist *memberlist.Memberlist stopc chan struct{} - cfg *memberlist.Config - knownPeers []string + cfg *memberlist.Config + knownPeers []string + refreshInterval time.Duration data *data gossipMsgsReceived prometheus.Counter @@ -43,6 +44,7 @@ type Peer struct { const ( DefaultPushPullInterval = 5 * time.Second DefaultGossipInterval = 5 * time.Second + DefaultRefreshInterval = 30 * time.Second ) // PeerType describes a peer's role in the cluster. @@ -97,6 +99,7 @@ func New( waitIfEmpty bool, pushPullInterval time.Duration, gossipInterval time.Duration, + refreshInterval time.Duration, ) (*Peer, error) { l = log.With(l, "component", "cluster") @@ -161,6 +164,7 @@ func New( logger: l, knownPeers: knownPeers, cfg: cfg, + refreshInterval: refreshInterval, gossipMsgsReceived: gossipMsgsReceived, gossipClusterMembers: gossipClusterMembers, stopc: make(chan struct{}), @@ -205,6 +209,45 @@ func (p *Peer) Join(peerType PeerType, initialMetadata PeerMetadata) error { Metadata: initialMetadata, }) + if p.refreshInterval != 0 { + go p.periodicallyRefresh() + } + + return nil +} + +func (p *Peer) periodicallyRefresh() { + tick := time.NewTicker(p.refreshInterval) + defer tick.Stop() + + for { + select { + case <-p.stopc: + return + case <-tick.C: + if err := p.Refresh(); err != nil { + level.Error(p.logger).Log("msg", "Refreshing memberlist", "err", err) + } + } + } +} + +// Refresh renews membership cluster, this will refresh DNS names and join newly added members +func (p *Peer) Refresh() error { + p.mlistMtx.Lock() + defer p.mlistMtx.Unlock() + + if p.mlist == nil { + return nil + } + + prev := p.mlist.NumMembers() + curr, err := p.mlist.Join(p.knownPeers) + if err != nil { + return errors.Wrap(err, "could not refresh memberlist cluster") + } + level.Debug(p.logger).Log("msg", "refreshed cluster", "before", prev, "after", curr) + return nil } From 14e590666860cc3fa8020e95f971de1cc0454bce Mon Sep 17 00:00:00 2001 From: Povilas Versockas Date: Thu, 21 Jun 2018 16:34:59 +0300 Subject: [PATCH 2/4] Fix test --- pkg/cluster/cluster_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 96fe77cfc2..18add964ba 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -38,6 +38,7 @@ func joinPeer(num int, knownPeers []string) (peerAddr string, peer *Peer, err er false, 100*time.Millisecond, 50*time.Millisecond, + 30*time.Millisecond, ) if err != nil { return "", nil, err From 272b2007997a8910422ea61c1eb2239f483e3b3b Mon Sep 17 00:00:00 2001 From: Povilas Versockas Date: Fri, 22 Jun 2018 08:53:47 +0300 Subject: [PATCH 3/4] Change to list current members and join only new ones --- pkg/cluster/cluster.go | 48 ++++++++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 787fb64dc6..fd12dcbd2a 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -29,6 +29,7 @@ type Peer struct { cfg *memberlist.Config knownPeers []string + advertiseAddr string refreshInterval time.Duration data *data @@ -124,7 +125,7 @@ func New( level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname") } - resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, net.Resolver{}, waitIfEmpty) + resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, *net.DefaultResolver, waitIfEmpty) if err != nil { return nil, errors.Wrap(err, "resolve peers") } @@ -161,14 +162,15 @@ func New( reg.MustRegister(gossipClusterMembers) return &Peer{ - logger: l, - knownPeers: knownPeers, - cfg: cfg, - refreshInterval: refreshInterval, - gossipMsgsReceived: gossipMsgsReceived, - gossipClusterMembers: gossipClusterMembers, - stopc: make(chan struct{}), - data: &data{data: map[string]PeerState{}}, + logger: l, + knownPeers: knownPeers, + cfg: cfg, + refreshInterval: refreshInterval, + gossipMsgsReceived: gossipMsgsReceived, + gossipClusterMembers: gossipClusterMembers, + stopc: make(chan struct{}), + data: &data{data: map[string]PeerState{}}, + advertiseAddr: advertiseAddr, advertiseStoreAPIAddr: advertiseStoreAPIAddr, advertiseQueryAPIAddress: advertiseQueryAPIAddress, }, nil @@ -241,13 +243,33 @@ func (p *Peer) Refresh() error { return nil } - prev := p.mlist.NumMembers() - curr, err := p.mlist.Join(p.knownPeers) + resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, *net.DefaultResolver, false) if err != nil { - return errors.Wrap(err, "could not refresh memberlist cluster") + return errors.Wrapf(err, "refresh cluster could not resolve peers: %v", resolvedPeers) } - level.Debug(p.logger).Log("msg", "refreshed cluster", "before", prev, "after", curr) + currMembers := p.mlist.Members() + for _, peer := range resolvedPeers { + var isPeerFound bool + + for _, mem := range currMembers { + if mem.Address() == peer { + isPeerFound = true + break + } + } + + if !isPeerFound { + curr, err := p.mlist.Join([]string{peer}) + if err != nil { + level.Debug(p.logger).Log("msg", "refresh cluster could not join peer", "peer", peer) + } + + level.Debug(p.logger).Log("msg", "refresh cluster peer joined", "peer", peer, "before", len(currMembers), "after", curr) + } + } + + level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ",")) return nil } From 04d93ccd4b6d2afb46611dec78ec67407351ddfc Mon Sep 17 00:00:00 2001 From: Povilas Versockas Date: Mon, 25 Jun 2018 08:47:40 +0300 Subject: [PATCH 4/4] Fixes after review --- pkg/cluster/cluster.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index fd12dcbd2a..098d77e925 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -45,7 +45,7 @@ type Peer struct { const ( DefaultPushPullInterval = 5 * time.Second DefaultGossipInterval = 5 * time.Second - DefaultRefreshInterval = 30 * time.Second + DefaultRefreshInterval = 60 * time.Second ) // PeerType describes a peer's role in the cluster. @@ -249,6 +249,7 @@ func (p *Peer) Refresh() error { } currMembers := p.mlist.Members() + var notConnected []string for _, peer := range resolvedPeers { var isPeerFound bool @@ -260,16 +261,22 @@ func (p *Peer) Refresh() error { } if !isPeerFound { - curr, err := p.mlist.Join([]string{peer}) - if err != nil { - level.Debug(p.logger).Log("msg", "refresh cluster could not join peer", "peer", peer) - } - - level.Debug(p.logger).Log("msg", "refresh cluster peer joined", "peer", peer, "before", len(currMembers), "after", curr) + notConnected = append(notConnected, peer) } } - level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ",")) + if len(notConnected) == 0 { + level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ",")) + return nil + } + + curr, err := p.mlist.Join(notConnected) + if err != nil { + level.Error(p.logger).Log("msg", "refresh cluster could not join peers", "peers", strings.Join(notConnected, ",")) + } + + level.Debug(p.logger).Log("msg", "refresh cluster done, peers joined", "peers", strings.Join(notConnected, ","), "before", len(currMembers), "after", curr) + return nil }