Skip to content

Commit

Permalink
Add cluster.peers refresh (#383)
Browse files Browse the repository at this point in the history
* Add cluster.peers refresh

* Fix test

* Change to list current members and join only new ones

* Fixes after review
  • Loading branch information
povilasv authored and bwplotka committed Jun 26, 2018
1 parent 6cc17fe commit fa26827
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 11 deletions.
4 changes: 3 additions & 1 deletion cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo
pushPullInterval := cmd.Flag("cluster.pushpull-interval", "Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage.").
Default(cluster.DefaultPushPullInterval.String()).Duration()

refreshInterval := cmd.Flag("cluster.refresh-interval", "Interval for membership to refresh cluster.peers state, 0 disables refresh.").Default(cluster.DefaultRefreshInterval.String()).Duration()

return grpcBindAddr,
httpBindAddr,
func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (*cluster.Peer, error) {
Expand Down Expand Up @@ -66,7 +68,7 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo
level.Info(logger).Log("msg", "QueryAPI address that will be propagated through gossip", "address", advQueryAPIAddress)
}

return cluster.New(logger, reg, *clusterBindAddr, *clusterAdvertiseAddr, advStoreAPIAddress, advQueryAPIAddress, *peers, waitIfEmpty, *gossipInterval, *pushPullInterval)
return cluster.New(logger, reg, *clusterBindAddr, *clusterAdvertiseAddr, advStoreAPIAddress, advQueryAPIAddress, *peers, waitIfEmpty, *gossipInterval, *pushPullInterval, *refreshInterval)
}
}

Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func runQuery(
l.Close()
})
}

level.Info(logger).Log("msg", "starting query node")
return nil
}
Expand Down
92 changes: 82 additions & 10 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ type Peer struct {
mlist *memberlist.Memberlist
stopc chan struct{}

cfg *memberlist.Config
knownPeers []string
cfg *memberlist.Config
knownPeers []string
advertiseAddr string
refreshInterval time.Duration

data *data
gossipMsgsReceived prometheus.Counter
Expand All @@ -43,6 +45,7 @@ type Peer struct {
// Default timing values for cluster membership.
const (
// DefaultPushPullInterval is the default value of the
// cluster.pushpull-interval flag (interval for gossip state syncs).
DefaultPushPullInterval = 5 * time.Second
// DefaultGossipInterval is presumably the default for the gossip interval
// flag; its flag registration is not visible here — TODO confirm.
DefaultGossipInterval = 5 * time.Second
// DefaultRefreshInterval is the default value of the
// cluster.refresh-interval flag, i.e. how often the cluster.peers state is
// refreshed. A value of 0 for that flag disables the refresh.
DefaultRefreshInterval = 60 * time.Second
)

// PeerType describes a peer's role in the cluster.
Expand Down Expand Up @@ -97,6 +100,7 @@ func New(
waitIfEmpty bool,
pushPullInterval time.Duration,
gossipInterval time.Duration,
refreshInterval time.Duration,
) (*Peer, error) {
l = log.With(l, "component", "cluster")

Expand All @@ -121,7 +125,7 @@ func New(
level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname")
}

resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, net.Resolver{}, waitIfEmpty)
resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, *net.DefaultResolver, waitIfEmpty)
if err != nil {
return nil, errors.Wrap(err, "resolve peers")
}
Expand Down Expand Up @@ -158,13 +162,15 @@ func New(
reg.MustRegister(gossipClusterMembers)

return &Peer{
logger: l,
knownPeers: knownPeers,
cfg: cfg,
gossipMsgsReceived: gossipMsgsReceived,
gossipClusterMembers: gossipClusterMembers,
stopc: make(chan struct{}),
data: &data{data: map[string]PeerState{}},
logger: l,
knownPeers: knownPeers,
cfg: cfg,
refreshInterval: refreshInterval,
gossipMsgsReceived: gossipMsgsReceived,
gossipClusterMembers: gossipClusterMembers,
stopc: make(chan struct{}),
data: &data{data: map[string]PeerState{}},
advertiseAddr: advertiseAddr,
advertiseStoreAPIAddr: advertiseStoreAPIAddr,
advertiseQueryAPIAddress: advertiseQueryAPIAddress,
}, nil
Expand Down Expand Up @@ -205,6 +211,72 @@ func (p *Peer) Join(peerType PeerType, initialMetadata PeerMetadata) error {
Metadata: initialMetadata,
})

if p.refreshInterval != 0 {
go p.periodicallyRefresh()
}

return nil
}

// periodicallyRefresh calls Refresh every p.refreshInterval until p.stopc
// becomes readable. It blocks, so it is meant to run in its own goroutine.
//
// A failed Refresh is logged and does not stop the loop; the next tick simply
// tries again.
func (p *Peer) periodicallyRefresh() {
	// time.NewTicker panics on a non-positive interval. The caller only
	// filters out zero, so a negative interval must be rejected here to avoid
	// crashing the whole process from this goroutine.
	if p.refreshInterval <= 0 {
		return
	}

	tick := time.NewTicker(p.refreshInterval)
	defer tick.Stop()

	for {
		select {
		case <-p.stopc:
			return
		case <-tick.C:
			if err := p.Refresh(); err != nil {
				level.Error(p.logger).Log("msg", "Refreshing memberlist", "err", err)
			}
		}
	}
}

// Refresh re-resolves the configured known peers and joins every resolved
// address that is not already a member of the memberlist.
//
// It holds p.mlistMtx for the whole call. If p.mlist is nil there is nothing to
// refresh and nil is returned.
//
// It returns an error when the known peers cannot be resolved or when joining
// the missing peers fails. The join error is returned with its cause instead
// of being logged without it, so the caller can report the cause once.
func (p *Peer) Refresh() error {
	p.mlistMtx.Lock()
	defer p.mlistMtx.Unlock()

	if p.mlist == nil {
		return nil
	}

	resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, *net.DefaultResolver, false)
	if err != nil {
		// resolvedPeers is not meaningful on failure; report what we tried to resolve.
		return errors.Wrapf(err, "refresh cluster could not resolve peers: %v", p.knownPeers)
	}

	// Index current member addresses once so each resolved peer is a single lookup.
	currMembers := p.mlist.Members()
	connected := make(map[string]struct{}, len(currMembers))
	for _, mem := range currMembers {
		connected[mem.Address()] = struct{}{}
	}

	var notConnected []string
	for _, peer := range resolvedPeers {
		if _, ok := connected[peer]; !ok {
			notConnected = append(notConnected, peer)
		}
	}

	if len(notConnected) == 0 {
		level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ","))
		return nil
	}

	curr, err := p.mlist.Join(notConnected)
	if err != nil {
		return errors.Wrapf(err, "refresh cluster could not join peers: %s", strings.Join(notConnected, ","))
	}

	level.Debug(p.logger).Log("msg", "refresh cluster done, peers joined", "peers", strings.Join(notConnected, ","), "before", len(currMembers), "after", curr)

	return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func joinPeer(num int, knownPeers []string) (peerAddr string, peer *Peer, err er
false,
100*time.Millisecond,
50*time.Millisecond,
30*time.Millisecond,
)
if err != nil {
return "", nil, err
Expand Down

0 comments on commit fa26827

Please sign in to comment.