diff --git a/src/ping-pull-stream.js b/src/ping-pull-stream.js index 9c18140e2..a7871faa3 100644 --- a/src/ping-pull-stream.js +++ b/src/ping-pull-stream.js @@ -2,7 +2,9 @@ const toPull = require('stream-to-pull-stream') const deferred = require('pull-defer') +const pump = require('pump') const moduleConfig = require('./utils/module-config') +const PingMessageStream = require('./utils/ping-message-stream') module.exports = (arg) => { const send = moduleConfig(arg) @@ -18,10 +20,13 @@ module.exports = (arg) => { qs: opts } const p = deferred.source() + const response = new PingMessageStream() send(request, (err, stream) => { if (err) { return p.abort(err) } - p.resolve(toPull.source(stream)) + + pump(stream, response) + p.resolve(toPull.source(response)) }) return p diff --git a/src/ping-readable-stream.js b/src/ping-readable-stream.js index 6281a44de..6d2a0e606 100644 --- a/src/ping-readable-stream.js +++ b/src/ping-readable-stream.js @@ -1,8 +1,8 @@ 'use strict' -const Stream = require('readable-stream') const pump = require('pump') const moduleConfig = require('./utils/module-config') +const PingMessageStream = require('./utils/ping-message-stream') module.exports = (arg) => { const send = moduleConfig(arg) @@ -17,17 +17,15 @@ module.exports = (arg) => { args: id, qs: opts } - // ndjson streams objects - const pt = new Stream.PassThrough({ - objectMode: true - }) + + const response = new PingMessageStream() send(request, (err, stream) => { - if (err) { return pt.destroy(err) } + if (err) { return response.destroy(err) } - pump(stream, pt) + pump(stream, response) }) - return pt + return response } } diff --git a/src/ping.js b/src/ping.js index 2682e9752..7372b5761 100644 --- a/src/ping.js +++ b/src/ping.js @@ -1,8 +1,10 @@ 'use strict' const promisify = require('promisify-es6') +const pump = require('pump') +const concat = require('concat-stream') const moduleConfig = require('./utils/module-config') -const streamToValue = require('./utils/stream-to-value') +const PingMessageStream = require('./utils/ping-message-stream') module.exports = (arg) => { const send = moduleConfig(arg) @@ -30,14 +32,16 @@ module.exports = (arg) => { // Transform the response stream to a value: // [{ Success: , Time: , Text: }] - const transform = (res, callback) => { - streamToValue(res, (err, res) => { - if (err) { - return callback(err) + const transform = (stream, callback) => { + const messageConverter = new PingMessageStream() + pump( + stream, + messageConverter, + concat({encoding: 'object'}, (data) => callback(null, data)), + (err) => { + if (err) callback(err) } - - callback(null, res) - }) + ) } send.andTransform(request, transform, callback) diff --git a/src/utils/ping-message-converter.js b/src/utils/ping-message-converter.js new file mode 100644 index 000000000..79d9fc3be --- /dev/null +++ b/src/utils/ping-message-converter.js @@ -0,0 +1,23 @@ +'use strict' + +// Converts IPFS API ping messages to lowercase +// +// { +// Success: true, +// Text: 'foobar', +// Time: 0 +// } +// + +module.exports = function pingMessageConverter (obj) { + if (!isPingMessage(obj)) throw new Error('Invalid ping message received') + return { + success: obj.Success, + time: obj.Time, + text: obj.Text + } +} + +function isPingMessage (obj) { + return obj && typeof obj.Success === 'boolean' +} diff --git a/src/utils/ping-message-stream.js b/src/utils/ping-message-stream.js new file mode 100644 index 000000000..96516fe9b --- /dev/null +++ b/src/utils/ping-message-stream.js @@ -0,0 +1,23 @@ +'use strict' + +const TransformStream = require('readable-stream').Transform +const pingMessageConverter = require('./ping-message-converter') + +class PingMessageStream extends TransformStream { + constructor (options) { + const opts = Object.assign(options || {}, { objectMode: true }) + super(opts) + } + + _transform (obj, enc, callback) { + try { + const msg = pingMessageConverter(obj) + this.push(msg) + } catch (err) { + return callback(err) + } + callback() + } +} + +module.exports = PingMessageStream diff --git a/test/ping.spec.js b/test/ping.spec.js index 49e96a8b0..e0ec03dc5 100644 --- a/test/ping.spec.js +++ b/test/ping.spec.js @@ -12,6 +12,7 @@ const parallel = require('async/parallel') const series = require('async/series') const IPFSApi = require('../src') +const PingMessageStream = require('../src/utils/ping-message-stream') const f = require('./utils/factory') describe('.ping', function () { @@ -77,10 +78,10 @@ describe('.ping', function () { expect(res).to.be.an('array') expect(res).to.have.lengthOf(3) res.forEach(packet => { - expect(packet).to.have.keys('Success', 'Time', 'Text') - expect(packet.Time).to.be.a('number') + expect(packet).to.have.keys('success', 'time', 'text') + expect(packet.time).to.be.a('number') }) - const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + const resultMsg = res.find(packet => packet.text.includes('Average latency')) expect(resultMsg).to.exist() done() }) @@ -92,10 +93,10 @@ describe('.ping', function () { expect(res).to.be.an('array') expect(res).to.have.lengthOf(4) res.forEach(packet => { - expect(packet).to.have.keys('Success', 'Time', 'Text') - expect(packet.Time).to.be.a('number') + expect(packet).to.have.keys('success', 'time', 'text') + expect(packet.time).to.be.a('number') }) - const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + const resultMsg = res.find(packet => packet.text.includes('Average latency')) expect(resultMsg).to.exist() done() }) @@ -107,10 +108,10 @@ describe('.ping', function () { expect(res).to.be.an('array') expect(res).to.have.lengthOf(4) res.forEach(packet => { - expect(packet).to.have.keys('Success', 'Time', 'Text') - expect(packet.Time).to.be.a('number') + expect(packet).to.have.keys('success', 'time', 'text') + expect(packet.time).to.be.a('number') }) - const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + const resultMsg = res.find(packet => packet.text.includes('Average latency')) expect(resultMsg).to.exist() done() }) @@ -131,10 +132,10 @@ describe('.ping', function () { expect(res).to.be.an('array') expect(res).to.have.lengthOf(3) res.forEach(packet => { - expect(packet).to.have.keys('Success', 'Time', 'Text') - expect(packet.Time).to.be.a('number') + expect(packet).to.have.keys('success', 'time', 'text') + expect(packet.time).to.be.a('number') }) - const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + const resultMsg = res.find(packet => packet.text.includes('Average latency')) expect(resultMsg).to.exist() }) }) @@ -147,10 +148,10 @@ describe('.ping', function () { expect(data).to.be.an('array') expect(data).to.have.lengthOf(3) data.forEach(packet => { - expect(packet).to.have.keys('Success', 'Time', 'Text') - expect(packet.Time).to.be.a('number') + expect(packet).to.have.keys('success', 'time', 'text') + expect(packet.time).to.be.a('number') }) - const resultMsg = data.find(packet => packet.Text.includes('Average latency')) + const resultMsg = data.find(packet => packet.text.includes('Average latency')) expect(resultMsg).to.exist() done() }) @@ -162,7 +163,7 @@ describe('.ping', function () { ipfs.pingReadableStream(otherId) .on('data', data => { expect(data).to.be.an('object') - expect(data).to.have.keys('Success', 'Time', 'Text') + expect(data).to.have.keys('success', 'time', 'text') packetNum++ }) .on('error', err => { @@ -173,4 +174,11 @@ describe('.ping', function () { done() }) }) + + it('message conversion fails if invalid message is received', () => { + const messageConverter = new PingMessageStream() + expect(() => { + messageConverter.write({some: 'InvalidMessage'}) + }).to.throw('Invalid ping message received') + }) })