-
-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for AsyncIterable as input #46
Conversation
That's how it's done. There's good way of mixing async and sync in JS. |
Which test exactly? Can you link to it? |
This test: https://github.com/sindresorhus/p-map/blob/main/test.js#L121-L135 Stop on error is problematic... there can be many concurrent mappers running and they will continue to run even if iteration is "stopped" after one of them throws. If the continuing mappers also throw they will have nowhere to throw to that can be caught. So it seems that the best way to handle an exception in one mapper is to check if any other mappers are running, push the exception onto an array, then let the other mappers either succeed or fail, and the last of them throws and AggregateError with the exceptions from all of them or just throws the single error if only one of them failed. It definitely should not throw until all of the mappers have finished though as that could lead to the consumer restarting the pMap while mappers from the prior run are still active. Ultimately what I ended up thinking was: if you need clean error handling then you should never allow the mapper to throw and instead handle the error yourself in the mapper. However, I don't think there is a way for the mapper to signal that iteration should stop early. Current form of the test in the PR
|
I added a PR with a failing test case that shows that multiple concurrent mapper exceptions result in only one of those errors being catchable: I think the stopOnError true/false logic can be somewhat consolidated so that both reject with an AggregateError when mappers have all stopped, but stopOnError true immediately stops invoking more mappers and iterating more source items (if applicable, as in concurrency !== Infinity). |
Per #49 (comment), |
- Expand capabilities to handle both async and sync iterables - There is a problem in the original functionality around the `stop-on-error` test that needs to be resolved - The test has a bug in that neither p-map nor the test mapper function, which is provided for that specific test and is not the common version, await the calling of the delay() function for item 2 - This means that item 2 will only ever return a Promise, never a value - When this is fixed to await the function call, it causes the returned results to change from `[1, 3]` to `[1, 3, 2]` - Fixing this test makes it clear that the `concurrency setup` loop does not wait for mappers, so the only way to make this test fail would be to have enough items that eventually the initial iteration could not reach the last items in the test array before the 100 ms delay on the exception expires - With `concurrency = 1` this test behaves as expected since there are not unlimited unawaited promises created - Let me know your thoughts on how to resolve this as it becomes more apparent with the behavior changes needed for asyncIterable (where the concurrency setup iteration of asyncIterable has to be awaited to prevent inifinte runners from being created) - The test, in the state it's in in this PR, does not actually demonstrate that stop on error works as intended
Co-authored-by: Sindre Sorhus <[email protected]>
I believe all outstanding issues have been addressed now. |
Can you update the readme too? |
- Also distinguish from items returning a promise - Update the TS types to indicate that iterated items can actually be a `Promise`
The readme has been updated. |
let isIterableDone = false; | ||
let resolvingCount = 0; | ||
let currentIndex = 0; | ||
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator](); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should throw a TypeError with a human-friendly error if the input is neither an iterable or async iterable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think?
Inner function within the function so iterator can remain const? File scope function?
Nested ?
operator?
if/else if/else with throw and iterator being non-const?
Also... when you say "TypeError with a human-friendly error"... did you mean a human friendly error-message? Or some specific field you want set on the error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done... i added a check for the iterator type at the top, similar to mapper / concurrency checks, which means that the existing code cannot have an error as it would have already thrown out
index.js
Outdated
break; | ||
} | ||
// Create the concurrent runners in a detached (non-awaited) | ||
// promise. We need this so we can await the next() calls |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// promise. We need this so we can await the next() calls | |
// promise. We need this so we can await the `next()` calls |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
index.js
Outdated
// to stop creating runners before hitting the concurrency limit | ||
// if the iterable has already been marked as done. | ||
// NOTE: We *must* do this for async iterators otherwise we'll spin up | ||
// infinite next() calls by default and never start the event loop. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// infinite next() calls by default and never start the event loop. | |
// infinite `next()` calls by default and never start the event loop. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done (plus more cases)
Iterated over concurrently in the `mapper` function. | ||
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. | ||
|
||
Asynchoronous iterables (distinguishing from iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchornous proces completes (e.g. reading from a remote queue when the queue has reached empty, or reading lines from a stream). | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You also need to update index.d.ts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
readme.md
Outdated
|
||
Iterated over concurrently in the `mapper` function. | ||
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. | |
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
readme.md
Outdated
Iterated over concurrently in the `mapper` function. | ||
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. | ||
|
||
Asynchoronous iterables (distinguishing from iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchornous proces completes (e.g. reading from a remote queue when the queue has reached empty, or reading lines from a stream). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Asynchoronous iterables (distinguishing from iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchornous proces completes (e.g. reading from a remote queue when the queue has reached empty, or reading lines from a stream). | |
Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
test.js
Outdated
[async () => { | ||
return 10; | ||
}, 300], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[async () => { | |
return 10; | |
}, 300], | |
[async () => 10, 300], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
test.js
Outdated
|
||
async * [Symbol.asyncIterator]() { | ||
for (let i = 0; i < this.data.length; i++) { | ||
// Add a delay between each item iterated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Add a delay between each item iterated | |
// Add a delay between each iterated item |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
test.js
Outdated
} | ||
|
||
async * [Symbol.asyncIterator]() { | ||
for (let i = 0; i < this.data.length; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for (let i = 0; i < this.data.length; i++) { | |
for (let index = 0; index < this.data.length; index++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Looks good. Thanks for contributing this :) |
No problem. Thanks for this (and other) modules and for the timely reviews and merges! |
Overview
Open Issues
ava
stop-on-error
test that needs to be resolved[1, 3]
to[1, 3, 2]
concurrency setup
loop does not wait for mappers, so the only way to make this test fail would be to have enough items that eventually the initial iteration could not reach the last items in the test array before the 100 ms delay on the exception expiresconcurrency = 1
this test behaves as expected since there are not unlimited unawaited promises created