Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
better convergence when peers restart
Browse files Browse the repository at this point in the history
Previously when a peer restarted, information about the new
incarnation (i.e. with a different UID) was not accepted by other
peers (and connections would be dropped) unless all knowledge of the
previous incarnation had been purged. This could result in a lot of
connection churn and hence connectivity disruption, and, in some
pathological cases, very slow convergence and hence acceptance of the
new incarnation into the network.

We now no longer drop connections when encountering different
incarnations of a peer. There are two situations when that can happen:

1) on connection establishment

we simply proceed

2) on receipt of gossip

to ensure convergence we

a) treat the UID as an additional discriminator when deciding whether
we should update our information about a peer with that which was
gossiped. Specifically, we update the information we hold when a) the
gossiped version is greater, or b) is the same and the UID is
greater.

b) include the UID in the information we update

c) move our own version number beyond any we receive for ourselves, if
the received UID differs from ours.

With (a) we establishes a total order of peer information across
several incarnations of the same peer. i.e. we consider information to
be fresher if it has a higher version, or the same version and higher
UID. This may seem somewhat counter intutive, since it will generally
treat information about new incarnations as older than old
incarnations, since incarnations always start life with version 1. But
to do better we'd need to establish a total order of incarnations that
matches their temporal occurrence. Which requires some sort of durable
state.

So instead we have (c). Through that we learn the highest version
number of any old incarnation of ourselves that other peers still
hold, and then make sure that our version is greater than
that. Essentially we continue where the old incarnations left
off. It's as if instead of restarting we had simply changed UIDs. And
due to (a) and (b) the information about the new incarnation of
ourselves, now with a higher version, will supersede that of the old
incarnations.

Fixes #1554.
  • Loading branch information
rade committed Nov 5, 2015
1 parent 6c0bdec commit 301d97c
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 46 deletions.
4 changes: 0 additions & 4 deletions router/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,6 @@ func (conn *LocalConnection) registerRemote(remote *Peer, acceptNewPeer bool) er
}
}

if conn.remote.UID != remote.UID {
return fmt.Errorf("Connection appears to be with different version of a peer we already know of")
}

if conn.remote == conn.local {
return ErrConnectToSelf
}
Expand Down
10 changes: 10 additions & 0 deletions router/local_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,13 @@ func (peer *LocalPeer) setShortID(shortID PeerShortID) {
peer.ShortID = shortID
peer.Version++
}

func (peer *LocalPeer) setVersionBeyond(version uint64) bool {
peer.Lock()
defer peer.Unlock()
if version >= peer.Version {
peer.Version = version + 1
return true
}
return false
}
72 changes: 34 additions & 38 deletions router/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ type UnknownPeerError struct {
Name PeerName
}

type NameCollisionError struct {
Name PeerName
}

type PeerNameSet map[PeerName]struct{}

type ConnectionSummary struct {
Expand All @@ -57,6 +53,9 @@ type PeersPendingNotifications struct {

// The local short ID needs reassigning due to a collision
reassignLocalShortID bool

// The local peer was modified
localPeerModified bool
}

func NewPeers(ourself *LocalPeer) *Peers {
Expand Down Expand Up @@ -89,7 +88,8 @@ func (peers *Peers) OnInvalidateShortIDs(callback func()) {
}

func (peers *Peers) unlockAndNotify(pending *PeersPendingNotifications) {
broadcastLocalPeer := pending.reassignLocalShortID && peers.reassignLocalShortID(pending)
broadcastLocalPeer := (pending.reassignLocalShortID && peers.reassignLocalShortID(pending)) ||
pending.localPeerModified
onGC := peers.onGC
onInvalidateShortIDs := peers.onInvalidateShortIDs
peers.Unlock()
Expand Down Expand Up @@ -425,12 +425,8 @@ func (peers *Peers) decodeUpdate(update []byte) (newPeers map[PeerName]*Peer, de
newPeer := NewPeerFromSummary(peerSummary)
decodedUpdate = append(decodedUpdate, newPeer)
decodedConns = append(decodedConns, connSummaries)
existingPeer, found := peers.byName[newPeer.Name]
if !found {
if _, found := peers.byName[newPeer.Name]; !found {
newPeers[newPeer.Name] = newPeer
} else if existingPeer.UID != newPeer.UID {
err = NameCollisionError{Name: newPeer.Name}
return
}
}

Expand Down Expand Up @@ -458,36 +454,36 @@ func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]Connecti
connSummaries := decodedConns[idx]
name := newPeer.Name
// guaranteed to find peer in the peers.byName
peer := peers.byName[name]
if peer != newPeer &&
(peer == peers.ourself.Peer || peer.Version >= newPeer.Version) {
// Nobody but us updates us. And if we know more about a
// peer than what's in the the update, we ignore the
// latter.
continue
}
// If we're here, either it was a new peer, or the update has
// more info about the peer than we do. Either case, we need
// to set version and conns and include the updated peer in
// the outgoing update.

// Can peer have been updated by anyone else in the mean time?
// No - we know that peer is not ourself, so the only prospect
// for an update would be someone else calling
// router.Peers.ApplyUpdate. But ApplyUpdate takes the Lock on
// the router.Peers, so there can be no race here.
peer.Version = newPeer.Version
peer.connections = makeConnsMap(peer, connSummaries, peers.byName)

if newPeer.ShortID != peer.ShortID {
peers.deleteByShortID(peer, pending)
peer.ShortID = newPeer.ShortID
peers.addByShortID(peer, pending)
switch peer := peers.byName[name]; peer {
case newPeer:
peer.connections = makeConnsMap(peer, connSummaries, peers.byName)
newUpdate[name] = void
case peers.ourself.Peer:
if newPeer.UID != peer.UID {
// The update contains information about an old
// incarnation of ourselves. We increase our version
// number beyond that which we received, so our
// information supersedes the old one when it is
// received by other peers.
pending.localPeerModified = peers.ourself.setVersionBeyond(newPeer.Version)
}
default:
if newPeer.Version < peer.Version ||
(newPeer.Version == peer.Version && newPeer.UID <= peer.UID) {
continue
}
peer.Version = newPeer.Version
peer.UID = newPeer.UID
peer.connections = makeConnsMap(peer, connSummaries, peers.byName)

if newPeer.ShortID != peer.ShortID {
peers.deleteByShortID(peer, pending)
peer.ShortID = newPeer.ShortID
peers.addByShortID(peer, pending)
}
newUpdate[name] = void
}

newUpdate[name] = void
}

return newUpdate
}

Expand Down
4 changes: 0 additions & 4 deletions router/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ func (upe UnknownPeerError) Error() string {
return fmt.Sprint("Reference to unknown peer ", upe.Name)
}

func (nce NameCollisionError) Error() string {
return fmt.Sprint("Multiple peers found with same name: ", nce.Name)
}

func (pde PacketDecodingError) Error() string {
return fmt.Sprint("Failed to decode packet: ", pde.Desc)
}
Expand Down

0 comments on commit 301d97c

Please sign in to comment.