From 0628ffe968310702f87b60717fd80debb6d61358 Mon Sep 17 00:00:00 2001 From: Adam Rice Date: Mon, 21 Aug 2017 23:35:20 +0900 Subject: [PATCH 1/4] Has HasBackpressure operation to ReadableStream controllers The TransformStream implementation currently looks at desiredSize to get an approximation of whether there is backpressure on the readable side or not. This is fragile and not really clean enough for standardisation. Add an internal "HasBackpressure" polymorphic method to the ReadableStream*Controller classes to permit TransformStream to determine whether there is backpressure directly. --- index.bs | 22 +++++++++++++++++++ .../lib/readable-stream.js | 12 +++++++++- .../lib/transform-stream.js | 20 +++++------------ 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/index.bs b/index.bs index 128e1b0b2..45848e316 100644 --- a/index.bs +++ b/index.bs @@ -1618,6 +1618,17 @@ readable stream implementation will polymorphically call to either these or thei 1. Return ! PromiseInvokeOrNoop(*this*.[[underlyingSource]], `"cancel"`, « _reason_ ») +
\[[HasBackpressure]]()
+ +
+ This method is used in the implementation of TransformStream. +
+ + + 1. If ! ReadableStreamDefaultControllerShouldCallPull(*this*) is *true*, return *false*. + 1. Otherwise, return *true*. + +
\[[PullSteps]]()
@@ -1999,6 +2010,17 @@ readable stream implementation will polymorphically call to either these or thei 1. Return ! PromiseInvokeOrNoop(*this*.[[underlyingByteSource]], `"cancel"`, « _reason_ ») +
\[[HasBackpressure]]()
+ +
+ This method is used in the implementation of TransformStream. +
+ + + 1. If ! ReadableByteStreamControllerShouldCallPull(*this*) is *true*, return *false*. + 1. Otherwise, return *true*. + +
\[[PullSteps]]()
diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 261a7e389..a135f0aeb 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -13,6 +13,7 @@ const { AcquireWritableStreamDefaultWriter, IsWritableStream, IsWritableStreamLo const CancelSteps = Symbol('[[CancelSteps]]'); const PullSteps = Symbol('[[PullSteps]]'); +const HasBackpressure = Symbol('[[HasBackpressure]]'); class ReadableStream { constructor(underlyingSource = {}, { size, highWaterMark } = {}) { @@ -274,7 +275,8 @@ module.exports = { ReadableStreamDefaultControllerClose, ReadableStreamDefaultControllerEnqueue, ReadableStreamDefaultControllerError, - ReadableStreamDefaultControllerGetDesiredSize + ReadableStreamDefaultControllerGetDesiredSize, + HasBackpressure }; // Abstract operations for the ReadableStream. @@ -965,6 +967,10 @@ class ReadableStreamDefaultController { return PromiseInvokeOrNoop(this._underlyingSource, 'cancel', [reason]); } + [HasBackpressure]() { + return ReadableStreamDefaultControllerShouldCallPull(this) === false; + } + [PullSteps]() { const stream = this._controlledReadableStream; @@ -1329,6 +1335,10 @@ class ReadableByteStreamController { return PromiseInvokeOrNoop(this._underlyingByteSource, 'cancel', [reason]); } + [HasBackpressure]() { + return ReadableByteStreamControllerShouldCallPull(this) === false; + } + [PullSteps]() { const stream = this._controlledReadableStream; assert(ReadableStreamHasDefaultReader(stream) === true); diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js index 39377b2b3..c6df2360b 100644 --- a/reference-implementation/lib/transform-stream.js +++ b/reference-implementation/lib/transform-stream.js @@ -1,7 +1,7 @@ 'use strict'; const assert = require('assert'); const { InvokeOrNoop, PromiseInvokeOrPerformFallback, PromiseInvokeOrNoop, typeIsObject } = require('./helpers.js'); -const { ReadableStream, ReadableStreamDefaultControllerClose, +const { HasBackpressure, ReadableStream, ReadableStreamDefaultControllerClose, ReadableStreamDefaultControllerEnqueue, ReadableStreamDefaultControllerError, ReadableStreamDefaultControllerGetDesiredSize } = require('./readable-stream.js'); const { WritableStream, WritableStreamDefaultControllerError } = require('./writable-stream.js'); @@ -45,10 +45,7 @@ class TransformStream { assert(this._writableController !== undefined); assert(this._readableController !== undefined); - const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(this._readableController); - // Set _backpressure based on desiredSize. As there is no read() at this point, we can just interpret - // desiredSize being non-positive as backpressure. - TransformStreamSetBackpressure(this, desiredSize <= 0); + TransformStreamSetBackpressure(this, this._readableController[HasBackpressure]()); const transformStream = this; const startResult = InvokeOrNoop(transformer, 'start', @@ -154,16 +151,9 @@ function TransformStreamEnqueueToReadable(transformStream, chunk) { throw transformStream._storedError; } - const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller); - const maybeBackpressure = desiredSize <= 0; - - if (maybeBackpressure === true && transformStream._backpressure === false) { - // This allows pull() again. When desiredSize is 0, it's possible that a pull() will happen immediately (but - // asynchronously) after this because of pending read()s and set _backpressure back to false. - // - // If pull() could be called from inside enqueue(), then this logic would be wrong. This cannot happen - // because there is always a promise pending from start() or pull() when _backpressure is false. - TransformStreamSetBackpressure(transformStream, true); + const backpressure = controller[HasBackpressure](); + if (backpressure !== transformStream._backpressure) { + TransformStreamSetBackpressure(transformStream, controller[HasBackpressure]()); } } From f5d5b775b7887e0997412a311f0a31947ea5c79c Mon Sep 17 00:00:00 2001 From: Adam Rice Date: Fri, 25 Aug 2017 17:00:10 +0900 Subject: [PATCH 2/4] Stop using polymorphism Replace the polymorphic HasBackpressure method with a non-polymorphic ReadableStreamDefaultControllerHasBackpressure method. --- index.bs | 33 +++++++------------ .../lib/readable-stream.js | 20 +++++------ .../lib/transform-stream.js | 11 ++++--- 3 files changed, 28 insertions(+), 36 deletions(-) diff --git a/index.bs b/index.bs index 45848e316..75607cf89 100644 --- a/index.bs +++ b/index.bs @@ -1618,16 +1618,6 @@ readable stream implementation will polymorphically call to either these or thei 1. Return ! PromiseInvokeOrNoop(*this*.[[underlyingSource]], `"cancel"`, « _reason_ ») -
\[[HasBackpressure]]()
- -
- This method is used in the implementation of TransformStream. -
- - - 1. If ! ReadableStreamDefaultControllerShouldCallPull(*this*) is *true*, return *false*. - 1. Otherwise, return *true*. -
\[[PullSteps]]()
@@ -1781,6 +1771,18 @@ Specifications should not use this on streams they did not create. 1. Return _controller_.[[strategyHWM]] − _controller_.[[queueTotalSize]]. +

ReadableStreamDefaultControllerHasBackpressure ( controller )

+ +
+ This method is used in the implementation of TransformStream. +
+ + + 1. If ! ReadableStreamDefaultControllerShouldCallPull(_controller_) is *true*, return *false*. + 1. Otherwise, return *true*. + +

Class ReadableByteStreamController

@@ -2010,17 +2012,6 @@ readable stream implementation will polymorphically call to either these or thei 1. Return ! PromiseInvokeOrNoop(*this*.[[underlyingByteSource]], `"cancel"`, « _reason_ ») -
\[[HasBackpressure]]()
- -
- This method is used in the implementation of TransformStream. -
- - - 1. If ! ReadableByteStreamControllerShouldCallPull(*this*) is *true*, return *false*. - 1. Otherwise, return *true*. - -
\[[PullSteps]]()
diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index a135f0aeb..447d3158a 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -13,7 +13,6 @@ const { AcquireWritableStreamDefaultWriter, IsWritableStream, IsWritableStreamLo const CancelSteps = Symbol('[[CancelSteps]]'); const PullSteps = Symbol('[[PullSteps]]'); -const HasBackpressure = Symbol('[[HasBackpressure]]'); class ReadableStream { constructor(underlyingSource = {}, { size, highWaterMark } = {}) { @@ -276,7 +275,7 @@ module.exports = { ReadableStreamDefaultControllerEnqueue, ReadableStreamDefaultControllerError, ReadableStreamDefaultControllerGetDesiredSize, - HasBackpressure + ReadableStreamDefaultControllerHasBackpressure }; // Abstract operations for the ReadableStream. @@ -967,10 +966,6 @@ class ReadableStreamDefaultController { return PromiseInvokeOrNoop(this._underlyingSource, 'cancel', [reason]); } - [HasBackpressure]() { - return ReadableStreamDefaultControllerShouldCallPull(this) === false; - } - [PullSteps]() { const stream = this._controlledReadableStream; @@ -1147,6 +1142,15 @@ function ReadableStreamDefaultControllerGetDesiredSize(controller) { return controller._strategyHWM - controller._queueTotalSize; } +// This is used in the implementation of TransformStream. +function ReadableStreamDefaultControllerHasBackpressure(controller) { + if (ReadableStreamDefaultControllerShouldCallPull(controller) === true) { + return false; + } + + return true; +} + class ReadableStreamBYOBRequest { constructor(controller, view) { this._associatedReadableByteStreamController = controller; @@ -1335,10 +1339,6 @@ class ReadableByteStreamController { return PromiseInvokeOrNoop(this._underlyingByteSource, 'cancel', [reason]); } - [HasBackpressure]() { - return ReadableByteStreamControllerShouldCallPull(this) === false; - } - [PullSteps]() { const stream = this._controlledReadableStream; assert(ReadableStreamHasDefaultReader(stream) === true); diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js index c6df2360b..5ce307e73 100644 --- a/reference-implementation/lib/transform-stream.js +++ b/reference-implementation/lib/transform-stream.js @@ -1,9 +1,10 @@ 'use strict'; const assert = require('assert'); const { InvokeOrNoop, PromiseInvokeOrPerformFallback, PromiseInvokeOrNoop, typeIsObject } = require('./helpers.js'); -const { HasBackpressure, ReadableStream, ReadableStreamDefaultControllerClose, +const { ReadableStream, ReadableStreamDefaultControllerClose, ReadableStreamDefaultControllerEnqueue, ReadableStreamDefaultControllerError, - ReadableStreamDefaultControllerGetDesiredSize } = require('./readable-stream.js'); + ReadableStreamDefaultControllerGetDesiredSize, + ReadableStreamDefaultControllerHasBackpressure } = require('./readable-stream.js'); const { WritableStream, WritableStreamDefaultControllerError } = require('./writable-stream.js'); // Class TransformStream @@ -45,7 +46,7 @@ class TransformStream { assert(this._writableController !== undefined); assert(this._readableController !== undefined); - TransformStreamSetBackpressure(this, this._readableController[HasBackpressure]()); + TransformStreamSetBackpressure(this, ReadableStreamDefaultControllerHasBackpressure(this._readableController)); const transformStream = this; const startResult = InvokeOrNoop(transformer, 'start', @@ -151,9 +152,9 @@ function TransformStreamEnqueueToReadable(transformStream, chunk) { throw transformStream._storedError; } - const backpressure = controller[HasBackpressure](); + const backpressure = ReadableStreamDefaultControllerHasBackpressure(controller); if (backpressure !== transformStream._backpressure) { - TransformStreamSetBackpressure(transformStream, controller[HasBackpressure]()); + TransformStreamSetBackpressure(transformStream, backpressure); } } From af21cff63d35b6443da793f4f27decce506b98ae Mon Sep 17 00:00:00 2001 From: Adam Rice Date: Fri, 25 Aug 2017 17:28:52 +0900 Subject: [PATCH 3/4] Remove surplus blank line --- index.bs | 1 - 1 file changed, 1 deletion(-) diff --git a/index.bs b/index.bs index 75607cf89..a5d0180a7 100644 --- a/index.bs +++ b/index.bs @@ -1618,7 +1618,6 @@ readable stream implementation will polymorphically call to either these or thei 1. Return ! PromiseInvokeOrNoop(*this*.[[underlyingSource]], `"cancel"`, « _reason_ ») -
\[[PullSteps]]()
From fabfa57894ae27ce8c26a0af258b60b88005988f Mon Sep 17 00:00:00 2001 From: Adam Rice Date: Fri, 25 Aug 2017 20:35:34 +0900 Subject: [PATCH 4/4] _backpressure always starts true Hard-code it, and take advantage of the invariant to simplify the start() method for TSReadableSource. --- .../lib/transform-stream.js | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js index 5ce307e73..863a7565e 100644 --- a/reference-implementation/lib/transform-stream.js +++ b/reference-implementation/lib/transform-stream.js @@ -46,7 +46,7 @@ class TransformStream { assert(this._writableController !== undefined); assert(this._readableController !== undefined); - TransformStreamSetBackpressure(this, ReadableStreamDefaultControllerHasBackpressure(this._readableController)); + TransformStreamSetBackpressure(this, true); const transformStream = this; const startResult = InvokeOrNoop(transformer, 'start', @@ -391,20 +391,7 @@ class TransformStreamDefaultSource { transformStream._readableController = c; - return this._startPromise.then(() => { - // Prevent the first pull() call until there is backpressure. - - assert(transformStream._backpressureChangePromise !== undefined, - '_backpressureChangePromise should have been initialized'); - - if (transformStream._backpressure === true) { - return Promise.resolve(); - } - - assert(transformStream._backpressure === false, '_backpressure should have been initialized'); - - return transformStream._backpressureChangePromise; - }); + return this._startPromise; } pull() {