diff --git a/README.md b/README.md index bf68db9..3eaf52e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# interface-datastore +# interface-datastore [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) @@ -13,16 +13,52 @@ > Implementation of the [datastore](https://github.com/ipfs/go-datastore) interface in JavaScript -## Lead Maintainer +## Lead Maintainer [Alex Potsides](https://github.com/achingbrain) -## Table of Contents +## Table of Contents - [Implementations](#implementations) - [Install](#install) - [Usage](#usage) -- [Api](#api) + - [Wrapping Stores](#wrapping-stores) + - [Test suite](#test-suite) + - [Keys](#keys) +- [API](#api) + - [`has(key, [options])` -> `Promise`](#haskey-options---promiseboolean) + - [Arguments](#arguments) + - [Example](#example) + - [`put(key, value, [options])` -> `Promise`](#putkey-value-options---promise) + - [Arguments](#arguments-1) + - [Example](#example-1) + - [`putMany(source, [options])` -> `AsyncIterator<{ key: Key, value: Buffer }>`](#putmanysource-options---asynciterator-key-key-value-buffer-) + - [Arguments](#arguments-2) + - [Example](#example-2) + - [`get(key, [options])` -> `Promise`](#getkey-options---promisebuffer) + - [Arguments](#arguments-3) + - [Example](#example-3) + - [`getMany(source, [options])` -> `AsyncIterator`](#getmanysource-options---asynciteratorbuffer) + - [Arguments](#arguments-4) + - [Example](#example-4) + - [`delete(key, [options])` -> `Promise`](#deletekey-options---promise) + - [Arguments](#arguments-5) + - [Example](#example-5) + - [`deleteMany(source, [options])` -> `AsyncIterator`](#deletemanysource-options---asynciteratorkey) + - [Arguments](#arguments-6) + - [Example](#example-6) + - [`query(query, [options])` -> `AsyncIterable`](#queryquery-options---asynciterablebuffer) + - [Arguments](#arguments-7) + - [Example](#example-7) + - [`batch()`](#batch) + - [Example](#example-8) + - [`put(key, value)`](#putkey-value) + - [`delete(key)`](#deletekey) + - [`commit([options])` -> `AsyncIterator`](#commitoptions---asynciterator) + - [Arguments](#arguments-8) + - [Example](#example-9) + - [`open()` -> `Promise`](#open---promise) + - [`close()` -> `Promise`](#close---promise) - [Contribute](#contribute) - [License](#license) @@ -93,7 +129,26 @@ describe('mystore', () => { }) ``` -## API +### Aborting requests + +Most API methods accept an [AbortSignal][] as part of an options object. Implementations may listen for an `abort` event emitted by this object, or test the `signal.aborted` property. When received implementations should tear down any long-lived requests or resources created. + +### Concurrency + +The streaming `(put|get|delete)Many` methods are intended to be used with modules such as [it-parallel-batch](https://www.npmjs.com/package/it-parallel-batch) to allow calling code to control levels of parallelisation. The batching method ensures results are returned in the correct order, but interface implementations should be thread safe. + +```js +const batch = require('it-parallel-batch') +const source = [{ + key: .., + value: .. +}] + +// put values into the datastore concurrently, max 10 at a time +for await (const { key, data } of batch(store.putMany(source), 10)) { + console.info(`Put ${key}`) +} +``` ### Keys @@ -101,7 +156,7 @@ To allow a better abstraction on how to address values, there is a `Key` class w ```js const a = new Key('a') -const b = new Key(new Buffer('hello')) +const b = new Key(Buffer.from('hello')) ``` The key scheme is inspired by file systems and Google App Engine key model. Keys are meant to be unique across a system. They are typically hierarchical, incorporating more and more specific namespaces. Thus keys can be deemed 'children' or 'ancestors' of other keys: @@ -115,64 +170,177 @@ Also, every namespace can be parameterized to embed relevant object information. - `new Key('/Comedy/MontyPython/Sketch:CheeseShop')` - `new Key('/Comedy/MontyPython/Sketch:CheeseShop/Character:Mousebender')` +## API -### Methods +Implementations of this interface should make the following methods available: -> The exact types can be found in [`src/index.js`](src/index.js). +### `has(key, [options])` -> `Promise` -These methods will be present on every datastore. `Key` always means an instance of the above mentioned Key type. Every datastore is generic over the `Value` type, though currently all backing implementations are implemented only for [`Buffer`](https://nodejs.org/docs/latest/api/buffer.html). +Check for the existence of a given key -### `has(key)` -> `Promise` +#### Arguments -- `key: Key` +| Name | Type | Description | +| ---- | ---- | ----------- | +| key | [Key][] | The key to check the existance of | +| options | [Object][] | An options object, all properties are optional | +| options.signal | [AbortSignal][] | A way to signal that the caller is no longer interested in the outcome of this operation | -Check for the existence of a given key +#### Example ```js const exists = await store.has(new Key('awesome')) -console.log('is it there', exists) -``` -### `put(key, value)` -> `Promise` +if (exists) { + console.log('it is there') +} else { + console.log('it is not there') +} +``` -- `key: Key` -- `value: Value` +### `put(key, value, [options])` -> `Promise` Store a value with the given key. +#### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| key | [Key][] | The key to store the value under | +| value | [Buffer][] | Value to store | +| options | [Object][] | An options object, all properties are optional | +| options.signal | [AbortSignal][] | A way to signal that the caller is no longer interested in the outcome of this operation | + +#### Example + ```js -await store.put(new Key('awesome'), new Buffer('datastores')) +await store.put([{ key: new Key('awesome'), value: Buffer.from('datastores') }]) console.log('put content') ``` -### `get(key)` -> `Promise` +### `putMany(source, [options])` -> `AsyncIterator<{ key: Key, value: Buffer }>` + +Store many key-value pairs. + +#### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| source | [AsyncIterator][]<{ key: [Key][], value: [Buffer][] }> | The key to store the value under | +| value | [Buffer][] | Value to store | +| options | [Object][] | An options object, all properties are optional | +| options.signal | [AbortSignal][] | A way to signal that the caller is no longer interested in the outcome of this operation | + +#### Example + +```js +const source = [{ key: new Key('awesome'), value: Buffer.from('datastores') }] + +for await (const { key, value } of store.putMany(source)) { + console.info(`put content for key ${key}`) +} +``` + +### `get(key, [options])` -> `Promise` -- `key: Key` +#### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| key | [Key][] | The key retrieve the value for | +| options | [Object][] | An options object, all properties are optional | +| options.signal | [AbortSignal][] | A way to signal that the caller is no longer interested in the outcome of this operation | + +#### Example Retrieve the value stored under the given key. ```js const value = await store.get(new Key('awesome')) -console.log('got content: %s', value.toString()) +console.log('got content: %s', value.toString('utf8')) // => got content: datastore ``` -### `delete(key)` -> `Promise` +### `getMany(source, [options])` -> `AsyncIterator` + +#### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| source | [AsyncIterator][]<[Key][]> | One or more keys to retrieve values for | +| options | [Object][] | An options object, all properties are optional | +| options.signal | [AbortSignal][] | A way to signal that the caller is no longer interested in the outcome of this operation | + +#### Example -- `key: Key` +Retrieve a stream of values stored under the given keys. + +```js +for await (const value of store.getMany([new Key('awesome')])) { + console.log('got content: %s', value.toString('utf8')) + // => got content: datastore +} +``` + +### `delete(key, [options])` -> `Promise` Delete the content stored under the given key. +#### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| key | [Key][] | The key to remove the value for | +| options | [Object][] | An options object, all properties are optional | +| options.signal | [AbortSignal][] | A way to signal that the caller is no longer interested in the outcome of this operation | + +#### Example + ```js await store.delete(new Key('awesome')) console.log('deleted awesome content :(') ``` -### `query(query)` -> `Iterable` +### `deleteMany(source, [options])` -> `AsyncIterator` + +Delete the content stored under the given keys. + +#### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| source | [AsyncIterator][]<[Key][]> | One or more keys to remove values for | +| options | [Object][] | An options object, all properties are optional | +| options.signal | [AbortSignal][] | A way to signal that the caller is no longer interested in the outcome of this operation | + +#### Example + +```js +const source = [new Key('awesome')] + +for await (const key of store.deleteMany(source)) { + console.log(`deleted content with key ${key}`) +} +``` + +### `query(query, [options])` -> `AsyncIterable` -- `query: Query` see below for possible values +Search the store for some values. Returns an [AsyncIterable][] with each item being a [Buffer][]. -Search the store for some values. Returns an [Iterable](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols) with each item being a `Value`. +#### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| query | [Object][] | A query object, all properties are optional | +| query.prefix | [String][] | Only return values where the key starts with this prefix | +| query.filters | [Array][]<[Function][]([Buffer][]) -> [Boolean][]> | Filter the results according to the these functions | +| query.orders | [Array][]<[Function][]([Array][]<[Buffer][]>) -> [Array][]<[Buffer][]>> | Order the results according to these functions | +| query.limit | [Number][] | Only return this many records | +| query.offset | [Number][] | Skip this many records at the beginning | +| options | [Object][] | An options object, all properties are optional | +| options.signal | [AbortSignal][] | A way to signal that the caller is no longer interested in the outcome of this operation | + +#### Example ```js // retrieve __all__ values from the store @@ -183,26 +351,17 @@ for await (const value of store.query({})) { console.log('ALL THE VALUES', list) ``` -#### `Query` - -Object in the form with the following optional properties - -- `prefix: string` (optional) - only return values where the key starts with this prefix -- `filters: Array>` (optional) - filter the results according to the these functions -- `orders: Array>` (optional) - order the results according to these functions -- `limit: Number` (optional) - only return this many records -- `offset: Number` (optional) - skip this many records at the beginning -- `keysOnly: Boolean` (optional) - Only return keys, no values. - ### `batch()` This will return an object with which you can chain multiple operations together, with them only being executed on calling `commit`. +#### Example + ```js const b = store.batch() for (let i = 0; i < 100; i++) { - b.put(new Key(`hello${i}`), new Buffer(`hello world ${i}`)) + b.put(new Key(`hello${i}`), Buffer.from(`hello world ${i}`)) } await b.commit() @@ -211,21 +370,49 @@ console.log('put 100 values') #### `put(key, value)` -- `key: Key` -- `value: Value` - Queue a put operation to the store. -#### `delete(key)` +| Name | Type | Description | +| ---- | ---- | ----------- | +| key | [Key][] | The key to store the value under | +| value | [Buffer][] | Value to store | -- `key: Key` +#### `delete(key)` Queue a delete operation to the store. -#### `commit()` -> `Promise` +| Name | Type | Description | +| ---- | ---- | ----------- | +| key | [Key][] | The key to remove the value for | + +#### `commit([options])` -> `AsyncIterator` Write all queued operations to the underyling store. The batch object should not be used after calling this. +#### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| options | [Object][] | An options object, all properties are optional | +| options.signal | [AbortSignal][] | A way to signal that the caller is no longer interested in the outcome of this operation | + +#### Example + +```js +const batch = store.batch() + +batch.put(new Key('to-put'), Buffer.from('hello world')) +batch.del(new Key('to-remove')) + +for await (const res of batch.commit()) { + if (res.key) { + console.info('put', res.key) + } else { + console.info('del', res) + } +} +``` + ### `open()` -> `Promise` Opens the datastore, this is only needed if the store was closed before, otherwise this is taken care of by the constructor. @@ -243,3 +430,16 @@ Small note: If editing the Readme, please conform to the [standard-readme](https ## License MIT 2017 © IPFS + + +[Key]: #Keys +[Object]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object +[Buffer]: https://nodejs.org/api/buffer.html +[AbortSignal]: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal +[AsyncIterator]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/asyncIterator +[AsyncIterable]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols +[String]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String +[Array]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array +[Function]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Function +[Number]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number +[Boolean]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Boolean \ No newline at end of file diff --git a/package.json b/package.json index a2bd5a0..76528cf 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,9 @@ "devDependencies": { "aegir": "^21.4.5", "chai": "^4.1.2", - "dirty-chai": "^2.0.1" + "dirty-chai": "^2.0.1", + "it-all": "^1.0.2", + "it-drain": "^1.0.1" }, "dependencies": { "buffer": "^5.5.0", diff --git a/src/errors.js b/src/errors.js index 14ca3ef..7e4941d 100644 --- a/src/errors.js +++ b/src/errors.js @@ -21,3 +21,8 @@ module.exports.notFoundError = (err) => { err = err || new Error('Not Found') return errcode(err, 'ERR_NOT_FOUND') } + +module.exports.abortedError = (err) => { + err = err || new Error('Aborted') + return errcode(err, 'ERR_ABORTED') +} diff --git a/src/key.js b/src/key.js index cffa374..3513ac2 100644 --- a/src/key.js +++ b/src/key.js @@ -68,7 +68,7 @@ class Key { * @returns {String} */ get [Symbol.toStringTag] () { - return `[Key ${this.toString()}]` + return `Key(${this.toString()})` } /** diff --git a/src/memory.js b/src/memory.js index 7cd4705..d574d56 100644 --- a/src/memory.js +++ b/src/memory.js @@ -6,6 +6,12 @@ const Key = require('./key') // Errors const Errors = require('./errors') +function throwIfAborted (signal) { + if (signal && signal.aborted) { + throw Error.abortedError() + } +} + class MemoryDatastore { constructor () { this.data = {} @@ -17,12 +23,31 @@ class MemoryDatastore { this.data[key.toString()] = val } + async * putMany (source, options = {}) { + throwIfAborted(options.signal) + + for await (const { key, value } of source) { + throwIfAborted(options.signal) + await this.put(key, value) + yield { key, value } + } + } + async get (key) { const exists = await this.has(key) if (!exists) throw Errors.notFoundError() return this.data[key.toString()] } + async * getMany (source, options = {}) { + throwIfAborted(options.signal) + + for await (const key of source) { + throwIfAborted(options.signal) + yield this.get(key) + } + } + async has (key) { // eslint-disable-line require-await return this.data[key.toString()] !== undefined } @@ -31,26 +56,33 @@ class MemoryDatastore { delete this.data[key.toString()] } + async * deleteMany (source, options = {}) { + throwIfAborted(options.signal) + + for await (const key of source) { + throwIfAborted(options.signal) + await this.delete(key) + yield key + } + } + batch () { let puts = [] let dels = [] + const self = this + return { put (key, value) { - puts.push([key, value]) + puts.push({ key, value }) }, delete (key) { dels.push(key) }, - commit: async () => { // eslint-disable-line require-await - puts.forEach(v => { - this.data[v[0].toString()] = v[1] - }) + async * commit (options) { // eslint-disable-line require-await + yield * self.putMany(puts, options) puts = [] - - dels.forEach(key => { - delete this.data[key.toString()] - }) + yield * self.deleteMany(dels, options) dels = [] } } diff --git a/src/tests.js b/src/tests.js index 5087533..156891c 100644 --- a/src/tests.js +++ b/src/tests.js @@ -7,6 +7,8 @@ const randomBytes = require('iso-random-stream/src/random') const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect +const all = require('it-all') +const drain = require('it-drain') const Key = require('../src').Key @@ -34,15 +36,43 @@ module.exports = (test) => { it('parallel', async () => { const data = [] for (let i = 0; i < 100; i++) { - data.push([new Key(`/z/key${i}`), Buffer.from(`data${i}`)]) + data.push({ key: new Key(`/z/key${i}`), value: Buffer.from(`data${i}`) }) } - await Promise.all(data.map(d => store.put(d[0], d[1]))) - const res = await Promise.all(data.map(d => store.get(d[0]))) + await Promise.all(data.map(d => store.put(d.key, d.value))) - res.forEach((res, i) => { - expect(res).to.be.eql(data[i][1]) - }) + const res = await all(store.getMany(data.map(d => d.key))) + expect(res).to.deep.equal(data.map(d => d.value)) + }) + }) + + describe('putMany', () => { + let store + + beforeEach(async () => { + store = await test.setup() + if (!store) throw new Error('missing store') + }) + + afterEach(() => cleanup(store)) + + it('streaming', async () => { + const data = [] + for (let i = 0; i < 100; i++) { + data.push({ key: new Key(`/z/key${i}`), value: Buffer.from(`data${i}`) }) + } + + let index = 0 + + for await (const { key, value } of store.putMany(data)) { + expect(data[index]).to.deep.equal({ key, value }) + index++ + } + + expect(index).to.equal(data.length) + + const res = await all(store.getMany(data.map(d => d.key))) + expect(res).to.deep.equal(data.map(d => d.value)) }) }) @@ -77,6 +107,40 @@ module.exports = (test) => { }) }) + describe('getMany', () => { + let store + + beforeEach(async () => { + store = await test.setup() + if (!store) throw new Error('missing store') + }) + + afterEach(() => cleanup(store)) + + it('streaming', async () => { + const k = new Key('/z/one') + await store.put(k, Buffer.from('hello')) + const source = [k] + + const res = await all(store.getMany(source)) + expect(res).to.have.lengthOf(1) + expect(res[0]).to.be.eql(Buffer.from('hello')) + }) + + it('should throw error for missing key', async () => { + const k = new Key('/does/not/exist') + + try { + await drain(store.getMany([k])) + } catch (err) { + expect(err).to.have.property('code', 'ERR_NOT_FOUND') + return + } + + throw new Error('expected error to be thrown') + }) + }) + describe('delete', () => { let store @@ -105,12 +169,47 @@ module.exports = (test) => { await Promise.all(data.map(d => store.put(d[0], d[1]))) const res0 = await Promise.all(data.map(d => store.has(d[0]))) - res0.forEach((res, i) => expect(res).to.be.eql(true)) + res0.forEach(res => expect(res).to.be.eql(true)) await Promise.all(data.map(d => store.delete(d[0]))) const res1 = await Promise.all(data.map(d => store.has(d[0]))) - res1.forEach((res, i) => expect(res).to.be.eql(false)) + res1.forEach(res => expect(res).to.be.eql(false)) + }) + }) + + describe('deleteMany', () => { + let store + + beforeEach(async () => { + store = await test.setup() + if (!store) throw new Error('missing store') + }) + + afterEach(() => cleanup(store)) + + it('streaming', async () => { + const data = [] + for (let i = 0; i < 100; i++) { + data.push({ key: new Key(`/a/key${i}`), value: Buffer.from(`data${i}`) }) + } + + await drain(store.putMany(data)) + + const res0 = await Promise.all(data.map(d => store.has(d.key))) + res0.forEach(res => expect(res).to.be.eql(true)) + + let index = 0 + + for await (const key of store.deleteMany(data.map(d => d.key))) { + expect(data[index].key).to.deep.equal(key) + index++ + } + + expect(index).to.equal(data.length) + + const res1 = await Promise.all(data.map(d => store.has(d.key))) + res1.forEach(res => expect(res).to.be.eql(false)) }) }) @@ -133,7 +232,7 @@ module.exports = (test) => { b.put(new Key('/q/two'), Buffer.from('2')) b.put(new Key('/q/three'), Buffer.from('3')) b.delete(new Key('/z/old')) - await b.commit() + await drain(b.commit()) const keys = ['/a/one', '/q/two', '/q/three', '/z/old'] const res = await Promise.all(keys.map(k => store.has(new Key(k)))) @@ -151,7 +250,7 @@ module.exports = (test) => { b.put(new Key(`/z/hello${i}`), randomBytes(128)) } - await b.commit() + await drain(b.commit()) const total = async iterable => { let count = 0 @@ -217,18 +316,16 @@ module.exports = (test) => { b.put(world.key, world.value) b.put(hello2.key, hello2.value) - return b.commit() + return drain(b.commit()) }) after(() => cleanup(store)) - tests.forEach(t => it(t[0], async () => { - let res = [] - for await (const value of store.query(t[1])) res.push(value) + tests.forEach(([name, query, expected]) => it(name, async () => { + let res = await all(store.query(query)) - const expected = t[2] if (Array.isArray(expected)) { - if (t[1].orders == null) { + if (query.orders == null) { expect(res).to.have.length(expected.length) const s = (a, b) => { if (a.key.toString() < b.key.toString()) { @@ -250,7 +347,7 @@ module.exports = (test) => { } }) } else { - expect(res).to.be.eql(t[2]) + expect(res).to.be.eql(expected) } } else if (typeof expected === 'number') { expect(res).to.have.length(expected)