Skip to content

Commit

Permalink
feat: cancellable api calls (#2993)
Browse files Browse the repository at this point in the history
Passes a [AbortSignal](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal)
to every API call which emits a `abort` event when the caller is no longer
interested in the result of the operation.

Lower level code that creates resources or has other long-term effects should tear
down those resources early if the `abort` event is received.

Adds support for `timeout` options to every API call that will emit an `abort` event
on the passed signal and throw a Timeout error.

Finally `abort` events are triggered if the current request arrived via the HTTP API
and the request was aborted from the client - that is, a `disconnect` event is fired by Hapi.

- Updates the core-api docs to add these new options.
- Refactors HTTP API to replace custom args parsing with Joi
- Tests all HTTP API endpoints
- Adds pin support to `ipfs.block.put`- fixes #3015
  • Loading branch information
achingbrain authored May 14, 2020
1 parent 6236bb6 commit 41bdf44
Show file tree
Hide file tree
Showing 17 changed files with 102 additions and 42 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ Both the Current and Active LTS versions of Node.js are supported. Please see [n
All core API methods take _additional_ `options` specific to the HTTP API:

* `headers` - An object or [Headers](https://developer.mozilla.org/en-US/docs/Web/API/Headers) instance that can be used to set custom HTTP headers. Note that this option can also be [configured globally](#custom-headers) via the constructor options.
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) that can be used to abort the request on demand.
* `timeout` - A number or string specifying a timeout for the request. If the timeout is reached before data is received a [`TimeoutError`](https://github.com/sindresorhus/ky/blob/2f37c3f999efb36db9108893b8b3d4b3a7f5ec45/index.js#L127-L132) is thrown. If a number is specified it is interpreted as milliseconds, if a string is passed, it is intepreted according to [`parse-duration`](https://www.npmjs.com/package/parse-duration). Note that this option can also be [configured globally](#global-timeouts) via the constructor options.
* `searchParams` - An object or [`URLSearchParams`](https://developer.mozilla.org/en-US/docs/Web/API/URLSearchParams) instance that can be used to add additional query parameters to the query string sent with each request.

### Instance Utils
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
},
"dependencies": {
"abort-controller": "^3.0.0",
"any-signal": "^1.1.0",
"bignumber.js": "^9.0.0",
"buffer": "^5.6.0",
"cids": "^0.8.0",
Expand Down Expand Up @@ -67,11 +68,11 @@
"stream-to-it": "^0.2.0"
},
"devDependencies": {
"aegir": "^21.10.1",
"aegir": "^22.0.0",
"cross-env": "^7.0.0",
"go-ipfs-dep": "0.4.23-3",
"interface-ipfs-core": "^0.134.3",
"ipfsd-ctl": "^4.0.1",
"ipfsd-ctl": "^4.1.1",
"it-all": "^1.0.1",
"it-concat": "^1.0.0",
"it-pipe": "^1.1.0",
Expand Down
10 changes: 8 additions & 2 deletions src/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,27 @@ const toCamel = require('./lib/object-to-camel')
const configure = require('./lib/configure')
const multipartRequest = require('./lib/multipart-request')
const toUrlSearchParams = require('./lib/to-url-search-params')
const anySignal = require('any-signal')
const AbortController = require('abort-controller')

module.exports = configure((api) => {
return async function * add (input, options = {}) {
const progressFn = options.progress

// allow aborting requests on body errors
const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

const res = await api.post('add', {
searchParams: toUrlSearchParams({
'stream-channels': true,
...options,
progress: Boolean(progressFn)
}),
timeout: options.timeout,
signal: options.signal,
signal,
...(
await multipartRequest(input, options.headers)
await multipartRequest(input, controller, options.headers)
)
})

Expand Down
5 changes: 5 additions & 0 deletions src/bitswap/wantlist.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ const toUrlSearchParams = require('../lib/to-url-search-params')

module.exports = configure(api => {
return async (peer, options = {}) => {
if (peer && (peer.timeout || peer.signal)) {
options = peer
peer = undefined
}

if (peer) {
options.peer = typeof peer === 'string' ? peer : new CID(peer).toString()
}
Expand Down
10 changes: 8 additions & 2 deletions src/block/put.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const multihash = require('multihashes')
const multipartRequest = require('../lib/multipart-request')
const configure = require('../lib/configure')
const toUrlSearchParams = require('../lib/to-url-search-params')
const anySignal = require('any-signal')
const AbortController = require('abort-controller')

module.exports = configure(api => {
async function put (data, options = {}) {
Expand All @@ -32,14 +34,18 @@ module.exports = configure(api => {
delete options.cid
}

// allow aborting requests on body errors
const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

let res
try {
const response = await api.post('block/put', {
timeout: options.timeout,
signal: options.signal,
signal: signal,
searchParams: toUrlSearchParams(options),
...(
await multipartRequest(data, options.headers)
await multipartRequest(data, controller, options.headers)
)
})
res = await response.json()
Expand Down
10 changes: 8 additions & 2 deletions src/config/replace.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@ const { Buffer } = require('buffer')
const multipartRequest = require('../lib/multipart-request')
const configure = require('../lib/configure')
const toUrlSearchParams = require('../lib/to-url-search-params')
const anySignal = require('any-signal')
const AbortController = require('abort-controller')

module.exports = configure(api => {
return async (config, options = {}) => {
// allow aborting requests on body errors
const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

const res = await api.post('config/replace', {
timeout: options.timeout,
signal: options.signal,
signal,
searchParams: toUrlSearchParams(options),
...(
await multipartRequest(Buffer.from(JSON.stringify(config)), options.headers)
await multipartRequest(Buffer.from(JSON.stringify(config)), controller, options.headers)
)
})

Expand Down
10 changes: 8 additions & 2 deletions src/dag/put.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const multihash = require('multihashes')
const configure = require('../lib/configure')
const multipartRequest = require('../lib/multipart-request')
const toUrlSearchParams = require('../lib/to-url-search-params')
const anySignal = require('any-signal')
const AbortController = require('abort-controller')

module.exports = configure(api => {
return async (dagNode, options = {}) => {
Expand Down Expand Up @@ -43,12 +45,16 @@ module.exports = configure(api => {
serialized = dagNode
}

// allow aborting requests on body errors
const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

const res = await api.post('dag/put', {
timeout: options.timeout,
signal: options.signal,
signal,
searchParams: toUrlSearchParams(options),
...(
await multipartRequest(serialized, options.headers)
await multipartRequest(serialized, controller, options.headers)
)
})
const data = await res.json()
Expand Down
2 changes: 1 addition & 1 deletion src/dag/resolve.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const toUrlSearchParams = require('../lib/to-url-search-params')

module.exports = configure(api => {
return async (cid, path, options = {}) => {
if (typeof path === 'object') {
if (path && typeof path === 'object') {
options = path
path = null
}
Expand Down
10 changes: 8 additions & 2 deletions src/files/write.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ const mtimeToObject = require('../lib/mtime-to-object')
const configure = require('../lib/configure')
const multipartRequest = require('../lib/multipart-request')
const toUrlSearchParams = require('../lib/to-url-search-params')
const anySignal = require('any-signal')
const AbortController = require('abort-controller')

module.exports = configure(api => {
return async (path, input, options = {}) => {
// allow aborting requests on body errors
const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

const res = await api.post('files/write', {
timeout: options.timeout,
signal: options.signal,
signal,
searchParams: toUrlSearchParams({
arg: path,
streamChannels: true,
Expand All @@ -23,7 +29,7 @@ module.exports = configure(api => {
path: 'arg',
mode: modeToString(options.mode),
mtime: mtimeToObject(options.mtime)
}, options.headers)
}, controller, options.headers)
)
})

Expand Down
5 changes: 4 additions & 1 deletion src/lib/multipart-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const modeToString = require('../lib/mode-to-string')
const mtimeToObject = require('../lib/mtime-to-object')
const merge = require('merge-options').bind({ ignoreUndefined: true })

async function multipartRequest (source, headers = {}, boundary = `-----------------------------${nanoid()}`) {
async function multipartRequest (source, abortController, headers = {}, boundary = `-----------------------------${nanoid()}`) {
async function * streamFiles (source) {
try {
let index = 0
Expand Down Expand Up @@ -50,6 +50,9 @@ async function multipartRequest (source, headers = {}, boundary = `-------------

index++
}
} catch (err) {
// workaround for https://github.com/node-fetch/node-fetch/issues/753
abortController.abort(err)
} finally {
yield `\r\n--${boundary}--\r\n`
}
Expand Down
2 changes: 1 addition & 1 deletion src/object/patch/add-link.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module.exports = configure(api => {
searchParams: toUrlSearchParams({
arg: [
`${Buffer.isBuffer(cid) ? new CID(cid) : cid}`,
dLink.Name || dLink.name || null,
dLink.Name || dLink.name || '',
(dLink.Hash || dLink.cid || '').toString() || null
],
...options
Expand Down
10 changes: 8 additions & 2 deletions src/object/patch/append-data.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,24 @@ const CID = require('cids')
const multipartRequest = require('../../lib/multipart-request')
const configure = require('../../lib/configure')
const toUrlSearchParams = require('../../lib/to-url-search-params')
const anySignal = require('any-signal')
const AbortController = require('abort-controller')

module.exports = configure(api => {
return async (cid, data, options = {}) => {
// allow aborting requests on body errors
const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

const res = await api.post('object/patch/append-data', {
timeout: options.timeout,
signal: options.signal,
signal,
searchParams: toUrlSearchParams({
arg: `${Buffer.isBuffer(cid) ? new CID(cid) : cid}`,
...options
}),
...(
await multipartRequest(data, options.headers)
await multipartRequest(data, controller, options.headers)
)
})

Expand Down
10 changes: 8 additions & 2 deletions src/object/patch/set-data.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@ const CID = require('cids')
const multipartRequest = require('../../lib/multipart-request')
const configure = require('../../lib/configure')
const toUrlSearchParams = require('../../lib/to-url-search-params')
const anySignal = require('any-signal')
const AbortController = require('abort-controller')

module.exports = configure(api => {
return async (cid, data, options = {}) => {
// allow aborting requests on body errors
const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

const { Hash } = await (await api.post('object/patch/set-data', {
timeout: options.timeout,
signal: options.signal,
signal,
searchParams: toUrlSearchParams({
arg: [
`${Buffer.isBuffer(cid) ? new CID(cid) : cid}`
],
...options
}),
...(
await multipartRequest(data, options.headers)
await multipartRequest(data, controller, options.headers)
)
})).json()

Expand Down
11 changes: 9 additions & 2 deletions src/object/put.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const { Buffer } = require('buffer')
const multipartRequest = require('../lib/multipart-request')
const configure = require('../lib/configure')
const toUrlSearchParams = require('../lib/to-url-search-params')
const anySignal = require('any-signal')
const AbortController = require('abort-controller')

module.exports = configure(api => {
return async (obj, options = {}) => {
Expand Down Expand Up @@ -41,15 +43,20 @@ module.exports = configure(api => {
if (Buffer.isBuffer(obj) && options.enc) {
buf = obj
} else {
options.enc = 'json'
buf = Buffer.from(JSON.stringify(tmpObj))
}

// allow aborting requests on body errors
const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

const res = await api.post('object/put', {
timeout: options.timeout,
signal: options.signal,
signal,
searchParams: toUrlSearchParams(options),
...(
await multipartRequest(buf, options.headers)
await multipartRequest(buf, controller, options.headers)
)
})

Expand Down
28 changes: 10 additions & 18 deletions src/object/stat.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,16 @@ const toUrlSearchParams = require('../lib/to-url-search-params')

module.exports = configure(api => {
return async (cid, options = {}) => {
let res
try {
res = await (await api.post('object/stat', {
timeout: options.timeout,
signal: options.signal,
searchParams: toUrlSearchParams({
arg: `${Buffer.isBuffer(cid) ? new CID(cid) : cid}`,
...options
}),
headers: options.headers
})).json()
} catch (err) {
if (err.name === 'TimeoutError') {
err.message = `failed to get block for ${Buffer.isBuffer(cid) ? new CID(cid) : cid}: context deadline exceeded`
}
throw err
}
const res = await api.post('object/stat', {
timeout: options.timeout,
signal: options.signal,
searchParams: toUrlSearchParams({
arg: `${Buffer.isBuffer(cid) ? new CID(cid) : cid}`,
...options
}),
headers: options.headers
})

return res
return res.json()
}
})
2 changes: 1 addition & 1 deletion src/pin/ls.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const toUrlSearchParams = require('../lib/to-url-search-params')

module.exports = configure(api => {
return async function * ls (path, options = {}) {
if (path && path.type) {
if (path && (path.type || path.timeout)) {
options = path || {}
path = []
}
Expand Down
12 changes: 12 additions & 0 deletions test/interface.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ describe('interface-ipfs-core tests', () => {
name: 'replace',
reason: 'FIXME Waiting for fix on go-ipfs https://github.com/ipfs/js-ipfs-http-client/pull/307#discussion_r69281789 and https://github.com/ipfs/go-ipfs/issues/2927'
},
{
name: 'should respect timeout option when listing config profiles',
reason: 'TODO: Not implemented in go-ipfs'
},
{
name: 'should list config profiles',
reason: 'TODO: Not implemented in go-ipfs'
Expand Down Expand Up @@ -159,6 +163,10 @@ describe('interface-ipfs-core tests', () => {
name: 'should ls from outside of mfs',
reason: 'TODO not implemented in go-ipfs yet'
},
{
name: 'should respect timeout option when changing the mode of a file',
reason: 'TODO not implemented in go-ipfs yet'
},
{
name: 'should update the mode for a file',
reason: 'TODO not implemented in go-ipfs yet'
Expand Down Expand Up @@ -251,6 +259,10 @@ describe('interface-ipfs-core tests', () => {
name: 'should respect metadata when copying from outside of mfs',
reason: 'TODO not implemented in go-ipfs yet'
},
{
name: 'should respect timeout option when updating the modification time of files',
reason: 'TODO not implemented in go-ipfs yet'
},
{
name: 'should have default mtime',
reason: 'TODO not implemented in go-ipfs yet'
Expand Down

0 comments on commit 41bdf44

Please sign in to comment.