-
Notifications
You must be signed in to change notification settings - Fork 16
Various floodsub issues #51
Changes from 18 commits
2835336
6a45bfe
e8df6d6
81d50fe
9eb14e7
8303a97
3edacfe
ff77021
eaedffe
21773d0
19adfd7
2246baf
0e3af5e
bfeb6ae
6aa60ef
35a94aa
2e8dbe6
9537fb7
0dfd20a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,17 +59,53 @@ class FloodSub extends EventEmitter { | |
this._dialPeer = this._dialPeer.bind(this) | ||
} | ||
|
||
_addPeer (peer) { | ||
const id = peer.info.id.toB58String() | ||
|
||
/* | ||
Always use an existing peer. | ||
|
||
What is happening here is: "If the other peer has already dialed to me, we already have | ||
an establish link between the two, what might be missing is a | ||
Connection specifically between me and that Peer" | ||
*/ | ||
let existing = this.peers.get(id) | ||
if (existing) { | ||
log('already existing peer', id) | ||
++existing._references | ||
} else { | ||
log('new peer', id) | ||
this.peers.set(id, peer) | ||
existing = peer | ||
} | ||
|
||
return existing | ||
} | ||
|
||
_removePeer (peer) { | ||
const id = peer.info.id.toB58String() | ||
|
||
log('remove', id, peer._references) | ||
// Only delete when no one else is referencing this peer. | ||
if (--peer._references === 0) { | ||
log('delete peer', id) | ||
this.peers.delete(id) | ||
} | ||
|
||
return peer | ||
} | ||
|
||
_dialPeer (peerInfo, callback) { | ||
callback = callback || function noop () {} | ||
const idB58Str = peerInfo.id.toB58String() | ||
log('dialing %s', idB58Str) | ||
|
||
// If already have a PubSub conn, ignore | ||
const peer = this.peers.get(idB58Str) | ||
if (peer && peer.isConnected) { | ||
return setImmediate(() => callback()) | ||
} | ||
|
||
log('dialing %s', idB58Str) | ||
this.libp2p.dial(peerInfo, multicodec, (err, conn) => { | ||
if (err) { | ||
log.err(err) | ||
|
@@ -82,13 +118,9 @@ class FloodSub extends EventEmitter { | |
|
||
_onDial (peerInfo, conn, callback) { | ||
const idB58Str = peerInfo.id.toB58String() | ||
log('connected', idB58Str) | ||
|
||
// If already had a dial to me, just add the conn | ||
if (!this.peers.has(idB58Str)) { | ||
this.peers.set(idB58Str, new Peer(peerInfo)) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is important logic. What is happening here is: "If the other peer has already dialed to me, we already have an establish link between the two, what might be missing is a Connection specifically between me and that Peer" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was copy and paste from original code. will add the comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code was moved to |
||
|
||
const peer = this.peers.get(idB58Str) | ||
const peer = this._addPeer(new Peer(peerInfo)) | ||
peer.attachConnection(conn) | ||
|
||
// Immediately send my own subscriptions to the newly established conn | ||
|
@@ -104,24 +136,20 @@ class FloodSub extends EventEmitter { | |
} | ||
|
||
const idB58Str = peerInfo.id.toB58String() | ||
const peer = this._addPeer(new Peer(peerInfo)) | ||
|
||
if (!this.peers.has(idB58Str)) { | ||
log('new peer', idB58Str) | ||
this.peers.set(idB58Str, new Peer(peerInfo)) | ||
} | ||
|
||
this._processConnection(idB58Str, conn) | ||
this._processConnection(idB58Str, conn, peer) | ||
}) | ||
} | ||
|
||
_processConnection (idB58Str, conn) { | ||
_processConnection (idB58Str, conn, peer) { | ||
pull( | ||
conn, | ||
lp.decode(), | ||
pull.map((data) => pb.rpc.RPC.decode(data)), | ||
pull.drain( | ||
(rpc) => this._onRpc(idB58Str, rpc), | ||
(err) => this._onConnectionEnd(idB58Str, err) | ||
(err) => this._onConnectionEnd(idB58Str, peer, err) | ||
) | ||
) | ||
} | ||
|
@@ -131,11 +159,12 @@ class FloodSub extends EventEmitter { | |
return | ||
} | ||
|
||
log('rpc from', idB58Str) | ||
const subs = rpc.subscriptions | ||
const msgs = rpc.msgs | ||
|
||
if (msgs && msgs.length) { | ||
this._processRpcMessages(rpc.msgs) | ||
this._processRpcMessages(utils.normalizeInRpcMessages(rpc.msgs)) | ||
} | ||
|
||
if (subs && subs.length) { | ||
|
@@ -164,13 +193,14 @@ class FloodSub extends EventEmitter { | |
}) | ||
} | ||
|
||
_onConnectionEnd (idB58Str, err) { | ||
_onConnectionEnd (idB58Str, peer, err) { | ||
// socket hang up, means the one side canceled | ||
if (err && err.message !== 'socket hang up') { | ||
log.err(err) | ||
} | ||
|
||
this.peers.delete(idB58Str) | ||
log('connection ended', idB58Str, err ? err.message : '') | ||
this._removePeer(peer) | ||
} | ||
|
||
_emitMessages (topics, messages) { | ||
|
@@ -191,7 +221,7 @@ class FloodSub extends EventEmitter { | |
return | ||
} | ||
|
||
peer.sendMessages(messages) | ||
peer.sendMessages(utils.normalizeOutRpcMessages(messages)) | ||
|
||
log('publish msgs on topics', topics, peer.info.id.toB58String()) | ||
}) | ||
|
@@ -241,11 +271,15 @@ class FloodSub extends EventEmitter { | |
this.libp2p.unhandle(multicodec) | ||
this.libp2p.removeListener('peer:connect', this._dialPeer) | ||
|
||
log('stopping') | ||
asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => { | ||
if (err) { | ||
return callback(err) | ||
} | ||
|
||
log('stopped') | ||
this.peers = new Map() | ||
this.subscriptions = new Set() | ||
this.started = false | ||
callback() | ||
}) | ||
|
@@ -287,7 +321,7 @@ class FloodSub extends EventEmitter { | |
this._emitMessages(topics, msgObjects) | ||
|
||
// send to all the other peers | ||
this._forwardMessages(topics, messages.map(buildMessage)) | ||
this._forwardMessages(topics, msgObjects) | ||
} | ||
|
||
/** | ||
|
@@ -303,14 +337,18 @@ class FloodSub extends EventEmitter { | |
|
||
topics.forEach((topic) => this.subscriptions.add(topic)) | ||
|
||
this.peers.forEach((peer) => checkIfReady(peer)) | ||
this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer)) | ||
// make sure that FloodSub is already mounted | ||
function checkIfReady (peer) { | ||
function sendSubscriptionsOnceReady (peer) { | ||
if (peer && peer.isWritable) { | ||
peer.sendSubscriptions(topics) | ||
} else { | ||
setImmediate(checkIfReady.bind(peer)) | ||
return peer.sendSubscriptions(topics) | ||
} | ||
const onConnection = () => { | ||
peer.removeListened('connection', onConnection) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pgte Should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @richardschneider oops, thanks! @diasdavid fixed it for me.. |
||
sendSubscriptionsOnceReady(peer) | ||
} | ||
peer.on('connection', onConnection) | ||
peer.once('close', () => peer.removeListened('connection', onConnection)) | ||
} | ||
} | ||
|
||
|
@@ -321,7 +359,11 @@ class FloodSub extends EventEmitter { | |
* @returns {undefined} | ||
*/ | ||
unsubscribe (topics) { | ||
assert(this.started, 'FloodSub is not started') | ||
// Avoid race conditions, by quietly ignoring unsub when shutdown. | ||
if (!this.started) { | ||
return | ||
} | ||
|
||
topics = ensureArray(topics) | ||
|
||
topics.forEach((topic) => this.subscriptions.delete(topic)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,17 +4,20 @@ const lp = require('pull-length-prefixed') | |
const Pushable = require('pull-pushable') | ||
const pull = require('pull-stream') | ||
const setImmediate = require('async/setImmediate') | ||
const EventEmitter = require('events') | ||
|
||
const rpc = require('./message').rpc.RPC | ||
|
||
/** | ||
* The known state of a connected peer. | ||
*/ | ||
class Peer { | ||
class Peer extends EventEmitter { | ||
/** | ||
* @param {PeerInfo} info | ||
*/ | ||
constructor (info) { | ||
super() | ||
|
||
/** | ||
* @type {PeerInfo} | ||
*/ | ||
|
@@ -31,6 +34,8 @@ class Peer { | |
* @type {Pushable} | ||
*/ | ||
this.stream = null | ||
|
||
this._references = 1 | ||
} | ||
|
||
/** | ||
|
@@ -80,8 +85,15 @@ class Peer { | |
pull( | ||
this.stream, | ||
lp.encode(), | ||
conn | ||
conn, | ||
pull.onEnd(() => { | ||
this.conn = null | ||
this.stream = null | ||
this.emit('close') | ||
}) | ||
) | ||
|
||
this.emit('connection') | ||
} | ||
|
||
_sendRawSubscriptions (topics, subscribe) { | ||
|
@@ -155,16 +167,18 @@ class Peer { | |
* @returns {undefined} | ||
*/ | ||
close (callback) { | ||
if (!this.conn || !this.stream) { | ||
// no connection to close | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why remove? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because it never invoked the callback, read the rest of the code. This is still WIP, reviews are fine but DO NOT merge There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I take it back. This code does nothing. |
||
// end the pushable pull-stream | ||
// Force removal of peer | ||
this._references = 1 | ||
|
||
// End the pushable | ||
if (this.stream) { | ||
this.stream.end() | ||
} | ||
|
||
setImmediate(() => { | ||
this.conn = null | ||
this.stream = null | ||
this.emit('close') | ||
callback() | ||
}) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This if can be inverted for great readability (by this I mean, avoid do negative equals with things like undefined or null)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, positive logic is always better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
even better