-
-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for AsyncIterable as input #46
Changes from 5 commits
56a3bca
8477a7a
d5a0b26
6d4156d
8b6e6f0
c9e7112
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -20,33 +20,44 @@ export default async function pMap( | |||||
const result = []; | ||||||
const errors = []; | ||||||
const skippedIndexes = []; | ||||||
const iterator = iterable[Symbol.iterator](); | ||||||
let isRejected = false; | ||||||
let isResolved = false; | ||||||
let isIterableDone = false; | ||||||
let resolvingCount = 0; | ||||||
let currentIndex = 0; | ||||||
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator](); | ||||||
|
||||||
const reject = reason => { | ||||||
isRejected = true; | ||||||
isResolved = true; | ||||||
reject_(reason); | ||||||
}; | ||||||
|
||||||
const next = () => { | ||||||
if (isRejected) { | ||||||
const next = async () => { | ||||||
if (isResolved) { | ||||||
return; | ||||||
} | ||||||
|
||||||
const nextItem = iterator.next(); | ||||||
const nextItem = await iterator.next(); | ||||||
|
||||||
const index = currentIndex; | ||||||
currentIndex++; | ||||||
|
||||||
// Note: iterator.next() can be called many times in parallel. | ||||||
// This can cause multiple calls to this next() function to | ||||||
// receive a `nextItem` with `done === true`. | ||||||
// The shutdown logic that rejects/resolves must be protected | ||||||
// so it runs only one time as the `skippedIndex` logic is | ||||||
// non-idempotent. | ||||||
if (nextItem.done) { | ||||||
isIterableDone = true; | ||||||
|
||||||
if (resolvingCount === 0) { | ||||||
if (resolvingCount === 0 && !isResolved) { | ||||||
if (!stopOnError && errors.length > 0) { | ||||||
reject(new AggregateError(errors)); | ||||||
} else { | ||||||
isResolved = true; | ||||||
|
||||||
for (const skippedIndex of skippedIndexes) { | ||||||
result.splice(skippedIndex, 1); | ||||||
} | ||||||
|
@@ -60,23 +71,25 @@ export default async function pMap( | |||||
|
||||||
resolvingCount++; | ||||||
|
||||||
// Intentionally detached | ||||||
(async () => { | ||||||
try { | ||||||
const element = await nextItem.value; | ||||||
|
||||||
if (isRejected) { | ||||||
if (isResolved) { | ||||||
return; | ||||||
} | ||||||
|
||||||
const value = await mapper(element, index); | ||||||
|
||||||
if (value === pMapSkip) { | ||||||
skippedIndexes.push(index); | ||||||
} else { | ||||||
result[index] = value; | ||||||
} | ||||||
|
||||||
resolvingCount--; | ||||||
next(); | ||||||
await next(); | ||||||
} catch (error) { | ||||||
if (stopOnError) { | ||||||
reject(error); | ||||||
|
@@ -89,7 +102,7 @@ export default async function pMap( | |||||
// If we continue calling next() indefinitely we will likely end up | ||||||
// in an infinite loop of failed iteration. | ||||||
try { | ||||||
next(); | ||||||
await next(); | ||||||
} catch (error) { | ||||||
reject(error); | ||||||
} | ||||||
|
@@ -98,23 +111,27 @@ export default async function pMap( | |||||
})(); | ||||||
}; | ||||||
|
||||||
for (let index = 0; index < concurrency; index++) { | ||||||
// Catch errors from the iterable.next() call | ||||||
// In that case we can't really continue regardless of stopOnError state | ||||||
// since an iterable is likely to continue throwing after it throws once. | ||||||
// If we continue calling next() indefinitely we will likely end up | ||||||
// in an infinite loop of failed iteration. | ||||||
try { | ||||||
next(); | ||||||
} catch (error) { | ||||||
reject(error); | ||||||
break; | ||||||
} | ||||||
// Create the concurrent runners in a detached (non-awaited) | ||||||
// promise. We need this so we can await the next() calls | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||
// to stop creating runners before hitting the concurrency limit | ||||||
// if the iterable has already been marked as done. | ||||||
// NOTE: We *must* do this for async iterators otherwise we'll spin up | ||||||
// infinite next() calls by default and never start the event loop. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done (plus more cases) |
||||||
(async () => { | ||||||
for (let index = 0; index < concurrency; index++) { | ||||||
try { | ||||||
// eslint-disable-next-line no-await-in-loop | ||||||
await next(); | ||||||
} catch (error) { | ||||||
reject(error); | ||||||
break; | ||||||
} | ||||||
|
||||||
if (isIterableDone || isRejected) { | ||||||
break; | ||||||
if (isIterableDone || isRejected) { | ||||||
break; | ||||||
} | ||||||
} | ||||||
} | ||||||
})(); | ||||||
}); | ||||||
} | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -43,9 +43,11 @@ Returns a `Promise` that is fulfilled when all promises in `input` and ones retu | |||||
|
||||||
#### input | ||||||
|
||||||
Type: `Iterable<Promise | unknown>` | ||||||
Type: `AsyncIterable<Promise<unknown> | unknown> | Iterable<Promise<unknown> | unknown>` | ||||||
|
||||||
Iterated over concurrently in the `mapper` function. | ||||||
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||
|
||||||
Asynchoronous iterables (distinguishing from iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchornous proces completes (e.g. reading from a remote queue when the queue has reached empty, or reading lines from a stream). | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||
|
||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You also need to update index.d.ts There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||
#### mapper(element, index) | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -7,23 +7,29 @@ import AggregateError from 'aggregate-error'; | |||||||||
import pMap, {pMapSkip} from './index.js'; | ||||||||||
|
||||||||||
const sharedInput = [ | ||||||||||
Promise.resolve([10, 300]), | ||||||||||
[async () => { | ||||||||||
return 10; | ||||||||||
}, 300], | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||||||
[20, 200], | ||||||||||
[30, 100] | ||||||||||
]; | ||||||||||
|
||||||||||
const errorInput1 = [ | ||||||||||
[20, 200], | ||||||||||
[30, 100], | ||||||||||
[() => Promise.reject(new Error('foo')), 10], | ||||||||||
[async () => { | ||||||||||
throw new Error('foo'); | ||||||||||
}, 10], | ||||||||||
[() => { | ||||||||||
throw new Error('bar'); | ||||||||||
}, 10] | ||||||||||
]; | ||||||||||
|
||||||||||
const errorInput2 = [ | ||||||||||
[20, 200], | ||||||||||
[() => Promise.reject(new Error('bar')), 10], | ||||||||||
[async () => { | ||||||||||
throw new Error('bar'); | ||||||||||
}, 10], | ||||||||||
[30, 100], | ||||||||||
[() => { | ||||||||||
throw new Error('foo'); | ||||||||||
|
@@ -151,21 +157,139 @@ test('pMapSkip', async t => { | |||||||||
], async value => value), [1, 2]); | ||||||||||
}); | ||||||||||
|
||||||||||
test('do not run mapping after stop-on-error happened', async t => { | ||||||||||
const input = [1, delay(300, {value: 2}), 3]; | ||||||||||
test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => { | ||||||||||
const input = [1, async () => delay(300, {value: 2}), 3]; | ||||||||||
const mappedValues = []; | ||||||||||
await t.throwsAsync( | ||||||||||
pMap(input, async value => { | ||||||||||
value = typeof value === 'function' ? await value() : value; | ||||||||||
mappedValues.push(value); | ||||||||||
if (value === 1) { | ||||||||||
await delay(100); | ||||||||||
throw new Error('Oops!'); | ||||||||||
} | ||||||||||
}, | ||||||||||
{concurrency: 1}) | ||||||||||
}) | ||||||||||
); | ||||||||||
await delay(500); | ||||||||||
t.deepEqual(mappedValues, [1]); | ||||||||||
t.deepEqual(mappedValues, [1, 3, 2]); | ||||||||||
}); | ||||||||||
|
||||||||||
class AsyncTestData { | ||||||||||
constructor(data) { | ||||||||||
this.data = data; | ||||||||||
} | ||||||||||
|
||||||||||
async * [Symbol.asyncIterator]() { | ||||||||||
for (let i = 0; i < this.data.length; i++) { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||||||
// Add a delay between each item iterated | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||||||
// eslint-disable-next-line no-await-in-loop | ||||||||||
await delay(10); | ||||||||||
yield this.data[i]; | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
// | ||||||||||
// Async Iterator tests | ||||||||||
// | ||||||||||
|
||||||||||
test('asyncIterator - main', async t => { | ||||||||||
const end = timeSpan(); | ||||||||||
t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper), [10, 20, 30]); | ||||||||||
|
||||||||||
// We give it some leeway on both sides of the expected 300ms as the exact value depends on the machine and workload. | ||||||||||
t.true(inRange(end(), {start: 290, end: 430})); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - concurrency: 1', async t => { | ||||||||||
const end = timeSpan(); | ||||||||||
t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1}), [10, 20, 30]); | ||||||||||
t.true(inRange(end(), {start: 590, end: 760})); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - concurrency: 4', async t => { | ||||||||||
const concurrency = 4; | ||||||||||
let running = 0; | ||||||||||
|
||||||||||
await pMap(new AsyncTestData(Array.from({length: 100}).fill(0)), async () => { | ||||||||||
running++; | ||||||||||
t.true(running <= concurrency); | ||||||||||
await delay(randomInt(30, 200)); | ||||||||||
running--; | ||||||||||
}, {concurrency}); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - handles empty iterable', async t => { | ||||||||||
t.deepEqual(await pMap(new AsyncTestData([]), mapper), []); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - async with concurrency: 2 (random time sequence)', async t => { | ||||||||||
const input = Array.from({length: 10}).map(() => randomInt(0, 100)); | ||||||||||
const mapper = value => delay(value, {value}); | ||||||||||
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2}); | ||||||||||
t.deepEqual(result, input); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - async with concurrency: 2 (problematic time sequence)', async t => { | ||||||||||
const input = [100, 200, 10, 36, 13, 45]; | ||||||||||
const mapper = value => delay(value, {value}); | ||||||||||
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2}); | ||||||||||
t.deepEqual(result, input); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - async with concurrency: 2 (out of order time sequence)', async t => { | ||||||||||
const input = [200, 100, 50]; | ||||||||||
const mapper = value => delay(value, {value}); | ||||||||||
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2}); | ||||||||||
t.deepEqual(result, input); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - enforce number in options.concurrency', async t => { | ||||||||||
await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 0}), {instanceOf: TypeError}); | ||||||||||
await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1.5}), {instanceOf: TypeError}); | ||||||||||
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1})); | ||||||||||
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 10})); | ||||||||||
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: Number.POSITIVE_INFINITY})); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - immediately rejects when stopOnError is true', async t => { | ||||||||||
await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1}), {message: 'foo'}); | ||||||||||
await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1}), {message: 'bar'}); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - aggregate errors when stopOnError is false', async t => { | ||||||||||
await t.notThrowsAsync(pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1, stopOnError: false})); | ||||||||||
await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /foo(.|\n)*bar/}); | ||||||||||
await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /bar(.|\n)*foo/}); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - pMapSkip', async t => { | ||||||||||
t.deepEqual(await pMap(new AsyncTestData([ | ||||||||||
1, | ||||||||||
pMapSkip, | ||||||||||
2 | ||||||||||
]), async value => value), [1, 2]); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => { | ||||||||||
const input = [1, async () => delay(300, {value: 2}), 3]; | ||||||||||
const mappedValues = []; | ||||||||||
await t.throwsAsync( | ||||||||||
pMap(new AsyncTestData(input), async value => { | ||||||||||
if (typeof value === 'function') { | ||||||||||
value = await value(); | ||||||||||
} | ||||||||||
|
||||||||||
mappedValues.push(value); | ||||||||||
if (value === 1) { | ||||||||||
await delay(100); | ||||||||||
throw new Error(`Oops! ${value}`); | ||||||||||
} | ||||||||||
}), | ||||||||||
{message: 'Oops! 1'} | ||||||||||
); | ||||||||||
await delay(500); | ||||||||||
t.deepEqual(mappedValues, [1, 3, 2]); | ||||||||||
}); | ||||||||||
|
||||||||||
test('catches exception from source iterator - 1st item', async t => { | ||||||||||
|
@@ -225,3 +349,22 @@ test('catches exception from source iterator - 2nd item after 1st item mapper th | |||||||||
t.is(input.index, 2); | ||||||||||
t.deepEqual(mappedValues, [0]); | ||||||||||
}); | ||||||||||
|
||||||||||
test('asyncIterator - get the correct exception after stop-on-error', async t => { | ||||||||||
const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})]; | ||||||||||
const mappedValues = []; | ||||||||||
|
||||||||||
const task = pMap(new AsyncTestData(input), async value => { | ||||||||||
if (typeof value === 'function') { | ||||||||||
value = await value(); | ||||||||||
} | ||||||||||
|
||||||||||
mappedValues.push(value); | ||||||||||
// Throw for each item - all should fail and we should get only the first | ||||||||||
await delay(100); | ||||||||||
throw new Error(`Oops! ${value}`); | ||||||||||
}); | ||||||||||
await delay(500); | ||||||||||
await t.throwsAsync(task, {message: 'Oops! 1'}); | ||||||||||
t.deepEqual(mappedValues, [1, 2, 3]); | ||||||||||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should throw a TypeError with a human-friendly error if the input is neither an iterable or async iterable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think?
Inner function within the function so iterator can remain const? File scope function?
Nested
?
operator?if/else if/else with throw and iterator being non-const?
Also... when you say "TypeError with a human-friendly error"... did you mean a human friendly error-message? Or some specific field you want set on the error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done... i added a check for the iterator type at the top, similar to mapper / concurrency checks, which means that the existing code cannot have an error as it would have already thrown out