Skip to content

Commit

Permalink
chore: update deps and fix nohoist config (#3248)
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain authored Aug 27, 2020
1 parent 92671b0 commit 35c5395
Showing 1 changed file with 36 additions and 25 deletions.
61 changes: 36 additions & 25 deletions src/pubsub/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,25 @@ const toUrlSearchParams = require('../lib/to-url-search-params')

module.exports = configure((api, options) => {
const subsTracker = SubscriptionTracker.singleton()
const publish = require('./publish')(options)

return async (topic, handler, options = {}) => {
return async (topic, handler, options = {}) => { // eslint-disable-line require-await
options.signal = subsTracker.subscribe(topic, handler, options.signal)

let res
let done
let fail

const result = new Promise((resolve, reject) => {
done = resolve
fail = reject
})

// In Firefox, the initial call to fetch does not resolve until some data
// is received. If this doesn't happen within 1 second send an empty message
// to kickstart the process.
const ffWorkaround = setTimeout(async () => {
log(`Publishing empty message to "${topic}" to resolve subscription request`)
try {
await publish(topic, new Uint8Array(0), options)
} catch (err) {
log('Failed to publish empty message', err)
}
}, 1000)
// is received. If this doesn't happen within 1 second assume success
const ffWorkaround = setTimeout(() => done(), 1000)

try {
res = await api.post('pubsub/sub', {
// Do this async to not block Firefox
setTimeout(() => {
api.post('pubsub/sub', {
timeout: options.timeout,
signal: options.signal,
searchParams: toUrlSearchParams({
Expand All @@ -38,18 +36,31 @@ module.exports = configure((api, options) => {
}),
headers: options.headers
})
} catch (err) { // Initial subscribe fail, ensure we clean up
subsTracker.unsubscribe(topic, handler)
throw err
}
.catch((err) => {
// Initial subscribe fail, ensure we clean up
subsTracker.unsubscribe(topic, handler)

clearTimeout(ffWorkaround)
fail(err)
})
.then((response) => {
clearTimeout(ffWorkaround)

readMessages(res.ndjson(), {
onMessage: handler,
onEnd: () => subsTracker.unsubscribe(topic, handler),
onError: options.onError
})
if (!response) {
// if there was no response, the subscribe failed
return
}

readMessages(response.ndjson(), {
onMessage: handler,
onEnd: () => subsTracker.unsubscribe(topic, handler),
onError: options.onError
})

done()
})
}, 0)

return result
}
})

Expand Down

0 comments on commit 35c5395

Please sign in to comment.