Skip to content

Commit

Permalink
test: add stream map tests
Browse files Browse the repository at this point in the history
Add more tests to check and enforce the behavior of the map method.

Co-Authored-By: Antoine du Hamel <[email protected]>
PR-URL: nodejs#41642
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Antoine du Hamel <[email protected]>
  • Loading branch information
2 people authored and Linkgoron committed Jan 31, 2022
1 parent 4d2861e commit 3839af9
Showing 1 changed file with 105 additions and 16 deletions.
121 changes: 105 additions & 16 deletions test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@ const {
Readable,
} = require('stream');
const assert = require('assert');
const { once } = require('events');
const { setTimeout } = require('timers/promises');

{
// Map works on synchronous streams with a synchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
const result = [2, 4, 6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
})().then(common.mustCall());
}

Expand All @@ -24,7 +22,49 @@ const { setTimeout } = require('timers/promises');
await Promise.resolve();
return x + x;
});
const result = [2, 4, 6, 8, 10];
(async () => {
assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
})().then(common.mustCall());
}

{
// Map works on asynchronous streams with a asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
return x + x;
}).map((x) => x + x);
(async () => {
assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]);
})().then(common.mustCall());
}

{
// Map works on an infinite stream
const stream = Readable.from(async function* () {
while (true) yield 1;
}()).map(common.mustCall(async (x) => {
return x + x;
}, 5));
(async () => {
let i = 1;
for await (const item of stream) {
assert.strictEqual(item, 2);
if (++i === 5) break;
}
})().then(common.mustCall());
}

{
// Map works on non-objectMode streams
const stream = new Readable({
read() {
this.push(Uint8Array.from([1]));
this.push(Uint8Array.from([2]));
this.push(null);
}
}).map(async ([x]) => {
return x + x;
}).map((x) => x + x);
const result = [4, 8];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
Expand All @@ -33,39 +73,88 @@ const { setTimeout } = require('timers/promises');
}

{
// Map works on asynchronous streams with a asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
// Does not care about data events
const source = new Readable({
read() {
this.push(Uint8Array.from([1]));
this.push(Uint8Array.from([2]));
this.push(null);
}
});
setImmediate(() => stream.emit('data', Uint8Array.from([1])));
const stream = source.map(async ([x]) => {
return x + x;
}).map((x) => x + x);
const result = [4, 8, 12, 16, 20];
const result = [4, 8];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Emitting an error during `map`
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
if (x === 3) {
stream.emit('error', new Error('boom'));
}
return x + x;
});
assert.rejects(
stream.map((x) => x + x).toArray(),
/boom/,
).then(common.mustCall());
}

{
// Throwing an error during `map` (sync)
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => {
if (x === 3) {
throw new Error('boom');
}
return x + x;
});
assert.rejects(
stream.map((x) => x + x).toArray(),
/boom/,
).then(common.mustCall());
}


{
// Throwing an error during `map` (async)
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
if (x === 3) {
throw new Error('boom');
}
return x + x;
});
assert.rejects(
stream.map((x) => x + x).toArray(),
/boom/,
).then(common.mustCall());
}

{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
}, { signal: ac.signal, concurrency: 2 });
const range = Readable.from([1, 2, 3, 4, 5]);
const stream = range.map(common.mustCall(async (_, { signal }) => {
await once(signal, 'abort');
throw signal.reason;
}, 2), { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
assert.fail('should not reach here, got ' + item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());

setImmediate(() => {
ac.abort();
assert.strictEqual(calls, 2);
});
}

Expand Down

0 comments on commit 3839af9

Please sign in to comment.