diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 843ced877..f3c790b50 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -141,7 +141,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC let shuttingDown = false; - // This is used to keep track of the spec's requirement that we wait for ongoing writes during shutdown. + // This is used to keep track of the spec's requirement that we wait for ongoing reads and writes during shutdown. + let currentRead = promiseResolvedWith(undefined); let currentWrite = promiseResolvedWith(undefined); return new Promise((resolve, reject) => { @@ -200,14 +201,13 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } return transformPromiseWith(writer._readyPromise, () => { - return new Promise((resolveRead, rejectRead) => { + currentRead = new Promise((resolveRead, rejectRead) => { ReadableStreamDefaultReaderRead( reader, { chunkSteps: chunk => { - currentWrite = transformPromiseWith( - WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {} - ); + currentWrite = WritableStreamDefaultWriterWrite(writer, chunk); + setPromiseIsHandledToTrue(currentWrite); resolveRead(false); }, closeSteps: () => resolveRead(true), @@ -215,6 +215,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } ); }); + return currentRead; }); } @@ -259,13 +260,38 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC setPromiseIsHandledToTrue(pipeLoop()); function waitForWritesToFinish() { - // Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait - // for that too. - const oldCurrentWrite = currentWrite; - return transformPromiseWith( - currentWrite, - () => oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined - ); + let oldCurrentWrite; + return promiseResolvedWith(check()); + + function check() { + // Another write may have started while we were waiting on this currentWrite, + // so we have to be sure to wait for that too. + if (oldCurrentWrite !== currentWrite) { + oldCurrentWrite = currentWrite; + return transformPromiseWith(currentWrite, check, check); + } + return undefined; + } + } + + function waitForReadsAndWritesToFinish() { + let oldCurrentRead; + let oldCurrentWrite; + return promiseResolvedWith(check()); + + function check() { + // Another read or write may have started while we were waiting on this currentRead or currentWrite, + // so we have to be sure to wait for that too. + if (oldCurrentRead !== currentRead) { + oldCurrentRead = currentRead; + return transformPromiseWith(currentRead, check, check); + } + if (oldCurrentWrite !== currentWrite) { + oldCurrentWrite = currentWrite; + return transformPromiseWith(currentWrite, check, check); + } + return undefined; + } } function isOrBecomesErrored(stream, promise, action) { @@ -299,8 +325,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC function doTheRest() { uponPromise( action(), - () => finalize(originalIsError, originalError), - newError => finalize(true, newError) + () => waitForReadsAndWritesThenFinalize(originalIsError, originalError), + newError => waitForReadsAndWritesThenFinalize(true, newError) ); } } @@ -311,11 +337,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } shuttingDown = true; - if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { - uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error)); - } else { - finalize(isError, error); - } + waitForReadsAndWritesThenFinalize(isError, error); + } + + function waitForReadsAndWritesThenFinalize(isError, error) { + uponFulfillment(waitForReadsAndWritesToFinish(), () => finalize(isError, error)); } function finalize(isError, error) {