diff --git a/README.md b/README.md index 6cdde97b..cf4e25a2 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ - [`packages/it-parallel-batch`](https://github.com/achingbrain/it/tree/main/packages/it-parallel-batch) Process (async)iterable values as functions with concurrency control - [`packages/it-peekable`](https://github.com/achingbrain/it/tree/main/packages/it-peekable) Allows peeking/pushing an iterable - [`packages/it-protobuf-stream`](https://github.com/achingbrain/it/tree/main/packages/it-protobuf-stream) Read and write protobuf messages over a duplex stream +- [`packages/it-queueless-pushable`](https://github.com/achingbrain/it/tree/main/packages/it-queueless-pushable) A pushable queue that waits until a value is consumed before accepting another - [`packages/it-reduce`](https://github.com/achingbrain/it/tree/main/packages/it-reduce) Reduces the values yielded from an async iterator - [`packages/it-rpc`](https://github.com/achingbrain/it/tree/main/packages/it-rpc) Schema-free RPC over async iterables - [`packages/it-skip`](https://github.com/achingbrain/it/tree/main/packages/it-skip) Skip items from an iterable diff --git a/packages/it-queueless-pushable/LICENSE b/packages/it-queueless-pushable/LICENSE new file mode 100644 index 00000000..20ce483c --- /dev/null +++ b/packages/it-queueless-pushable/LICENSE @@ -0,0 +1,4 @@ +This project is dual licensed under MIT and Apache-2.0. + +MIT: https://www.opensource.org/licenses/mit +Apache-2.0: https://www.apache.org/licenses/license-2.0 diff --git a/packages/it-queueless-pushable/LICENSE-APACHE b/packages/it-queueless-pushable/LICENSE-APACHE new file mode 100644 index 00000000..14478a3b --- /dev/null +++ b/packages/it-queueless-pushable/LICENSE-APACHE @@ -0,0 +1,5 @@ +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. diff --git a/packages/it-queueless-pushable/LICENSE-MIT b/packages/it-queueless-pushable/LICENSE-MIT new file mode 100644 index 00000000..72dc60d8 --- /dev/null +++ b/packages/it-queueless-pushable/LICENSE-MIT @@ -0,0 +1,19 @@ +The MIT License (MIT) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/packages/it-queueless-pushable/README.md b/packages/it-queueless-pushable/README.md new file mode 100644 index 00000000..0ea7885e --- /dev/null +++ b/packages/it-queueless-pushable/README.md @@ -0,0 +1,82 @@ +# it-queueless-pushable + +[![codecov](https://img.shields.io/codecov/c/github/achingbrain/it.svg?style=flat-square)](https://codecov.io/gh/achingbrain/it) +[![CI](https://img.shields.io/github/actions/workflow/status/achingbrain/it/js-test-and-release.yml?branch=main\&style=flat-square)](https://github.com/achingbrain/it/actions/workflows/js-test-and-release.yml?query=branch%3Amain) + +> A pushable queue that waits until a value is consumed before accepting another + +# About + + + +Collects all `Uint8Array` values from an (async)iterable and returns them as a single `Uint8Array`. + +## Example + +```javascript +import toBuffer from 'it-to-buffer' + +// This can also be an iterator, generator, etc +const values = [Buffer.from([0, 1]), Buffer.from([2, 3])] + +const result = toBuffer(values) + +console.info(result) // Buffer[0, 1, 2, 3] +``` + +Async sources must be awaited: + +```javascript +import toBuffer from 'it-to-buffer' + +const values = async function * () { + yield Buffer.from([0, 1]) + yield Buffer.from([2, 3]) +} + +const result = await toBuffer(values()) + +console.info(result) // Buffer[0, 1, 2, 3] +``` + +# Install + +```console +$ npm i it-queueless-pushable +``` + +## Browser ` +``` + +# API Docs + +- + +# License + +Licensed under either of + +- Apache 2.0, ([LICENSE-APACHE](https://github.com/achingbrain/it/blob/main/packages/it-queueless-pushable/LICENSE-APACHE) / ) +- MIT ([LICENSE-MIT](https://github.com/achingbrain/it/blob/main/packages/it-queueless-pushable/LICENSE-MIT) / ) + +# Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. diff --git a/packages/it-queueless-pushable/package.json b/packages/it-queueless-pushable/package.json new file mode 100644 index 00000000..0fd70025 --- /dev/null +++ b/packages/it-queueless-pushable/package.json @@ -0,0 +1,147 @@ +{ + "name": "it-queueless-pushable", + "version": "0.0.0", + "description": "A pushable queue that waits until a value is consumed before accepting another", + "author": "Alex Potsides ", + "license": "Apache-2.0 OR MIT", + "homepage": "https://github.com/achingbrain/it/tree/main/packages/it-queueless-pushable#readme", + "repository": { + "type": "git", + "url": "git+https://github.com/achingbrain/it.git" + }, + "bugs": { + "url": "https://github.com/achingbrain/it/issues" + }, + "publishConfig": { + "access": "public" + }, + "type": "module", + "types": "./dist/src/index.d.ts", + "files": [ + "src", + "dist", + "!dist/test", + "!**/*.tsbuildinfo" + ], + "exports": { + ".": { + "types": "./dist/src/index.d.ts", + "import": "./dist/src/index.js" + } + }, + "eslintConfig": { + "extends": "ipfs", + "parserOptions": { + "project": true, + "sourceType": "module" + } + }, + "release": { + "branches": [ + "main" + ], + "plugins": [ + [ + "@semantic-release/commit-analyzer", + { + "preset": "conventionalcommits", + "releaseRules": [ + { + "breaking": true, + "release": "major" + }, + { + "revert": true, + "release": "patch" + }, + { + "type": "feat", + "release": "minor" + }, + { + "type": "fix", + "release": "patch" + }, + { + "type": "docs", + "release": "patch" + }, + { + "type": "test", + "release": "patch" + }, + { + "type": "deps", + "release": "patch" + }, + { + "scope": "no-release", + "release": false + } + ] + } + ], + [ + "@semantic-release/release-notes-generator", + { + "preset": "conventionalcommits", + "presetConfig": { + "types": [ + { + "type": "feat", + "section": "Features" + }, + { + "type": "fix", + "section": "Bug Fixes" + }, + { + "type": "chore", + "section": "Trivial Changes" + }, + { + "type": "docs", + "section": "Documentation" + }, + { + "type": "deps", + "section": "Dependencies" + }, + { + "type": "test", + "section": "Tests" + } + ] + } + } + ], + "@semantic-release/changelog", + "@semantic-release/npm", + "@semantic-release/github", + "@semantic-release/git" + ] + }, + "scripts": { + "build": "aegir build", + "lint": "aegir lint", + "dep-check": "aegir dep-check", + "clean": "aegir clean", + "test": "aegir test", + "test:node": "aegir test -t node --cov", + "test:chrome": "aegir test -t browser --cov", + "test:chrome-webworker": "aegir test -t webworker", + "test:firefox": "aegir test -t browser -- --browser firefox", + "test:firefox-webworker": "aegir test -t webworker -- --browser firefox", + "release": "aegir release" + }, + "dependencies": { + "p-defer": "^4.0.1", + "race-signal": "^1.0.2" + }, + "devDependencies": { + "aegir": "^43.0.0", + "delay": "^6.0.0", + "it-all": "^3.0.6", + "it-drain": "^3.0.7" + } +} diff --git a/packages/it-queueless-pushable/src/index.ts b/packages/it-queueless-pushable/src/index.ts new file mode 100644 index 00000000..a7a78e2e --- /dev/null +++ b/packages/it-queueless-pushable/src/index.ts @@ -0,0 +1,165 @@ +/** + * @packageDocumentation + * + * A pushable async generator that waits until the current value is consumed + * before allowing a new value to be pushed. + * + * Useful for when you don't want to keep memory usage under control and/or + * allow a downstream consumer to dictate how fast data flows through a pipe, + * but you want to be able to apply a transform to that data. + * + * @example + * + * ```typescript + * import { queuelessPushable } from 'it-queueless-pushable' + * + * const pushable = queuelessPushable() + * + * // run asynchronously + * Promise.resolve().then(async () => { + * // push a value - the returned promise will not resolve until the value is + * // read from the pushable + * await pushable.push('hello') + * }) + * + * // read a value + * const result = await pushable.next() + * console.info(result) // { done: false, value: 'hello' } + * ``` + */ + +import deferred, { type DeferredPromise } from 'p-defer' +import { raceSignal, type RaceSignalOptions } from 'race-signal' + +export interface AbortOptions { + signal?: AbortSignal +} + +export interface Pushable extends AsyncGenerator { + /** + * End the iterable after all values in the buffer (if any) have been yielded. If an + * error is passed the buffer is cleared immediately and the next iteration will + * throw the passed error + */ + end(err?: Error, options?: AbortOptions & RaceSignalOptions): Promise + + /** + * Push a value into the iterable. Values are yielded from the iterable in the order + * they are pushed. Values not yet consumed from the iterable are buffered. + */ + push(value: T, options?: AbortOptions & RaceSignalOptions): Promise +} + +class QueuelessPushable implements Pushable { + private readNext: DeferredPromise + private haveNext: DeferredPromise + private ended: boolean + private nextResult: IteratorResult | undefined + + constructor () { + this.ended = false + + this.readNext = deferred() + this.haveNext = deferred() + } + + [Symbol.asyncIterator] (): AsyncGenerator { + return this + } + + async next (): Promise> { + if (this.nextResult == null) { + // wait for the supplier to push a value + await this.haveNext.promise + } + + if (this.nextResult == null) { + throw new Error('HaveNext promise resolved but nextResult was undefined') + } + + const nextResult = this.nextResult + this.nextResult = undefined + + // signal to the supplier that we read the value + this.readNext.resolve() + this.readNext = deferred() + + return nextResult + } + + async throw (err?: Error): Promise> { + this.ended = true + + if (err != null) { + // this can cause unhandled promise rejections if nothing is awaiting the + // next value so attach a dummy catch listener to the promise + this.haveNext.promise.catch(() => {}) + this.haveNext.reject(err) + } + + const result: IteratorReturnResult = { + done: true, + value: undefined + } + + return result + } + + async return (): Promise> { + const result: IteratorReturnResult = { + done: true, + value: undefined + } + + await this._push(undefined) + + return result + } + + async push (value: T, options?: AbortOptions & RaceSignalOptions): Promise { + await this._push(value, options) + } + + async end (err?: Error, options?: AbortOptions & RaceSignalOptions): Promise { + if (err != null) { + await this.throw(err) + } else { + // abortable return + await this._push(undefined, options) + } + } + + private async _push (value?: T, options?: AbortOptions & RaceSignalOptions): Promise { + if (value != null && this.ended) { + throw new Error('Cannot push value onto an ended pushable') + } + + // wait for all values to be read + while (this.nextResult != null) { + await this.readNext.promise + } + + if (value != null) { + this.nextResult = { done: false, value } + } else { + this.ended = true + this.nextResult = { done: true, value: undefined } + } + + // let the consumer know we have a new value + this.haveNext.resolve() + this.haveNext = deferred() + + // wait for the consumer to have finished processing the value and requested + // the next one or for the passed signal to abort the waiting + await raceSignal( + this.readNext.promise, + options?.signal, + options + ) + } +} + +export function queuelessPushable (): Pushable { + return new QueuelessPushable() +} diff --git a/packages/it-queueless-pushable/test/index.spec.ts b/packages/it-queueless-pushable/test/index.spec.ts new file mode 100644 index 00000000..562068f9 --- /dev/null +++ b/packages/it-queueless-pushable/test/index.spec.ts @@ -0,0 +1,137 @@ +import { expect } from 'aegir/chai' +import delay from 'delay' +import all from 'it-all' +import drain from 'it-drain' +import { queuelessPushable } from '../src/index.js' + +describe('it-queueless-pushable', () => { + it('should return a value', async () => { + const pushable = queuelessPushable() + + void Promise.resolve().then(async () => { + await pushable.push('hello') + }) + + const result = await pushable.next() + expect(result).to.deep.equal({ + done: false, + value: 'hello' + }) + }) + + it('should wait for the value to be consumed before accepting another value', async () => { + const pushable = queuelessPushable() + const actions: string[] = [] + + void Promise.resolve().then(async () => { + actions.push('push hello') + await pushable.push('hello') + actions.push('push world') + await pushable.push('world') + actions.push('end queue') + await pushable.end() + }) + + while (true) { + const result = await pushable.next() + + if (result.done === true) { + break + } + + actions.push(`consume ${result.value}`) + } + + expect(actions).to.deep.equal([ + 'push hello', + 'consume hello', + 'push world', + 'consume world', + 'end queue' + ]) + }) + + it('should be consumable as an async iterator', async () => { + const pushable = queuelessPushable() + + void Promise.resolve().then(async () => { + await pushable.push('hello') + await pushable.push('world') + await pushable.end() + }) + + await expect(all(pushable)).to.eventually.deep.equal([ + 'hello', + 'world' + ]) + }) + + it('should end with an error', async () => { + const err = new Error('Urk!') + const pushable = queuelessPushable() + + void Promise.resolve().then(async () => { + await pushable.push('hello') + await pushable.push('world') + await pushable.end(err) + }) + + await expect(all(pushable)).to.eventually.be.rejectedWith(err.message) + }) + + it('should return', async () => { + const pushable = queuelessPushable() + + void Promise.resolve().then(async () => { + await pushable.push('hello') + await pushable.push('world') + await pushable.return() + }) + + await expect(all(pushable)).to.eventually.be.ok() + }) + + it('should not be pushable after ending', async () => { + const pushable = queuelessPushable() + + void drain(pushable) + + await pushable.push('hello') + await pushable.push('world') + await pushable.end() + + await expect(pushable.push('nope!')).to.eventually.be.rejectedWith('Cannot push value onto an ended pushable') + }) + + it('should push in order even it promises are unawaited', async () => { + const pushable = queuelessPushable() + + void pushable.push('hello') + void pushable.push('world') + void pushable.end() + + await expect(all(pushable)).to.eventually.deep.equal([ + 'hello', + 'world' + ]) + }) + + it('should abort pushing if consumer is slow', async () => { + const pushable = queuelessPushable() + + await expect(pushable.push('hello', { + signal: AbortSignal.timeout(100) + })).to.eventually.be.rejected() + }) + + it('should not cause an unhandled promise rejection if the queue is aborted without a listener', async () => { + const err = new Error('Urk!') + const pushable = queuelessPushable() + void pushable.throw(err) + + // wait until a future tick to allow promise to reject without being caught + await delay(10) + + await expect(pushable.next()).to.eventually.be.rejectedWith(err.message) + }) +}) diff --git a/packages/it-queueless-pushable/tsconfig.json b/packages/it-queueless-pushable/tsconfig.json new file mode 100644 index 00000000..13a35996 --- /dev/null +++ b/packages/it-queueless-pushable/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src", + "test" + ] +} diff --git a/packages/it-queueless-pushable/typedoc.json b/packages/it-queueless-pushable/typedoc.json new file mode 100644 index 00000000..f599dc72 --- /dev/null +++ b/packages/it-queueless-pushable/typedoc.json @@ -0,0 +1,5 @@ +{ + "entryPoints": [ + "./src/index.ts" + ] +}