Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1641 from weaveworks/1554-converge-on-peer-uid-ch…
Browse files Browse the repository at this point in the history
…ange

better convergence when peers restart; fixes #1554.
  • Loading branch information
bboreham committed Nov 9, 2015
2 parents 97622b9 + bfb050c commit 53ac432
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 46 deletions.
4 changes: 0 additions & 4 deletions router/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,6 @@ func (conn *LocalConnection) registerRemote(remote *Peer, acceptNewPeer bool) er
}
}

if conn.remote.UID != remote.UID {
return fmt.Errorf("Connection appears to be with different version of a peer we already know of")
}

if conn.remote == conn.local {
return ErrConnectToSelf
}
Expand Down
10 changes: 10 additions & 0 deletions router/local_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,13 @@ func (peer *LocalPeer) setShortID(shortID PeerShortID) {
peer.ShortID = shortID
peer.Version++
}

func (peer *LocalPeer) setVersionBeyond(version uint64) bool {
peer.Lock()
defer peer.Unlock()
if version >= peer.Version {
peer.Version = version + 1
return true
}
return false
}
76 changes: 34 additions & 42 deletions router/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ func (upe UnknownPeerError) Error() string {
return fmt.Sprint("Reference to unknown peer ", upe.Name)
}

type NameCollisionError struct {
Name PeerName
}

func (nce NameCollisionError) Error() string {
return fmt.Sprint("Multiple peers found with same name: ", nce.Name)
}

type PeerNameSet map[PeerName]struct{}

type ConnectionSummary struct {
Expand All @@ -68,6 +60,9 @@ type PeersPendingNotifications struct {

// The local short ID needs reassigning due to a collision
reassignLocalShortID bool

// The local peer was modified
localPeerModified bool
}

func NewPeers(ourself *LocalPeer) *Peers {
Expand Down Expand Up @@ -100,7 +95,8 @@ func (peers *Peers) OnInvalidateShortIDs(callback func()) {
}

func (peers *Peers) unlockAndNotify(pending *PeersPendingNotifications) {
broadcastLocalPeer := pending.reassignLocalShortID && peers.reassignLocalShortID(pending)
broadcastLocalPeer := (pending.reassignLocalShortID && peers.reassignLocalShortID(pending)) ||
pending.localPeerModified
onGC := peers.onGC
onInvalidateShortIDs := peers.onInvalidateShortIDs
peers.Unlock()
Expand Down Expand Up @@ -436,12 +432,8 @@ func (peers *Peers) decodeUpdate(update []byte) (newPeers map[PeerName]*Peer, de
newPeer := NewPeerFromSummary(peerSummary)
decodedUpdate = append(decodedUpdate, newPeer)
decodedConns = append(decodedConns, connSummaries)
existingPeer, found := peers.byName[newPeer.Name]
if !found {
if _, found := peers.byName[newPeer.Name]; !found {
newPeers[newPeer.Name] = newPeer
} else if existingPeer.UID != newPeer.UID {
err = NameCollisionError{Name: newPeer.Name}
return
}
}

Expand Down Expand Up @@ -469,36 +461,36 @@ func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]Connecti
connSummaries := decodedConns[idx]
name := newPeer.Name
// guaranteed to find peer in the peers.byName
peer := peers.byName[name]
if peer != newPeer &&
(peer == peers.ourself.Peer || peer.Version >= newPeer.Version) {
// Nobody but us updates us. And if we know more about a
// peer than what's in the the update, we ignore the
// latter.
continue
}
// If we're here, either it was a new peer, or the update has
// more info about the peer than we do. Either case, we need
// to set version and conns and include the updated peer in
// the outgoing update.

// Can peer have been updated by anyone else in the mean time?
// No - we know that peer is not ourself, so the only prospect
// for an update would be someone else calling
// router.Peers.ApplyUpdate. But ApplyUpdate takes the Lock on
// the router.Peers, so there can be no race here.
peer.Version = newPeer.Version
peer.connections = makeConnsMap(peer, connSummaries, peers.byName)

if newPeer.ShortID != peer.ShortID {
peers.deleteByShortID(peer, pending)
peer.ShortID = newPeer.ShortID
peers.addByShortID(peer, pending)
switch peer := peers.byName[name]; peer {
case peers.ourself.Peer:
if newPeer.UID != peer.UID {
// The update contains information about an old
// incarnation of ourselves. We increase our version
// number beyond that which we received, so our
// information supersedes the old one when it is
// received by other peers.
pending.localPeerModified = peers.ourself.setVersionBeyond(newPeer.Version)
}
case newPeer:
peer.connections = makeConnsMap(peer, connSummaries, peers.byName)
newUpdate[name] = void
default: // existing peer
if newPeer.Version < peer.Version ||
(newPeer.Version == peer.Version && newPeer.UID <= peer.UID) {
continue
}
peer.Version = newPeer.Version
peer.UID = newPeer.UID
peer.connections = makeConnsMap(peer, connSummaries, peers.byName)

if newPeer.ShortID != peer.ShortID {
peers.deleteByShortID(peer, pending)
peer.ShortID = newPeer.ShortID
peers.addByShortID(peer, pending)
}
newUpdate[name] = void
}

newUpdate[name] = void
}

return newUpdate
}

Expand Down

0 comments on commit 53ac432

Please sign in to comment.