pipeline ends a script before its stream finishes (and leaves the stream with pendingcb > 0) #49052
Your problem is related to the never-ending stream: the transform never signals that it has finished processing, so the `pipeline` promise never settles. Here is the proper implementation of your case (using `stream/promises` and top-level `await`):

```js
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';

await pipeline(
  (() => {
    const t = new Transform({
      objectMode: true,
      transform: (chunk, _, cb) => {
        setTimeout(() => {
          t.push(chunk)
          cb() // signalling completion of the chunk lets the stream (and the pipeline) finish
        }, 1000)
      }
    })
    t.write(1)
    t.end()
    return t
  })(),
  async function (stream) {
    for await (const item of stream) {
      console.log(item)
    }
  }
)

console.log("pipeline finished")
```
Thank you, @climba03003. This indeed makes sense; I guess I somehow expected it to behave differently. I shall close the issue, since there doesn't seem to be any bug to report, but I will keep posting my progress below, since somebody else might run into the same situation while trying to figure out a similar algorithm, so it might help on their voyage :). I'll now go back to the drawing board and change my original code to take that into account.

You see, my original idea was to try to rewrite my usual non-recursive tree-walking algorithm, which used an array to store its in-progress queue, and have it not use the queue (well, at least not explicitly) but use a stream buffer instead. The initial idea for the algo was like this:

```js
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';

// getChildNodes and rootNode come from the author's domain code (not shown)
await pipeline(
  (() => {
    const t = new Transform({
      objectMode: true,
      highWaterMark: 5,
      transform: async (node, _, cb) => {
        console.log("transform:", { node })
        for await (const childNode of await getChildNodes({ parentNode: node })) {
          if (childNode.hasChildren) {
            t.write(childNode)   // nodes with children go back into the transform
          } else {
            t.push(childNode)    // leaf nodes go downstream
          }
        }
        cb()
      }
    })
    t.write(rootNode)
    // note: nothing ever calls t.end() here -- that is the problem discussed below
    return t
  })(),
  async function (stream) {
    for await (const item of stream) {
      console.log("log:", { item })
    }
  },
)
```

In a nutshell, it would just go thru every node it gets (expecting it to be a node with children), fetch its children, write any child that itself has children back into the transform, and push the leaf children downstream. Of course, once I add the missing end call, the question becomes when to call it; I am not sure if there's a simple (readable) enough construct that would allow a stream to end itself once all of the self-written work has drained.

I am now thinking of rewriting my algo to resemble the more common streaming ones, which would yield events for "node with children starts", "child node (with no children)", "node with children ends"... (e.g. I am entering a directory, I am listing a file in that directory, I am leaving the directory -- I am not working with files and directories, but that would be a good example). That should, theoretically, allow me to know exactly when to call `t.end()`.

I'll post more code as I have it.
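To make that "events" idea concrete, here is a hypothetical sketch (not from the thread; the event names and the `getChildNodes` signature are assumptions, and it is written recursively for brevity rather than as the non-recursive variant described above):

```js
// Hypothetical async generator yielding enter/leaf/leave events while walking a tree.
// The "leave" event is the signal a consumer can use to know a subtree is complete;
// after the root's "leave", it is safe to end the stream.
// getChildNodes: the author's helper (a stand-in stub appears after the next comment).
async function* walk(node) {
  yield { type: 'enter', node }
  for await (const child of await getChildNodes({ parentNode: node })) {
    if (child.hasChildren) {
      yield* walk(child)
    } else {
      yield { type: 'leaf', node: child }
    }
  }
  yield { type: 'leave', node }
}
```

Feeding this into `Readable.from(walk(rootNode))` would give a stream that ends on its own, because the readable finishes as soon as the generator returns after the root's subtree is exhausted.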
Alrighty then. I think I have a version with a self-writing `Transform` that ends itself correctly:

```js
// (imports assumed; the returned promise is not awaited in this excerpt)
import { Readable, Transform } from 'stream';
import { pipeline } from 'stream/promises';

pipeline(
  // my scenario is that there's actually no `pipeline` but `compose` here, because anything can
  // stream root nodes and get back their leaves (and they use the `compose`-d Duplex stream in
  // their own `pipeline`s)
  Readable.from([rootNode1, rootNode2]),
  (readable) => {
    const t = new Transform({
      objectMode: true,
      highWaterMark: 5,
      transform: async (node, _, cb) => {
        for await (const child of await getChildNodes({ parentNode: node })) {
          if (child.hasChildren) {
            t.write(child)
          } else {
            t.push(child)
          }
        }
        // the chunk currently being transformed still counts towards writableLength,
        // so a length of 1 means nothing else is queued behind it
        if (readable.readableEnded && t.writableLength === 1)
          t.end()
        cb()
      }
    })
    readable.on("end", () => {
      if (t.writableLength === 0)
        t.end()
    })
    return readable.pipe(t, {
      end: false,
    })
  },
  async function (stream) {
    for await (const item of stream) {
      console.log("log:", { item })
    }
  },
)
```

I am pretty sure the two "events" that lead to `t.end()` could be expressed more cleanly, but this seems to work.
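For anyone who wants to actually run the snippets above, here is a hypothetical stand-in for `getChildNodes` and the root nodes (purely illustrative; the author's real implementation is not shown in the thread):

```js
// Hypothetical stand-in: an async function that returns an async iterable of children,
// matching the `for await (const child of await getChildNodes(...))` usage above.
async function getChildNodes({ parentNode }) {
  return (async function* () {
    for (let i = 0; i < 3; i++) {
      yield {
        id: `${parentNode.id}.${i}`,
        // only the first child keeps branching, and only down to depth 2
        hasChildren: i === 0 && (parentNode.depth ?? 0) < 2,
        depth: (parentNode.depth ?? 0) + 1,
      }
    }
  })()
}

const rootNode1 = { id: 'root1', hasChildren: true, depth: 0 }
const rootNode2 = { id: 'root2', hasChildren: true, depth: 0 }
const rootNode = rootNode1
```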
Version
v18.17.0
Platform
Linux WHITEONE 5.15.62.1-microsoft-standard-WSL2 #1 SMP Wed Aug 24 22:24:20 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux
Subsystem
stream
What steps will reproduce the bug?
If I run this script, it will log just the number "1". Instead, I would expect it to log the number "1" and the "pipeline finished" text.
(My original use case was slightly different; this is essentially a dumbed-down version that makes it simple to reproduce the underlying issue.)
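A minimal sketch of a script matching this description, reconstructed from the details below (the exact original may have differed slightly):

```js
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';

await pipeline(
  (() => {
    const t = new Transform({
      objectMode: true,
      transform: (chunk, _, cb) => {
        // push is delayed, and cb() (which would also be delayed) is commented out
        setTimeout(() => {
          t.push(chunk)
          // cb()
        }, 1000)
      }
    })
    t.write(1)
    t.end()
    return t
  })(),
  async function (stream) {
    for await (const item of stream) {
      console.log(item)
    }
  }
)

console.log("pipeline finished")
```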
Do notice the use of `setTimeout` to delay the `push` call, as well as the commented-out `cb()` call (which would also be delayed by the `setTimeout`). In fact, the commented-out `cb()` call is interesting in that it doesn't really matter whether it is commented out or not; the outcome is the same either way.

This might (or might not) be related to #46170, because when I add another `setTimeout` just above the `return t`, have it wait for, say, 5 seconds, and then let it log the `t`, the `t` has `pendingcb` set to 1.

Additionally, if I increase the number of `t.write(...)` calls, the only one that seems to make it through is the very first one. The only effect increasing the number of `write`s has is that the number of `pendingcb` on `t` rises. As an example, here's an output from a script where I duplicated the `t.write(1)` call 3 times. Do notice that `pendingcb` is 3.

Setting `highWaterMark` to 1 or even 0 results in this very same behavior as with the default of 16.

The docs for stream.Transform lead me to believe that `_transform` can be asynchronous:

Why does the script "just exit" without truly waiting for the pipeline to finish? The "pipeline finished" text is never displayed. Unless this is a real bug, I suspect I am simply not seeing an exception because I am not looking for it properly (but then why does the script still exit with 0? It would be non-zero in case of an exception, wouldn't it?).
Am I doing it/using it wrong?
How often does it reproduce? Is there a required condition?
always
What is the expected behavior? Why is that the expected behavior?
What do you see instead?
Additional information
Cheers :)