Skip to content

Commit

Permalink
Wait for both reads and writes to finish before finalizing pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasBuelens committed Oct 2, 2021
1 parent 0ec648d commit 4d08211
Showing 1 changed file with 45 additions and 19 deletions.
64 changes: 45 additions & 19 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC

let shuttingDown = false;

// This is used to keep track of the spec's requirement that we wait for ongoing writes during shutdown.
// This is used to keep track of the spec's requirement that we wait for ongoing reads and writes during shutdown.
let currentRead = promiseResolvedWith(undefined);
let currentWrite = promiseResolvedWith(undefined);

return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -200,21 +201,21 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

return transformPromiseWith(writer._readyPromise, () => {
return new Promise((resolveRead, rejectRead) => {
currentRead = new Promise((resolveRead, rejectRead) => {
ReadableStreamDefaultReaderRead(
reader,
{
chunkSteps: chunk => {
currentWrite = transformPromiseWith(
WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {}
);
currentWrite = WritableStreamDefaultWriterWrite(writer, chunk);
setPromiseIsHandledToTrue(currentWrite);
resolveRead(false);
},
closeSteps: () => resolveRead(true),
errorSteps: rejectRead
}
);
});
return currentRead;
});
}

Expand Down Expand Up @@ -259,13 +260,38 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
setPromiseIsHandledToTrue(pipeLoop());

function waitForWritesToFinish() {
// Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait
// for that too.
const oldCurrentWrite = currentWrite;
return transformPromiseWith(
currentWrite,
() => oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined
);
let oldCurrentWrite;
return promiseResolvedWith(check());

function check() {
// Another write may have started while we were waiting on this currentWrite,
// so we have to be sure to wait for that too.
if (oldCurrentWrite !== currentWrite) {
oldCurrentWrite = currentWrite;
return transformPromiseWith(currentWrite, check, check);
}
return undefined;
}
}

function waitForReadsAndWritesToFinish() {
let oldCurrentRead;
let oldCurrentWrite;
return promiseResolvedWith(check());

function check() {
// Another read or write may have started while we were waiting on this currentRead or currentWrite,
// so we have to be sure to wait for that too.
if (oldCurrentRead !== currentRead) {
oldCurrentRead = currentRead;
return transformPromiseWith(currentRead, check, check);
}
if (oldCurrentWrite !== currentWrite) {
oldCurrentWrite = currentWrite;
return transformPromiseWith(currentWrite, check, check);
}
return undefined;
}
}

function isOrBecomesErrored(stream, promise, action) {
Expand Down Expand Up @@ -299,8 +325,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
function doTheRest() {
uponPromise(
action(),
() => finalize(originalIsError, originalError),
newError => finalize(true, newError)
() => waitForReadsAndWritesThenFinalize(originalIsError, originalError),
newError => waitForReadsAndWritesThenFinalize(true, newError)
);
}
}
Expand All @@ -311,11 +337,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}
shuttingDown = true;

if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error));
} else {
finalize(isError, error);
}
waitForReadsAndWritesThenFinalize(isError, error);
}

function waitForReadsAndWritesThenFinalize(isError, error) {
uponFulfillment(waitForReadsAndWritesToFinish(), () => finalize(isError, error));
}

function finalize(isError, error) {
Expand Down

0 comments on commit 4d08211

Please sign in to comment.