diff --git a/src/pubsub.js b/src/pubsub.js index a0a9c0a9..9f9c1d7f 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -199,12 +199,26 @@ class BasicPubSub extends Pubsub { } if (msgs.length) { - utils.normalizeInRpcMessages(msgs).forEach((msg) => { + msgs.forEach(message => { + const msg = utils.normalizeInRpcMessage(message) const seqno = utils.msgId(msg.from, msg.seqno) - if (!this.seenCache.has(seqno)) { - this._processRpcMessage(msg) - this.seenCache.put(seqno) + + // Ignore if we've already seen the message + if (this.seenCache.has(seqno)) { + return } + + this.seenCache.put(seqno) + + // Ensure the message is valid before processing it + this.validate(message, (err, isValid) => { + if (err || !isValid) { + this.log('Message could not be validated, dropping it. isValid=%s', isValid, err) + return + } + + this._processRpcMessage(msg) + }) }) } this._handleRpcControl(peer, rpc) diff --git a/test/gossip.js b/test/gossip.js index ffed3f39..53abc1a1 100644 --- a/test/gossip.js +++ b/test/gossip.js @@ -3,6 +3,7 @@ const { expect } = require('chai') const sinon = require('sinon') +const promisify = require('promisify-es6') const { GossipSubDhi } = require('../src/constants') const { @@ -48,7 +49,7 @@ describe('gossip', () => { // set spy sinon.spy(nodeA.gs, 'log') - nodeA.gs.publish(topic, Buffer.from('hey')) + await promisify(nodeA.gs.publish, { context: nodeA.gs })(topic, Buffer.from('hey')) await new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)) expect(nodeA.gs.log.callCount).to.be.gt(1) nodeA.gs.log.getCalls() @@ -88,8 +89,8 @@ describe('gossip', () => { // manually add control message to be sent to peerB nodeA.gs.control.set(peerB, { graft: [{ topicID: topic }] }) - nodeA.gs.publish(topic, Buffer.from('hey')) - await new Promise((resolve) => setTimeout(resolve, 500)) + await promisify(nodeA.gs.publish, { context: nodeA.gs })(topic, Buffer.from('hey')) + await new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)) expect(nodeB.gs.log.callCount).to.be.gt(1) // expect control message to be sent alongside published message const call = nodeB.gs.log.getCalls().find((call) => call.args[0] === 'GRAFT: Add mesh link from %s in %s') diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js new file mode 100644 index 00000000..127c1aac --- /dev/null +++ b/test/pubsub.spec.js @@ -0,0 +1,143 @@ +'use strict' +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const sinon = require('sinon') +const { utils } = require('libp2p-pubsub') +const promisify = require('promisify-es6') +const { + createNode, + startNode, + stopNode +} = require('./utils') + +describe('Pubsub', () => { + let nodeA + let gossipsub + before(async () => { + nodeA = await createNode('/ip4/127.0.0.1/tcp/0') + gossipsub = nodeA.gs + await startNode(nodeA) + await startNode(gossipsub) + }) + after(async () => { + await stopNode(gossipsub) + await stopNode(nodeA) + }) + afterEach(() => { + sinon.restore() + }) + + describe('publish', () => { + it('should sign messages on publish', (done) => { + sinon.spy(nodeA.gs, '_publish') + + gossipsub.publish('signing-topic', Buffer.from('hello'), (err) => { + expect(err).to.not.exist() + + // Get the first message sent to _publish, and validate it + const signedMessage = gossipsub._publish.getCall(0).lastArg[0] + gossipsub.validate(signedMessage, (err, isValid) => { + expect(err).to.not.exist() + expect(isValid).to.eql(true) + done() + }) + }) + }) + }) + + describe('validate', () => { + it('should drop unsigned messages', () => { + sinon.spy(gossipsub, '_processRpcMessage') + sinon.spy(gossipsub, 'validate') + sinon.spy(gossipsub, 'log') + sinon.stub(gossipsub.peers, 'get').returns({}) + + const topic = 'my-topic' + const rpc = { + subscriptions: [], + msgs: [{ + from: gossipsub.peerId.id, + data: Buffer.from('an unsigned message'), + seqno: utils.randomSeqno(), + topicIDs: [topic] + }] + } + + gossipsub._onRpc('QmAnotherPeer', rpc) + + return new Promise(resolve => setTimeout(() => { + const dropLogs = gossipsub.log.getCalls().filter((call) => call.args[0].match(/dropping it/gi)) + expect(gossipsub.validate.callCount).to.eql(1) + expect(gossipsub._processRpcMessage.called).to.eql(false) + expect(dropLogs).to.have.length(1) + resolve() + }, 500)) + }) + + it('should not drop signed messages', async () => { + sinon.spy(gossipsub, '_processRpcMessage') + sinon.spy(gossipsub, 'validate') + sinon.spy(gossipsub, 'log') + sinon.stub(gossipsub.peers, 'get').returns({}) + + const topic = 'my-topic' + const signedMessage = await promisify(gossipsub._buildMessage, { + context: gossipsub + })({ + from: gossipsub.peerId.id, + data: Buffer.from('an unsigned message'), + seqno: utils.randomSeqno(), + topicIDs: [topic] + }) + + const rpc = { + subscriptions: [], + msgs: [signedMessage] + } + + gossipsub._onRpc('QmAnotherPeer', rpc) + + return new Promise(resolve => setTimeout(() => { + const dropLogs = gossipsub.log.getCalls().filter((call) => call.args[0].match(/dropping it/gi)) + expect(gossipsub.validate.callCount).to.eql(1) + expect(gossipsub._processRpcMessage.callCount).to.eql(1) + expect(dropLogs).to.be.empty() + resolve() + }, 500)) + }) + + it('should not drop unsigned messages if strict signing is disabled', () => { + sinon.spy(gossipsub, '_processRpcMessage') + sinon.spy(gossipsub, 'validate') + sinon.spy(gossipsub, 'log') + sinon.stub(gossipsub.peers, 'get').returns({}) + // Disable strict signing + sinon.stub(gossipsub, 'strictSigning').value(false) + + const topic = 'my-topic' + const rpc = { + subscriptions: [], + msgs: [{ + from: gossipsub.peerId.id, + data: Buffer.from('an unsigned message'), + seqno: utils.randomSeqno(), + topicIDs: [topic] + }] + } + + gossipsub._onRpc('QmAnotherPeer', rpc) + + return new Promise(resolve => setTimeout(() => { + const dropLogs = gossipsub.log.getCalls().filter((call) => call.args[0].match(/dropping it/gi)) + expect(gossipsub.validate.callCount).to.eql(1) + expect(gossipsub._processRpcMessage.callCount).to.eql(1) + expect(dropLogs).to.be.empty() + resolve() + }, 500)) + }) + }) +})