From d5a0b26c5d33cf597f7b4562376e1243f6d0bda7 Mon Sep 17 00:00:00 2001 From: harold Date: Sun, 12 Sep 2021 15:38:34 -0400 Subject: [PATCH] Prevent skip removal logic from being run twice --- index.js | 29 +++++++++++++++-------------- test.js | 31 +++++++++++++++++++++++++------ 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/index.js b/index.js index 12f5756..473589a 100644 --- a/index.js +++ b/index.js @@ -21,42 +21,43 @@ export default async function pMap( const errors = []; const skippedIndexes = []; let isRejected = false; + let isRejectedOrResolved = false; let isIterableDone = false; let resolvingCount = 0; let currentIndex = 0; - let asyncIterator = false; - let iterator; - - if (iterable[Symbol.iterator] === undefined) { - // We've got an async iterable - iterator = iterable[Symbol.asyncIterator](); - asyncIterator = true; - } else { - iterator = iterable[Symbol.iterator](); - } + const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator](); const reject = reason => { isRejected = true; + isRejectedOrResolved = true; reject_(reason); }; const next = async () => { - if (isRejected) { + if (isRejectedOrResolved) { return; } - const nextItem = asyncIterator ? await iterator.next() : iterator.next(); + const nextItem = await iterator.next(); const index = currentIndex; currentIndex++; + // Note: iterator.next() can be called many times in parallel. + // This can cause multiple calls to this next() function to + // receive a `nextItem` with `done === true`. + // The shutdown logic that rejects/resolves must be protected + // so it runs only one time as the `skippedIndex` logic is + // non-idempotent. if (nextItem.done) { isIterableDone = true; - if (resolvingCount === 0) { + if (resolvingCount === 0 && !isRejectedOrResolved) { if (!stopOnError && errors.length > 0) { reject(new AggregateError(errors)); } else { + isRejectedOrResolved = true; + for (const skippedIndex of skippedIndexes) { result.splice(skippedIndex, 1); } @@ -75,7 +76,7 @@ export default async function pMap( try { const element = await nextItem.value; - if (isRejected) { + if (isRejectedOrResolved) { return; } diff --git a/test.js b/test.js index 687cb74..fe96714 100644 --- a/test.js +++ b/test.js @@ -157,7 +157,7 @@ test('pMapSkip', async t => { ], async value => value), [1, 2]); }); -test('do not run mapping after stop-on-error happened', async t => { +test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => { const input = [1, async () => delay(300, {value: 2}), 3]; const mappedValues = []; await t.throwsAsync( @@ -271,7 +271,7 @@ test('asyncIterator - pMapSkip', async t => { ]), async value => value), [1, 2]); }); -test('asyncIterator - do not run mapping after stop-on-error happened', async t => { +test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => { const input = [1, async () => delay(300, {value: 2}), 3]; const mappedValues = []; await t.throwsAsync( @@ -283,13 +283,13 @@ test('asyncIterator - do not run mapping after stop-on-error happened', async t mappedValues.push(value); if (value === 1) { await delay(100); - throw new Error('Oops!'); + throw new Error(`Oops! ${value}`); } - }, - {concurrency: 1}) + }), + {message: 'Oops! 1'} ); await delay(500); - t.deepEqual(mappedValues, [1]); + t.deepEqual(mappedValues, [1, 3, 2]); }); test('catches exception from source iterator - 1st item', async t => { @@ -349,3 +349,22 @@ test('catches exception from source iterator - 2nd item after 1st item mapper th t.is(input.index, 2); t.deepEqual(mappedValues, [0]); }); + +test('asyncIterator - get the correct exception after stop-on-error', async t => { + const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})]; + const mappedValues = []; + + const task = pMap(new AsyncTestData(input), async value => { + if (typeof value === 'function') { + value = await value(); + } + + mappedValues.push(value); + // Throw for each item - all should fail and we should get only the first + await delay(100); + throw new Error(`Oops! ${value}`); + }); + await delay(500); + await t.throwsAsync(task, {message: 'Oops! 1'}); + t.deepEqual(mappedValues, [1, 2, 3]); +});