Emit Connectedness notifications to the Event Bus #177
Conversation
```go
// Emit the Connectedness=Connected event on the bus if we just went
// from having no connections with the peer to having this connection with the peer.
indexForLk := len(p) - 1
stripe := &s.stripedPeerConnectivity[p[indexForLk]]
```
`p` is a peer ID string, and here we're taking its last byte. Is it possible for remote peer IDs to be biased so that a Sybil can make all its identities fall into the same stripe? Would it make sense to use a hash function?
Yes but this is really just best-effort in case the underlying event bus blocks. Honestly, I'm not really sure how critical it is. Thoughts @raulk?
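For illustration, a minimal sketch of the hashing idea floated above (the `stripeFor` helper, `numStripes` constant, and FNV choice are all hypothetical, not the PR's code):

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const numStripes = 256 // hypothetical; see the "pull out to a constant" thread below

type stripeSet [numStripes]sync.Mutex

// stripeFor hashes the whole peer ID rather than trusting its final byte,
// so the stripe is a function of every byte of the ID.
func (s *stripeSet) stripeFor(p string) *sync.Mutex {
	h := fnv.New32a()
	h.Write([]byte(p))
	return &s[h.Sum32()%numStripes]
}

func main() {
	var stripes stripeSet
	m := stripes.stripeFor("QmExamplePeerID")
	m.Lock()
	fmt.Println("holding the stripe for this peer")
	m.Unlock()
}
```

Note that an unkeyed hash like FNV can still be ground offline by an attacker minting peer IDs; fully closing the bias concern would take a keyed hash with a per-process random seed.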
```diff
@@ -60,6 +61,8 @@ type Swarm struct {
 		m map[peer.ID][]*Conn
 	}
+
+	stripedPeerConnectivity [256]sync.Mutex
```
pull out the number of stripes to a constant
Done.
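Presumably the refactor looks something like this sketch (the constant name is hypothetical):

```go
package swarm

import "sync"

// numPeerConnectivityStripes replaces the magic 256 in the array length and
// anywhere a stripe index is computed. The name is hypothetical.
const numPeerConnectivityStripes = 256

type Swarm struct {
	stripedPeerConnectivity [numPeerConnectivityStripes]sync.Mutex
}
```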
```go
defer s2.Close()

// subscribe for notifications on s1
s, err := s1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
```
tests in the package can directly access `s1.bus`, and don't need the additional interface method.
@willscott Aha! Except the swarm tests are in a different package :p
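For context, the test under discussion has roughly this shape (a sketch, not the PR's code; `newTestSwarm` and `connectSwarms` are hypothetical stand-ins for the suite's fixtures, and `event`/`network` are go-libp2p-core packages):

```go
func TestConnectednessEventEmitted(t *testing.T) {
	s1, s2 := newTestSwarm(t), newTestSwarm(t) // hypothetical fixtures

	sub, err := s1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
	if err != nil {
		t.Fatal(err)
	}
	defer sub.Close()

	connectSwarms(t, s1, s2) // hypothetical helper that dials s2 from s1

	// The first event should report Connectedness=Connected for s2's peer.
	evt, ok := (<-sub.Out()).(event.EvtPeerConnectednessChanged)
	if !ok {
		t.Fatal("expected an EvtPeerConnectednessChanged event")
	}
	if evt.Connectedness != network.Connected {
		t.Fatalf("expected Connected, got %v", evt.Connectedness)
	}
}
```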
```go
	return nil
}

// EventBus is NOT a part of the network interface.
// It has been exported to help with testing.
func (s *Swarm) EventBus() event.Bus {
```
Doesn't seem like this needs to be exported. Testing will either have the event bus it passed in to create the swarm, or can get it back from the host associated with the network.
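For reference, since `host.Host` exposes the bus, a test outside the package could presumably subscribe through the host instead (a fragment; `h` is an assumed `host.Host`):

```go
// Subscribe via the host that wraps the network; no extra Swarm method needed.
sub, err := h.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
if err != nil {
	t.Fatal(err)
}
defer sub.Close()
```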
```diff
@@ -75,6 +77,26 @@ func (c *Conn) doClose() {
 	c.swarm.notifyAll(func(f network.Notifiee) {
 		f.Disconnected(c.swarm, c)
 	})
```
We're still holding the notify lock here. Probably fine but still odd.
```go
c.swarm.conns.RLock()
// we went from having a connection/s with the peer to having no connections to it
// if after this disconnection, we have no OPEN connections with the peer
isConnectednessFlip := len(c.swarm.conns.m[p]) == 0
```
We have to look at this where we remove the connection from the map (`removeConn`). If we do this here, multiple connections could hit this same point at the same time, all firing the event.
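A toy illustration of that point, assuming the pre-refactor `map[peer.ID][]*Conn` layout (types simplified, names hypothetical): the emptiness check happens inside the same critical section as the removal, so only one closing connection can ever observe the flip.

```go
package main

import (
	"fmt"
	"sync"
)

type conn struct{ id int }

type swarm struct {
	mu sync.Mutex // guards m
	m  map[string][]*conn
}

// removeConn deletes c and reports, atomically with the removal, whether it
// was the last open connection to p. Only the caller that gets true should
// emit Connectedness=NotConnected; checking len(m[p]) == 0 outside this
// critical section would let several closing connections all observe zero
// and all fire the event.
func (s *swarm) removeConn(p string, c *conn) (lastConn bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	cs := s.m[p]
	for i, ci := range cs {
		if ci == c {
			// Preserve order, as the real removeConn does.
			copy(cs[i:], cs[i+1:])
			cs[len(cs)-1] = nil
			cs = cs[:len(cs)-1]
			break
		}
	}
	if len(cs) == 0 {
		delete(s.m, p)
		return true
	}
	s.m[p] = cs
	return false
}

func main() {
	c := &conn{id: 1}
	s := &swarm{m: map[string][]*conn{"QmPeer": {c}}}
	fmt.Println("last connection removed:", s.removeConn("QmPeer", c)) // true
}
```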
```go
cs := s.conns.m[p]
// we went from having no connection with the peer to having this connection with the peer
// if this is the ONLY open connection we have with the peer.
isConnectednessFlip := (len(cs) == 1 && cs[0] == c && !cs[0].conn.IsClosed())
```
Same as below: we can't re-take the lock when we check this. We need to check whether we're going from 0->1 and atomically take the notification lock before dropping the lock on the connection map.
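One way to picture that ordering on the connect path (a self-contained toy, not the PR's code; `peerRef` mirrors the per-connection-lock idea sketched in the diff further down):

```go
package main

import (
	"fmt"
	"sync"
)

type conn struct{ id int }

type peerRef struct {
	sync.Mutex // per-peer notification lock
	conns      []*conn
}

type swarm struct {
	mu sync.Mutex // guards m
	m  map[string]*peerRef
}

// addConn decides the 0->1 transition and takes the per-peer notification
// lock *before* releasing the map lock, so no concurrent add or remove can
// slip in between the check and the event emission. A remove path must take
// the locks in the same order (map lock, then peer lock) to stay
// deadlock-free.
func (s *swarm) addConn(p string, c *conn, emitConnected func(string)) {
	s.mu.Lock()
	ref, ok := s.m[p]
	if !ok {
		ref = &peerRef{}
		s.m[p] = ref
	}
	wasEmpty := len(ref.conns) == 0
	ref.conns = append(ref.conns, c)
	if wasEmpty {
		ref.Lock() // acquired while still holding the map lock
	}
	s.mu.Unlock()

	if wasEmpty {
		// Exactly one goroutine observes the 0->1 flip for this peer.
		emitConnected(p)
		ref.Unlock()
	}
}

func main() {
	s := &swarm{m: make(map[string]*peerRef)}
	s.addConn("QmPeer", &conn{id: 1}, func(p string) { fmt.Println("connected:", p) })
	s.addConn("QmPeer", &conn{id: 2}, func(p string) { fmt.Println("connected:", p) }) // no event: 1->2
}
```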
```diff
@@ -1,11 +1,13 @@
 package swarm
 
+import "C"
```
wat?
Discussed in a call. We probably want per-connection locks, something like:

```diff
diff --git a/swarm.go b/swarm.go
index f7c5771..1b7fbee 100644
--- a/swarm.go
+++ b/swarm.go
@@ -43,6 +43,11 @@ var ErrAddrFiltered = errors.New("address filtered")
 // ErrDialTimeout is returned when one a dial times out due to the global timeout
 var ErrDialTimeout = errors.New("dial timed out")
 
+type peerRef struct {
+	sync.Mutex // notification lock
+	conns      []*Conn
+}
+
 // Swarm is a connection muxer, allowing connections to other peers to
 // be opened and closed, while still using the same Chan for all
 // communication. The Chan sends/receives Messages, which note the
@@ -57,7 +62,7 @@ type Swarm struct {
 	conns struct {
 		sync.RWMutex
-		m map[peer.ID][]*Conn
+		m map[peer.ID]*peerRef
 	}
 
 	listeners struct {
@@ -490,23 +495,40 @@ func (s *Swarm) removeConn(c *Conn) {
 	p := c.RemotePeer()
 
 	s.conns.Lock()
-	defer s.conns.Unlock()
-	cs := s.conns.m[p]
-	for i, ci := range cs {
+	cs, ok := s.conns.m[p]
+	s.conns.Unlock()
+	if !ok {
+		return // no peer
+	}
+
+	cs.Lock()
+	if len(cs.conns) == 0 {
+		// XXX This _should_ be impossible. We should call
+		// `doClose` on each connection once (protected by a
+		// sync.Once).
+	}
+
+	for i, ci := range cs.conns {
 		if ci == c {
-			if len(cs) == 1 {
-				delete(s.conns.m, p)
-			} else {
-				// NOTE: We're intentionally preserving order.
-				// This way, connections to a peer are always
-				// sorted oldest to newest.
-				copy(cs[i:], cs[i+1:])
-				cs[len(cs)-1] = nil
-				s.conns.m[p] = cs[:len(cs)-1]
-			}
-			return
+			// NOTE: We're intentionally preserving order.
+			// This way, connections to a peer are always
+			// sorted oldest to newest.
+			copy(cs.conns[i:], cs.conns[i+1:])
+			cs.conns[len(cs.conns)-1] = nil
+			cs.conns = cs.conns[:len(cs.conns)-1]
+			break
 		}
 	}
+
+	if len(cs.conns) == 0 {
+		// NOTIFICATION HERE
+		s.conns.Lock()
+		cs.Unlock()
+
+		delete(s.conns.m, p)
+		s.conns.Unlock()
+	} else {
+		cs.Unlock()
+	}
 }
 
 // String returns a string representation of Network.
```

Unfortunately, this still has a problem: if the user tries to connect to the peer, open a stream, etc., we'll block until previous notifications have been received. An alternative solution is an (almost) wait-free algorithm. Instead of using a lock, we use a flag.
This way, nobody blocks on anybody else and exactly one thread gets stuck doing all the work. Unfortunately, there's still a catch. We can have the following sequence of events:
The service in step 4 won't see the expected disconnect event. To fix this, we have to complicate matters a bit and add a "state has changed" flag.
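A minimal sketch of that flag-based idea, purely illustrative (the names `notifyState`, `busy`, and `dirty` are hypothetical): whichever goroutine wins the CAS on `busy` drains all pending work, and `dirty` plays the role of the "state has changed" flag that keeps a late change from being lost.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type notifyState struct {
	busy  int32 // 1 while some goroutine is emitting notifications
	dirty int32 // 1 if connectedness changed while the emitter was running
}

// markDirty records a connectedness change and ensures exactly one goroutine
// does the emission work, without blocking any caller.
func (n *notifyState) markDirty(emit func()) {
	atomic.StoreInt32(&n.dirty, 1)
	for {
		if !atomic.CompareAndSwapInt32(&n.busy, 0, 1) {
			// Another goroutine is emitting; it will observe dirty=1
			// and loop again, so we can return immediately.
			return
		}
		// We are the emitter: keep draining until no new changes arrive.
		for atomic.CompareAndSwapInt32(&n.dirty, 1, 0) {
			emit()
		}
		atomic.StoreInt32(&n.busy, 0)
		// A change may have landed between the drain and the release of
		// busy; if so, try to become the emitter again.
		if atomic.LoadInt32(&n.dirty) == 0 {
			return
		}
	}
}

func main() {
	var n notifyState
	n.markDirty(func() { fmt.Println("connectedness changed") })
}
```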
Given the complexity of doing this here, would it be wrong to do it in the Host as we do right now, paying the cost of maintaining the connectedness state for each connected peer?
I'd kind of like to "do this the right way" but let's put this on hold until we've cut a go-ipfs release.
Hello, has there been any progress on this issue? I'm currently relying on a jury-rigged solution. Is there a revised ETA for this now that v0.5.1 is out? Thanks!
@lthibault The team has been busy focusing on hardening Content Routing. I've assigned myself to work on this and will be picking it up in roughly two weeks.
@aarshkshah1992 That's wonderful news -- thanks very much for the update!
This is now done by the host: libp2p/go-libp2p#1230. |
@Stebalien @raulk
For libp2p/go-libp2p#801.
Please take a look. I've incorporated the discussions from libp2p/go-libp2p#813.