Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AsyncIterable as input #46

Merged
merged 6 commits into from
Sep 24, 2021

Conversation

huntharo
Copy link
Contributor

@huntharo huntharo commented Aug 25, 2021

Overview

Open Issues

  • Duplicating the tests for the async versions doesn't feel right - Let me know if there is a better way to handle this with ava
  • There is a problem in the original functionality around the stop-on-error test that needs to be resolved
    • The test has a bug in that neither p-map nor the test mapper function, which is provided for that specific test and is not the common version, await the calling of the delay() function for item 2
    • This means that item 2 will only ever return a Promise, never a value
    • When this is fixed to await the function call, it causes the returned results to change from [1, 3] to [1, 3, 2]
    • Fixing this test makes it clear that the concurrency setup loop does not wait for mappers, so the only way to make this test fail would be to have enough items that eventually the initial iteration could not reach the last items in the test array before the 100 ms delay on the exception expires
    • With concurrency = 1 this test behaves as expected since there are not unlimited unawaited promises created
    • Let me know your thoughts on how to resolve this as it becomes more apparent with the behavior changes needed for asyncIterable (where the concurrency setup iteration of asyncIterable has to be awaited to prevent inifinte runners from being created)
    • The test, in the state it's in in this PR, does not actually demonstrate that stop on error works as intended

@sindresorhus
Copy link
Owner

Duplicating the tests for the async versions doesn't feel right

That's how it's done. There's good way of mixing async and sync in JS.

@sindresorhus
Copy link
Owner

There is a problem in the original functionality around the stop-on-error test that needs to be resolved

Which test exactly? Can you link to it?

index.js Outdated Show resolved Hide resolved
index.js Outdated Show resolved Hide resolved
index.js Outdated Show resolved Hide resolved
@sindresorhus sindresorhus changed the title Implement #20 - Async Iterable Support Add support for async iterables as input Sep 7, 2021
@huntharo
Copy link
Contributor Author

huntharo commented Sep 7, 2021

There is a problem in the original functionality around the stop-on-error test that needs to be resolved

Which test exactly? Can you link to it?

This test: https://github.com/sindresorhus/p-map/blob/main/test.js#L121-L135

Stop on error is problematic... there can be many concurrent mappers running and they will continue to run even if iteration is "stopped" after one of them throws. If the continuing mappers also throw they will have nowhere to throw to that can be caught. So it seems that the best way to handle an exception in one mapper is to check if any other mappers are running, push the exception onto an array, then let the other mappers either succeed or fail, and the last of them throws and AggregateError with the exceptions from all of them or just throws the single error if only one of them failed. It definitely should not throw until all of the mappers have finished though as that could lead to the consumer restarting the pMap while mappers from the prior run are still active.

Ultimately what I ended up thinking was: if you need clean error handling then you should never allow the mapper to throw and instead handle the error yourself in the mapper. However, I don't think there is a way for the mapper to signal that iteration should stop early.

Current form of the test in the PR

  • The changes here are:
    • Add an async closure for the delay call so it can be detected as a function to be awaited - Checking instanceof Promise or ClearablePromise didn't work in this case
    • When the input value is a function, await the call of the function
    • Change the expected return value to 1, 3, 2 - Which is "wrong" as it shows that all values up to concurrency get mapped even if one of them throws an error before the others finish
  • The test does not demonstrate that iteration / mapper invocation stops after a mapper throws because the default concurrency is infinite and all of the mappers will be invoked before any one of them throws an exception - We can change this to have concurrency 1 and it will then return a result of [1], but all 3 mappers will still be invoked and will run to completion.
test('do not run mapping after stop-on-error happened', async t => {
	const input = [1, async () => delay(300, {value: 2}), 3];
	const mappedValues = [];
	await t.throwsAsync(
		pMap(input, async value => {
			value = typeof value === 'function' ? await value() : value;
			mappedValues.push(value);
			if (value === 1) {
				await delay(100);
				throw new Error('Oops!');
			}
		})
	);
	await delay(500);
	t.deepEqual(mappedValues, [1, 3, 2]);
});

@sindresorhus
Copy link
Owner

@huntharo
Copy link
Contributor Author

huntharo commented Sep 7, 2021

This test: https://github.com/sindresorhus/p-map/blob/main/test.js#L121-L135

// @papb ^

I added a PR with a failing test case that shows that multiple concurrent mapper exceptions result in only one of those errors being catchable:

#49

I think the stopOnError true/false logic can be somewhat consolidated so that both reject with an AggregateError when mappers have all stopped, but stopOnError true immediately stops invoking more mappers and iterating more source items (if applicable, as in concurrency !== Infinity).

@sindresorhus
Copy link
Owner

