This repository has been archived by the owner on May 26, 2022. It is now read-only.

Emit Connectedness notifications to the Event Bus #177

Closed
wants to merge 5 commits

Conversation

aarshkshah1992
Collaborator

@aarshkshah1992 aarshkshah1992 commented Mar 13, 2020

@Stebalien @raulk
For libp2p/go-libp2p#801.

Please take a look. I have incorporated the discussions from libp2p/go-libp2p#813.

@aarshkshah1992 aarshkshah1992 requested review from Stebalien and raulk and removed request for Stebalien and raulk March 13, 2020 12:26
@aarshkshah1992 aarshkshah1992 changed the title from "[WIP] Emit Connectedness notifications to the Event Bus" to "Emit Connectedness notifications to the Event Bus" on Mar 13, 2020
// Emit the Connectedness=Connected event on the bus if we just went
// from having no connections with the peer to having this connection with the peer.
indexForLk := len(p) - 1
stripe := &s.stripedPeerConnectivity[p[indexForLk]]
Contributor

p is a peer ID string, and here we're taking its last byte. Is it possible for remote peer IDs to be biased so that a Sybil can make all of its identities fall into the same stripe? Would it make sense to use a hash function?

Member

Yes but this is really just best-effort in case the underlying event bus blocks. Honestly, I'm not really sure how critical it is. Thoughts @raulk?
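If adversarially chosen peer IDs turned out to matter, one option (not what this PR does) would be to hash the whole ID before picking a stripe. A minimal sketch, assuming the stripedPeerConnectivity array from this PR and a hypothetical stripeFor helper:

import "hash/crc32"

// stripeFor hashes the full peer ID so a Sybil cannot steer all of its
// identities into one stripe just by choosing the trailing byte.
func (s *Swarm) stripeFor(p peer.ID) *sync.Mutex {
	idx := crc32.ChecksumIEEE([]byte(p)) % uint32(len(s.stripedPeerConnectivity))
	return &s.stripedPeerConnectivity[idx]
}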

@@ -60,6 +61,8 @@ type Swarm struct {
m map[peer.ID][]*Conn
}

stripedPeerConnectivity [256]sync.Mutex
Contributor

Pull the number of stripes out into a constant.

Collaborator Author

Done.

defer s2.Close()

// subscribe for notifications on s1
s, err := s1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
Contributor

Tests in the package can directly access s1.bus and don't need the additional interface method.

Collaborator Author
@aarshkshah1992 aarshkshah1992 Mar 13, 2020

@willscott Aha! Except the swarm tests are in a different package :p

return nil
}

// EventBus is NOT a part of the network interface.
// It has been exported to help with testing.
func (s *Swarm) EventBus() event.Bus {
Contributor

This doesn't seem like it needs to be exported. Tests will either have the event bus they passed in to create the swarm, or can get it back from the host associated with the network.

@@ -75,6 +77,26 @@ func (c *Conn) doClose() {
c.swarm.notifyAll(func(f network.Notifiee) {
f.Disconnected(c.swarm, c)
})

Member

We're still holding the notify lock here. Probably fine but still odd.

c.swarm.conns.RLock()
// we went from having a connection/s with the peer to having no connections to it
// if after this disconnection, we have no OPEN connections with the peer
isConnectednessFlip := len(c.swarm.conns.m[p]) == 0
Member

We have to look at this where we remove the connection from the map (removeConn). If we do this here, multiple connections could hit this same point at the same time, all firing the event.

// Emit the Connectedness=Connected event on the bus if we just went
// from having no connections with the peer to having this connection with the peer.
indexForLk := len(p) - 1
stripe := &s.stripedPeerConnectivity[p[indexForLk]]

cs := s.conns.m[p]
// we went from having no connection with the peer to having this connection with the peer
// if this is the ONLY open connection we have with the peer.
isConnectednessFlip := (len(cs) == 1 && cs[0] == c && !cs[0].conn.IsClosed())
Member

Same as below, we can't re-take the lock when we check this. We need to check if we're going from 0->1 and atomically take the notification lock before dropping the lock on the connection map.
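In other words, the add path would roughly look like this (a sketch only; emit is a stand-in for publishing the event, and stripe is the notification lock from this PR):

s.conns.Lock()
s.conns.m[p] = append(s.conns.m[p], c)
becameConnected := len(s.conns.m[p]) == 1 // the 0 -> 1 transition
if becameConnected {
	stripe.Lock() // take the notification lock while still holding the map lock
}
s.conns.Unlock()
if becameConnected {
	emit(p, network.Connected) // stand-in for publishing EvtPeerConnectednessChanged
	stripe.Unlock()
}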

@@ -1,11 +1,13 @@
package swarm

import "C"
Member

wat?

@Stebalien
Member

Stebalien commented Mar 16, 2020

Discussed in a call. We probably want per-peer locks, something like:

diff --git a/swarm.go b/swarm.go
index f7c5771..1b7fbee 100644
--- a/swarm.go
+++ b/swarm.go
@@ -43,6 +43,11 @@ var ErrAddrFiltered = errors.New("address filtered")
 // ErrDialTimeout is returned when one a dial times out due to the global timeout
 var ErrDialTimeout = errors.New("dial timed out")
 
+type peerRef struct {
+	sync.Mutex // notification lock
+	conns      []*Conn
+}
+
 // Swarm is a connection muxer, allowing connections to other peers to
 // be opened and closed, while still using the same Chan for all
 // communication. The Chan sends/receives Messages, which note the
@@ -57,7 +62,7 @@ type Swarm struct {
 
 	conns struct {
 		sync.RWMutex
-		m map[peer.ID][]*Conn
+		m map[peer.ID]*peerRef
 	}
 
 	listeners struct {
@@ -490,23 +495,40 @@ func (s *Swarm) removeConn(c *Conn) {
 	p := c.RemotePeer()
 
 	s.conns.Lock()
-	defer s.conns.Unlock()
-	cs := s.conns.m[p]
-	for i, ci := range cs {
+	cs, ok := s.conns.m[p]
+	s.conns.Unlock()
+	if !ok {
+		return // no peer
+	}
+
+	cs.Lock()
+	if len(cs.conns) == 0 {
+		// XXX This _should_ be impossible. We should call
+		// `doClose` on each connection once (protected by a
+		// sync.Once).
+	}
+
+	for i, ci := range cs.conns {
 		if ci == c {
-			if len(cs) == 1 {
-				delete(s.conns.m, p)
-			} else {
-				// NOTE: We're intentionally preserving order.
-				// This way, connections to a peer are always
-				// sorted oldest to newest.
-				copy(cs[i:], cs[i+1:])
-				cs[len(cs)-1] = nil
-				s.conns.m[p] = cs[:len(cs)-1]
-			}
-			return
+			// NOTE: We're intentionally preserving order.
+			// This way, connections to a peer are always
+			// sorted oldest to newest.
+			copy(cs.conns[i:], cs.conns[i+1:])
+			cs.conns[len(cs.conns)-1] = nil
+			cs.conns = cs.conns[:len(cs.conns)-1]
+			break
 		}
 	}
+
+	if len(cs.conns) == 0 {
+		// NOTIFICATION HERE
+		s.conns.Lock()
+		cs.Unlock()
+
+		delete(s.conns.m, p)
+		s.conns.Unlock()
+	} else {
+		cs.Unlock()
+	}
 }
 
 // String returns a string representation of Network.

Unfortunately, this still has a problem. If the user tries to connect to the peer, open a stream, etc. we'll block until previous notifications have been received.

An alternative solution is an (almost) wait-free algorithm. Instead of using a lock, we use a flag.

  1. Take the global conns lock.
  2. Check to see if we're going from 0 to 1 or from 1 to 0 connections.
  3. Check a "working on it" flag. If it's set, just add the connection and return.
  4. Otherwise, set the "working on it" flag, add the connection, and unlock.
  5. Fire the notification.
  6. Re-take the lock, check if we've changed states again (we fired "connect" but now have no connections OR we fired "disconnect" but now have connections).
  7. If not, unset "working on it" and return.
  8. If so, unlock and fire the next event.

This way, nobody blocks on anybody else and exactly one thread gets stuck doing all the work.
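A rough Go sketch of steps 1-5 on the connect path (the peerRef record and its notifying field are assumptions for illustration, not code from this PR):

s.conns.Lock()
ref := s.conns.m[p] // assumed: type peerRef struct { conns []*Conn; notifying bool }
ref.conns = append(ref.conns, c)
flip := len(ref.conns) == 1 // went from 0 to 1 connections
if !flip || ref.notifying {
	// no connectedness change, or another goroutine is already "working on it"
	s.conns.Unlock()
	return
}
ref.notifying = true
s.conns.Unlock()
// fire the notification, then re-take the lock and re-check (steps 6-8)
emitter.Emit(event.EvtPeerConnectednessChanged{Peer: p, Connectedness: network.Connected})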

Unfortunately, there's still a catch. We can have the following sequence of events:

  1. Connection closed.
  2. Begin firing disconnect event.
  3. Connection opened.
  4. User notices that the connection is open and uses it.
  5. Connection closes
  6. End firing disconnect, see that we have no connections, and return.

The service in step 4 won't see the expected disconnect event. To fix this, we have to complicate matters a bit and add a "state has changed" flag.

  1. If we don't fire an event because another thread is "working on it", set this flag.
  2. If we're "working on it", always unset this flag before firing an event.
  3. When we're "working on it" and we're checking to see if the state has changed again, check this flag. If set...
    1. ...if we just fired a disconnect event and...
      1. ...the state is connected...
        1. ...unset the flag...
        2. ...fire a connect event...
        3. ... loop and check again.
      2. ...the state is disconnected...
        1. ...LEAVE the flag...
        2. ...fire a CONNECT event...
        3. ... loop and check again.
    2. ...if we just fired a connect event and...
      1. ...the state is connected... (we've gone through a disconnect -> connect)
        1. ...unset the flag...
        2. return
      2. ...the state is disconnected...
        1. ...unset the flag...
        2. ...fire a disconnect event...
        3. ... loop and check again.
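As a purely illustrative sketch (notifyLoop, emitConnectedness, nextEvent, and the changed flag are all hypothetical), the goroutine that is "working on it" ends up running a loop shaped like this:

func (s *Swarm) notifyLoop(p peer.ID, ref *peerRef, last network.Connectedness) {
	for {
		s.emitConnectedness(p, last) // publish Connected or NotConnected
		s.conns.Lock()
		cur := network.NotConnected
		if len(ref.conns) > 0 {
			cur = network.Connected
		}
		if cur == last && !ref.changed {
			// what we announced matches reality and nothing was missed
			ref.notifying = false
			s.conns.Unlock()
			return
		}
		// the state flipped while we were emitting, or `changed` says a
		// transition was skipped; pick the next event per the rules above
		last, ref.changed = nextEvent(last, cur, ref.changed)
		s.conns.Unlock()
	}
}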

@aarshkshah1992
Collaborator Author

aarshkshah1992 commented Mar 17, 2020

@Stebalien @raulk

Given the complexity of doing this here, would it be wrong to do it in the Host as we do right now, by paying the cost of maintaining the connectedness state for each connected peer?

@Stebalien
Member

I'd kind of like to "do this the right way" but let's put this on hold until we've cut a go-ipfs release.

@lthibault

Hello, has there been any progress on this issue?

I'm currently relying on a jury-rigged solution using Host.Network().Notify(...), but it's only really suitable as a temporary stopgap.
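(For context, a stopgap of that kind typically registers a network.Notifiee on the host's network, roughly like the sketch below; connWatcher is illustrative only, and the channels need to be drained promptly since notifiee callbacks run on the swarm's goroutines.)

import (
	"github.com/libp2p/go-libp2p-core/network"
	ma "github.com/multiformats/go-multiaddr"
)

type connWatcher struct {
	connected    chan network.Conn
	disconnected chan network.Conn
}

func (w *connWatcher) Connected(_ network.Network, c network.Conn)    { w.connected <- c }
func (w *connWatcher) Disconnected(_ network.Network, c network.Conn) { w.disconnected <- c }
func (w *connWatcher) Listen(network.Network, ma.Multiaddr)           {}
func (w *connWatcher) ListenClose(network.Network, ma.Multiaddr)      {}
func (w *connWatcher) OpenedStream(network.Network, network.Stream)   {}
func (w *connWatcher) ClosedStream(network.Network, network.Stream)   {}

// h.Network().Notify(&connWatcher{connected: make(chan network.Conn, 16), disconnected: make(chan network.Conn, 16)})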

Is there a revised ETA for this now that v0.5.1 is out?

Thanks!

@aarshkshah1992
Collaborator Author

@lthibault The team has been busy focusing on hardening Content Routing in go-ipfs and improving the performance of our Kademlia DHT.

I've assigned myself to work on this and will pick it up in roughly two weeks.

@lthibault

@aarshkshah1992 That's wonderful news -- thanks very much for the update!

@marten-seemann
Contributor

This is now done by the host: libp2p/go-libp2p#1230.
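For anyone landing here later: with that change, consumers subscribe on the host's event bus rather than on the swarm, roughly as follows (h is a host.Host):

sub, err := h.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
	panic(err)
}
defer sub.Close()
for e := range sub.Out() {
	evt := e.(event.EvtPeerConnectednessChanged)
	fmt.Println(evt.Peer, evt.Connectedness) // network.Connected or network.NotConnected
}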
