-
Notifications
You must be signed in to change notification settings - Fork 30.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test: add stream map tests #41642
test: add stream map tests #41642
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5,16 +5,14 @@ const { | |||||||||||||||||||||||||||||||||||||||
Readable, | ||||||||||||||||||||||||||||||||||||||||
} = require('stream'); | ||||||||||||||||||||||||||||||||||||||||
const assert = require('assert'); | ||||||||||||||||||||||||||||||||||||||||
const { once } = require('events'); | ||||||||||||||||||||||||||||||||||||||||
const { setTimeout } = require('timers/promises'); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
// Map works on synchronous streams with a synchronous mapper | ||||||||||||||||||||||||||||||||||||||||
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x); | ||||||||||||||||||||||||||||||||||||||||
const result = [2, 4, 6, 8, 10]; | ||||||||||||||||||||||||||||||||||||||||
(async () => { | ||||||||||||||||||||||||||||||||||||||||
for await (const item of stream) { | ||||||||||||||||||||||||||||||||||||||||
assert.strictEqual(item, result.shift()); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]); | ||||||||||||||||||||||||||||||||||||||||
})().then(common.mustCall()); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
|
@@ -24,7 +22,49 @@ const { setTimeout } = require('timers/promises'); | |||||||||||||||||||||||||||||||||||||||
await Promise.resolve(); | ||||||||||||||||||||||||||||||||||||||||
return x + x; | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
const result = [2, 4, 6, 8, 10]; | ||||||||||||||||||||||||||||||||||||||||
(async () => { | ||||||||||||||||||||||||||||||||||||||||
assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]); | ||||||||||||||||||||||||||||||||||||||||
})().then(common.mustCall()); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
// Map works on asynchronous streams with a asynchronous mapper | ||||||||||||||||||||||||||||||||||||||||
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { | ||||||||||||||||||||||||||||||||||||||||
return x + x; | ||||||||||||||||||||||||||||||||||||||||
}).map((x) => x + x); | ||||||||||||||||||||||||||||||||||||||||
(async () => { | ||||||||||||||||||||||||||||||||||||||||
assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]); | ||||||||||||||||||||||||||||||||||||||||
})().then(common.mustCall()); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
// Map works on an infinite stream | ||||||||||||||||||||||||||||||||||||||||
const stream = Readable.from(async function* () { | ||||||||||||||||||||||||||||||||||||||||
while (true) yield 1; | ||||||||||||||||||||||||||||||||||||||||
}()).map(common.mustCall(async (x) => { | ||||||||||||||||||||||||||||||||||||||||
return x + x; | ||||||||||||||||||||||||||||||||||||||||
}, 5)); | ||||||||||||||||||||||||||||||||||||||||
(async () => { | ||||||||||||||||||||||||||||||||||||||||
let i = 1; | ||||||||||||||||||||||||||||||||||||||||
for await (const item of stream) { | ||||||||||||||||||||||||||||||||||||||||
assert.strictEqual(item, 2); | ||||||||||||||||||||||||||||||||||||||||
if (++i === 5) break; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
})().then(common.mustCall()); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
// Map works on non-objectMode streams | ||||||||||||||||||||||||||||||||||||||||
const stream = new Readable({ | ||||||||||||||||||||||||||||||||||||||||
read() { | ||||||||||||||||||||||||||||||||||||||||
this.push(Uint8Array.from([1])); | ||||||||||||||||||||||||||||||||||||||||
this.push(Uint8Array.from([2])); | ||||||||||||||||||||||||||||||||||||||||
this.push(null); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
}).map(async ([x]) => { | ||||||||||||||||||||||||||||||||||||||||
return x + x; | ||||||||||||||||||||||||||||||||||||||||
}).map((x) => x + x); | ||||||||||||||||||||||||||||||||||||||||
const result = [4, 8]; | ||||||||||||||||||||||||||||||||||||||||
(async () => { | ||||||||||||||||||||||||||||||||||||||||
for await (const item of stream) { | ||||||||||||||||||||||||||||||||||||||||
assert.strictEqual(item, result.shift()); | ||||||||||||||||||||||||||||||||||||||||
|
@@ -33,39 +73,88 @@ const { setTimeout } = require('timers/promises'); | |||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
// Map works on asynchronous streams with a asynchronous mapper | ||||||||||||||||||||||||||||||||||||||||
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { | ||||||||||||||||||||||||||||||||||||||||
// Does not care about data events | ||||||||||||||||||||||||||||||||||||||||
const source = new Readable({ | ||||||||||||||||||||||||||||||||||||||||
read() { | ||||||||||||||||||||||||||||||||||||||||
this.push(Uint8Array.from([1])); | ||||||||||||||||||||||||||||||||||||||||
this.push(Uint8Array.from([2])); | ||||||||||||||||||||||||||||||||||||||||
this.push(null); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
setImmediate(() => stream.emit('data', Uint8Array.from([1]))); | ||||||||||||||||||||||||||||||||||||||||
const stream = source.map(async ([x]) => { | ||||||||||||||||||||||||||||||||||||||||
return x + x; | ||||||||||||||||||||||||||||||||||||||||
}).map((x) => x + x); | ||||||||||||||||||||||||||||||||||||||||
const result = [4, 8, 12, 16, 20]; | ||||||||||||||||||||||||||||||||||||||||
const result = [4, 8]; | ||||||||||||||||||||||||||||||||||||||||
(async () => { | ||||||||||||||||||||||||||||||||||||||||
for await (const item of stream) { | ||||||||||||||||||||||||||||||||||||||||
assert.strictEqual(item, result.shift()); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
})().then(common.mustCall()); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
// Emitting an error during `map` | ||||||||||||||||||||||||||||||||||||||||
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { | ||||||||||||||||||||||||||||||||||||||||
if (x === 3) { | ||||||||||||||||||||||||||||||||||||||||
stream.emit('error', new Error('boom')); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
return x + x; | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
assert.rejects( | ||||||||||||||||||||||||||||||||||||||||
stream.map((x) => x + x).toArray(), | ||||||||||||||||||||||||||||||||||||||||
/boom/, | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+98
to
+106
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could check the error by reference here and below.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure that's actually a guarantee we necessarily want to make in this case (that is - that if we emit we don't wrap the error in any way) - the guarantee currently tested is much smaller (that we include the message). If you feel strongly it's a guarantee we should make - I'll change it (we should also probably update the docs) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the current implementation does reject with the same error object, and in the future we decide to wrap this object in some other object, wouldn't you agree this would be a semver-major change? That's why I would say testing for it would make sense. The proposal says we should There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Not on an experimental API - once it graduates 100% absolutely.
Yes but note that in this case the user is explicitly
Please keep making suggestions discussing these things is important! |
||||||||||||||||||||||||||||||||||||||||
).then(common.mustCall()); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
// Throwing an error during `map` (sync) | ||||||||||||||||||||||||||||||||||||||||
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => { | ||||||||||||||||||||||||||||||||||||||||
if (x === 3) { | ||||||||||||||||||||||||||||||||||||||||
throw new Error('boom'); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
return x + x; | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
assert.rejects( | ||||||||||||||||||||||||||||||||||||||||
stream.map((x) => x + x).toArray(), | ||||||||||||||||||||||||||||||||||||||||
/boom/, | ||||||||||||||||||||||||||||||||||||||||
).then(common.mustCall()); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
// Throwing an error during `map` (async) | ||||||||||||||||||||||||||||||||||||||||
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { | ||||||||||||||||||||||||||||||||||||||||
if (x === 3) { | ||||||||||||||||||||||||||||||||||||||||
throw new Error('boom'); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
return x + x; | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
assert.rejects( | ||||||||||||||||||||||||||||||||||||||||
stream.map((x) => x + x).toArray(), | ||||||||||||||||||||||||||||||||||||||||
/boom/, | ||||||||||||||||||||||||||||||||||||||||
).then(common.mustCall()); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
// Concurrency + AbortSignal | ||||||||||||||||||||||||||||||||||||||||
const ac = new AbortController(); | ||||||||||||||||||||||||||||||||||||||||
let calls = 0; | ||||||||||||||||||||||||||||||||||||||||
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => { | ||||||||||||||||||||||||||||||||||||||||
calls++; | ||||||||||||||||||||||||||||||||||||||||
await setTimeout(100, { signal }); | ||||||||||||||||||||||||||||||||||||||||
}, { signal: ac.signal, concurrency: 2 }); | ||||||||||||||||||||||||||||||||||||||||
const range = Readable.from([1, 2, 3, 4, 5]); | ||||||||||||||||||||||||||||||||||||||||
const stream = range.map(common.mustCall(async (_, { signal }) => { | ||||||||||||||||||||||||||||||||||||||||
await once(signal, 'abort'); | ||||||||||||||||||||||||||||||||||||||||
throw signal.reason; | ||||||||||||||||||||||||||||||||||||||||
}, 2), { signal: ac.signal, concurrency: 2 }); | ||||||||||||||||||||||||||||||||||||||||
// pump | ||||||||||||||||||||||||||||||||||||||||
assert.rejects(async () => { | ||||||||||||||||||||||||||||||||||||||||
for await (const item of stream) { | ||||||||||||||||||||||||||||||||||||||||
// nope | ||||||||||||||||||||||||||||||||||||||||
console.log(item); | ||||||||||||||||||||||||||||||||||||||||
assert.fail('should not reach here, got ' + item); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
}, { | ||||||||||||||||||||||||||||||||||||||||
name: 'AbortError', | ||||||||||||||||||||||||||||||||||||||||
}).then(common.mustCall()); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
setImmediate(() => { | ||||||||||||||||||||||||||||||||||||||||
ac.abort(); | ||||||||||||||||||||||||||||||||||||||||
assert.strictEqual(calls, 2); | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be sound to consider using a
.mjs
file for future tests like this so we can use TLA and skip all those async functions + then(mustCall).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering if that was allowed but couldn’t find non-loader related .mjs tests. If it is I think it’s a good idea
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aduh95 so can you confirm it's allowed :)? I'll port the file to
.mjs
I don't mind the tests running sequentially that'd actually make things easier to debug and the performance won't change since it's all microticks anyway.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing forbids it, so it's allowed (and there's no reason to avoid it since Node.js 10 is EOL and all maintained versions of Node.js support ESM). For example see
test/parallel/test-child-process-fork-url.mjs
andtest/parallel/test-fs-cp.mjs
which have little to do with loaders and have been written this way because it was more convenient.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack I'll move all of these to mjs and simplify them as soon as the two PRs by outside contributors land to not create conflicts for them.