I think the stopOnError true/false logic can be somewhat consolidated so that both reject with an AggregateError when mappers have all stopped

Per #49 (comment), stopOnError: true should only return a single error. This is how Promise.all works too.

huntharo and others added 3 commits September 12, 2021 15:46
- Expand capabilities to handle both async and sync iterables
- There is a problem in the original functionality around the `stop-on-error` test that needs to be resolved
  - The test has a bug in that neither p-map nor the test mapper function, which is provided for that specific test and is not the common version, await the calling of the delay() function for item 2
  - This means that item 2 will only ever return a Promise, never a value
  - When this is fixed to await the function call, it causes the returned results to change from `[1, 3]` to `[1, 3, 2]`
  - Fixing this test makes it clear that the `concurrency setup` loop does not wait for mappers, so the only way to make this test fail would be to have enough items that eventually the initial iteration could not reach the last items in the test array before the 100 ms delay on the exception expires
  - With `concurrency = 1` this test behaves as expected since there are not unlimited unawaited promises created
  - Let me know your thoughts on how to resolve this as it becomes more apparent with the behavior changes needed for asyncIterable (where the concurrency setup iteration of asyncIterable has to be awaited to prevent inifinte runners from being created)
  - The test, in the state it's in in this PR, does not actually demonstrate that stop on error works as intended
Co-authored-by: Sindre Sorhus <[email protected]>
@huntharo
Copy link
Contributor Author

I believe all outstanding issues have been addressed now.

index.js Outdated Show resolved Hide resolved
@sindresorhus
Copy link
Owner

Can you update the readme too?

- Also distinguish from items returning a promise
- Update the TS types to indicate that iterated items can actually be a `Promise`
@huntharo
Copy link
Contributor Author

Can you update the readme too?

The readme has been updated.

@huntharo huntharo changed the title Add support for async iterables as input Add support for AsyncIterable as input Sep 17, 2021
let isIterableDone = false;
let resolvingCount = 0;
let currentIndex = 0;
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should throw a TypeError with a human-friendly error if the input is neither an iterable or async iterable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think?

Inner function within the function so iterator can remain const? File scope function?

Nested ? operator?

if/else if/else with throw and iterator being non-const?

Also... when you say "TypeError with a human-friendly error"... did you mean a human friendly error-message? Or some specific field you want set on the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done... i added a check for the iterator type at the top, similar to mapper / concurrency checks, which means that the existing code cannot have an error as it would have already thrown out

index.js Outdated
break;
}
// Create the concurrent runners in a detached (non-awaited)
// promise. We need this so we can await the next() calls
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// promise. We need this so we can await the next() calls
// promise. We need this so we can await the `next()` calls

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

index.js Outdated
// to stop creating runners before hitting the concurrency limit
// if the iterable has already been marked as done.
// NOTE: We *must* do this for async iterators otherwise we'll spin up
// infinite next() calls by default and never start the event loop.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// infinite next() calls by default and never start the event loop.
// infinite `next()` calls by default and never start the event loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done (plus more cases)

Iterated over concurrently in the `mapper` function.
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item.

Asynchoronous iterables (distinguishing from iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchornous proces completes (e.g. reading from a remote queue when the queue has reached empty, or reading lines from a stream).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also need to update index.d.ts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

readme.md Outdated

Iterated over concurrently in the `mapper` function.
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item.
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

readme.md Outdated
Iterated over concurrently in the `mapper` function.
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item.

Asynchoronous iterables (distinguishing from iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchornous proces completes (e.g. reading from a remote queue when the queue has reached empty, or reading lines from a stream).
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Asynchoronous iterables (distinguishing from iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchornous proces completes (e.g. reading from a remote queue when the queue has reached empty, or reading lines from a stream).
Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

test.js Outdated
Comment on lines 10 to 12
[async () => {
return 10;
}, 300],
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[async () => {
return 10;
}, 300],
[async () => 10, 300],

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

test.js Outdated

async * [Symbol.asyncIterator]() {
for (let i = 0; i < this.data.length; i++) {
// Add a delay between each item iterated
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Add a delay between each item iterated
// Add a delay between each iterated item

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

test.js Outdated
}

async * [Symbol.asyncIterator]() {
for (let i = 0; i < this.data.length; i++) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for (let i = 0; i < this.data.length; i++) {
for (let index = 0; index < this.data.length; index++) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@sindresorhus sindresorhus merged commit a24e909 into sindresorhus:main Sep 24, 2021
@sindresorhus
Copy link
Owner

Looks good. Thanks for contributing this :)

@huntharo
Copy link
Contributor Author

Looks good. Thanks for contributing this :)

No problem. Thanks for this (and other) modules and for the timely reviews and merges!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support Async Iterable Protocol
2 participants