diff --git a/src/pubsub.js b/src/pubsub.js index 96679fe55..f539d0e3a 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -81,8 +81,29 @@ module.exports = (arg) => { // Drop the request once we are actually done if (ps.listenerCount(topic) === 0) { - subscriptions[topic].abort() + if (!callback) { + return new Promise((resolve, reject) => { + // When the response stream has ended, resolve the promise + eos(subscriptions[topic].res, (err) => { + // FIXME: Artificial timeout needed to ensure unsubscribed + setTimeout(() => { + if (err) return reject(err) + resolve() + }) + }) + subscriptions[topic].req.abort() + subscriptions[topic] = null + }) + } + + // When the response stream has ended, call the callback + eos(subscriptions[topic].res, (err) => { + // FIXME: Artificial timeout needed to ensure unsubscribed + setTimeout(() => callback(err)) + }) + subscriptions[topic].req.abort() subscriptions[topic] = null + return } if (!callback) { @@ -154,13 +175,16 @@ module.exports = (arg) => { // Start the request and transform the response // stream to Pubsub messages stream - subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => { + subscriptions[topic] = {} + subscriptions[topic].req = send.andTransform(request, PubsubMessageStream.from, (err, stream) => { if (err) { subscriptions[topic] = null ps.removeListener(topic, handler) return callback(err) } + subscriptions[topic].res = stream + stream.on('data', (msg) => { ps.emit(topic, msg) })