Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add stream map tests #41642

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 105 additions & 16 deletions test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@ const {
Readable,
} = require('stream');
const assert = require('assert');
const { once } = require('events');
const { setTimeout } = require('timers/promises');

{
// Map works on synchronous streams with a synchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
const result = [2, 4, 6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
})().then(common.mustCall());
}

Expand All @@ -24,7 +22,49 @@ const { setTimeout } = require('timers/promises');
await Promise.resolve();
return x + x;
});
const result = [2, 4, 6, 8, 10];
(async () => {
assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
})().then(common.mustCall());
Comment on lines +25 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be sound to consider using a .mjs file for future tests like this so we can use TLA and skip all those async functions + then(mustCall).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if that was allowed but couldn’t find non-loader related .mjs tests. If it is I think it’s a good idea

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aduh95 so can you confirm it's allowed :)? I'll port the file to .mjs I don't mind the tests running sequentially that'd actually make things easier to debug and the performance won't change since it's all microticks anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing forbids it, so it's allowed (and there's no reason to avoid it since Node.js 10 is EOL and all maintained versions of Node.js support ESM). For example see test/parallel/test-child-process-fork-url.mjs and test/parallel/test-fs-cp.mjs which have little to do with loaders and have been written this way because it was more convenient.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack I'll move all of these to mjs and simplify them as soon as the two PRs by outside contributors land to not create conflicts for them.

}

{
// Map works on asynchronous streams with a asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
return x + x;
}).map((x) => x + x);
(async () => {
assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]);
})().then(common.mustCall());
}

{
// Map works on an infinite stream
const stream = Readable.from(async function* () {
while (true) yield 1;
}()).map(common.mustCall(async (x) => {
return x + x;
}, 5));
(async () => {
let i = 1;
for await (const item of stream) {
assert.strictEqual(item, 2);
if (++i === 5) break;
}
})().then(common.mustCall());
}

{
// Map works on non-objectMode streams
const stream = new Readable({
read() {
this.push(Uint8Array.from([1]));
this.push(Uint8Array.from([2]));
this.push(null);
}
}).map(async ([x]) => {
return x + x;
}).map((x) => x + x);
const result = [4, 8];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
Expand All @@ -33,39 +73,88 @@ const { setTimeout } = require('timers/promises');
}

{
// Map works on asynchronous streams with a asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
// Does not care about data events
const source = new Readable({
read() {
this.push(Uint8Array.from([1]));
this.push(Uint8Array.from([2]));
this.push(null);
}
});
setImmediate(() => stream.emit('data', Uint8Array.from([1])));
const stream = source.map(async ([x]) => {
return x + x;
}).map((x) => x + x);
const result = [4, 8, 12, 16, 20];
const result = [4, 8];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Emitting an error during `map`
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
if (x === 3) {
stream.emit('error', new Error('boom'));
}
return x + x;
});
assert.rejects(
stream.map((x) => x + x).toArray(),
/boom/,
Comment on lines +98 to +106
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could check the error by reference here and below.

Suggested change
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
if (x === 3) {
stream.emit('error', new Error('boom'));
}
return x + x;
});
assert.rejects(
stream.map((x) => x + x).toArray(),
/boom/,
const error = new Error();
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
if (x === 3) {
stream.emit('error', error);
}
return x + x;
});
assert.rejects(
stream.map((x) => x + x).toArray(),
error,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that's actually a guarantee we necessarily want to make in this case (that is - that if we emit we don't wrap the error in any way) - the guarantee currently tested is much smaller (that we include the message).

If you feel strongly it's a guarantee we should make - I'll change it (we should also probably update the docs)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the current implementation does reject with the same error object, and in the future we decide to wrap this object in some other object, wouldn't you agree this would be a semver-major change? That's why I would say testing for it would make sense.

The proposal says we should Await at each step, so I would think that implies the same rejected object is returned. Maybe we should make another test that verifies that when using a non-Error objects, everything still behaves. Non-blocking suggestion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't you agree this would be a semver-major change?

Not on an experimental API - once it graduates 100% absolutely.

The proposal says we should Await at each step, so I would think that implies the same rejected object is returned.

Yes but note that in this case the user is explicitly emiting the error event rather than throwing an error - I agree that for throwing we 100% should align.


Non-blocking suggestion.

Please keep making suggestions discussing these things is important!

).then(common.mustCall());
}

{
// Throwing an error during `map` (sync)
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => {
if (x === 3) {
throw new Error('boom');
}
return x + x;
});
assert.rejects(
stream.map((x) => x + x).toArray(),
/boom/,
).then(common.mustCall());
}


{
// Throwing an error during `map` (async)
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
if (x === 3) {
throw new Error('boom');
}
return x + x;
});
assert.rejects(
stream.map((x) => x + x).toArray(),
/boom/,
).then(common.mustCall());
}

{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
}, { signal: ac.signal, concurrency: 2 });
const range = Readable.from([1, 2, 3, 4, 5]);
const stream = range.map(common.mustCall(async (_, { signal }) => {
await once(signal, 'abort');
throw signal.reason;
}, 2), { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
assert.fail('should not reach here, got ' + item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());

setImmediate(() => {
ac.abort();
assert.strictEqual(calls, 2);
});
}

Expand Down