Skip to content

Commit

Permalink
Change to list current members and join only new ones
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Jun 22, 2018
1 parent 14e5906 commit 4713645
Showing 1 changed file with 34 additions and 13 deletions.
47 changes: 34 additions & 13 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Peer struct {

cfg *memberlist.Config
knownPeers []string
advertiseAddr string
refreshInterval time.Duration

data *data
Expand Down Expand Up @@ -124,7 +125,7 @@ func New(
level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname")
}

resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, net.Resolver{}, waitIfEmpty)
resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, *net.DefaultResolver, waitIfEmpty)
if err != nil {
return nil, errors.Wrap(err, "resolve peers")
}
Expand Down Expand Up @@ -161,14 +162,15 @@ func New(
reg.MustRegister(gossipClusterMembers)

return &Peer{
logger: l,
knownPeers: knownPeers,
cfg: cfg,
refreshInterval: refreshInterval,
gossipMsgsReceived: gossipMsgsReceived,
gossipClusterMembers: gossipClusterMembers,
stopc: make(chan struct{}),
data: &data{data: map[string]PeerState{}},
logger: l,
knownPeers: knownPeers,
cfg: cfg,
refreshInterval: refreshInterval,
gossipMsgsReceived: gossipMsgsReceived,
gossipClusterMembers: gossipClusterMembers,
stopc: make(chan struct{}),
data: &data{data: map[string]PeerState{}},
advertiseAddr: advertiseAddr,
advertiseStoreAPIAddr: advertiseStoreAPIAddr,
advertiseQueryAPIAddress: advertiseQueryAPIAddress,
}, nil
Expand Down Expand Up @@ -241,13 +243,32 @@ func (p *Peer) Refresh() error {
return nil
}

prev := p.mlist.NumMembers()
curr, err := p.mlist.Join(p.knownPeers)
resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, *net.DefaultResolver, false)
if err != nil {
return errors.Wrap(err, "could not refresh memberlist cluster")
return errors.Wrapf(err, "refresh cluster could not resolve peers: %v", resolvedPeers)
}
level.Debug(p.logger).Log("msg", "refreshed cluster", "before", prev, "after", curr)

currMembers := p.mlist.Members()
for _, peer := range resolvedPeers {
var isPeerFound bool

for _, mem := range currMembers {
if mem.Address() == peer {
isPeerFound = true
break
}
}

if !isPeerFound {
curr, err := p.mlist.Join([]string{peer})
if err != nil {
return errors.Wrapf(err, "refresh cluster could not join peer: %s", peer)
}
level.Debug(p.logger).Log("msg", "refresh cluster peer joined", "peer", peer, "before", len(currMembers), "after", curr)
}
}

level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ","))
return nil
}

Expand Down

0 comments on commit 4713645

Please sign in to comment.