From 2ac4765080c69c2a65027152fab8f309f64c0baa Mon Sep 17 00:00:00 2001 From: James M Snell Date: Thu, 21 Dec 2023 19:46:52 -0800 Subject: [PATCH] stream: fix cloned webstreams not being unref'd When cloning a `ReadableStream` and `WritableStream`, both use an internal `MessageChannel` to communicate with the original stream. Those, however, previously were not unref'd which would lead to the process not exiting if the stream was not fully consumed. Fixes: https://github.com/nodejs/node/issues/44985 --- lib/internal/webstreams/readablestream.js | 2 ++ lib/internal/webstreams/transfer.js | 4 +++- lib/internal/webstreams/writablestream.js | 2 ++ test/parallel/test-webstreams-clone-unref.js | 16 ++++++++++++++++ test/parallel/test-whatwg-webstreams-transfer.js | 5 +++++ 5 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-webstreams-clone-unref.js diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 4af209349341d1..887204a3ad7e89 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -609,6 +609,8 @@ class ReadableStream { [kTransferList]() { const { port1, port2 } = new MessageChannel(); + port1.unref(); + port2.unref(); this[kState].transfer.port1 = port1; this[kState].transfer.port2 = port2; return [ port2 ]; diff --git a/lib/internal/webstreams/transfer.js b/lib/internal/webstreams/transfer.js index 136b0d81a99464..c4cb4077f88403 100644 --- a/lib/internal/webstreams/transfer.js +++ b/lib/internal/webstreams/transfer.js @@ -143,6 +143,8 @@ class CrossRealmTransformReadableSource { error); port.close(); }; + + port.unref(); } start(controller) { @@ -210,7 +212,7 @@ class CrossRealmTransformWritableSink { error); port.close(); }; - + port.unref(); } start(controller) { diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 6dd7bc65566db6..6b010b51bad45b 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -304,6 +304,8 @@ class WritableStream { [kTransferList]() { const { port1, port2 } = new MessageChannel(); + port1.unref(); + port2.unref(); this[kState].transfer.port1 = port1; this[kState].transfer.port2 = port2; return [ port2 ]; diff --git a/test/parallel/test-webstreams-clone-unref.js b/test/parallel/test-webstreams-clone-unref.js new file mode 100644 index 00000000000000..88a9cebd9c3046 --- /dev/null +++ b/test/parallel/test-webstreams-clone-unref.js @@ -0,0 +1,16 @@ +'use strict'; + +require('../common'); +const { ok } = require('node:assert'); + +// This test verifies that cloned ReadableStream and WritableStream instances +// do not keep the process alive. The test fails if it timesout (it should just +// exit immediately) + +const rs1 = new ReadableStream(); +const ws1 = new WritableStream(); + +const [rs2, ws2] = structuredClone([rs1, ws1], { transfer: [rs1, ws1] }); + +ok(rs2 instanceof ReadableStream); +ok(ws2 instanceof WritableStream); diff --git a/test/parallel/test-whatwg-webstreams-transfer.js b/test/parallel/test-whatwg-webstreams-transfer.js index 01cfaa02ad075e..18a4ee615b58e0 100644 --- a/test/parallel/test-whatwg-webstreams-transfer.js +++ b/test/parallel/test-whatwg-webstreams-transfer.js @@ -464,12 +464,17 @@ const theData = 'hello'; tracker.verify(); }); + // We create an interval to keep the event loop alive while + // we wait for the stream read to complete. + const i = setInterval(() => {}, 1000); + parentPort.onmessage = tracker.calls(({ data }) => { assert(isReadableStream(data)); const reader = data.getReader(); reader.read().then(tracker.calls((result) => { assert(!result.done); assert(result.value instanceof Uint8Array); + clearInterval(i); })); parentPort.close(); });