Skip to content

Commit

Permalink
Deflake ReadableStream piping flow control test (#5662)
Browse files Browse the repository at this point in the history
Test 'Piping to a WritableStream that does not consume the writes fast
enough exerts backpressure on the ReadableStream' in
streams/piping/flow-control.js is flaky on Chromium
memory sanitizer bots due to its use of timers.

Change to use an interleaved step design. Each step does not start until the
previous one has completed.

Relevant to whatwg/streams#548.
  • Loading branch information
ricea authored Apr 25, 2017
1 parent e1dcc27 commit 2a9976d
Showing 1 changed file with 80 additions and 46 deletions.
126 changes: 80 additions & 46 deletions streams/piping/flow-control.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,44 @@ promise_test(() => {

}, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones');

promise_test(() => {
class StepTracker {
constructor() {
this.waiters = [];
this.wakers = [];
}

// Returns promise which resolves when step `n` is reached. Also schedules step n + 1 to happen shortly after the
// promise is resolved.
waitThenAdvance(n) {
if (this.waiters[n] === undefined) {
this.waiters[n] = new Promise(resolve => {
this.wakers[n] = resolve;
});
this.waiters[n]
.then(() => flushAsyncEvents())
.then(() => {
if (this.wakers[n + 1] !== undefined) {
this.wakers[n + 1]();
}
});
}
if (n == 0) {
this.wakers[0]();
}
return this.waiters[n];
}
}

promise_test(() => {
const steps = new StepTracker();
const desiredSizes = [];
const rs = recordingReadableStream({
start(controller) {
delay(100).then(() => enqueue('a'));
delay(200).then(() => enqueue('b'));
delay(300).then(() => enqueue('c'));
delay(400).then(() => enqueue('d'));
delay(500).then(() => controller.close());
steps.waitThenAdvance(1).then(() => enqueue('a'));
steps.waitThenAdvance(3).then(() => enqueue('b'));
steps.waitThenAdvance(5).then(() => enqueue('c'));
steps.waitThenAdvance(7).then(() => enqueue('d'));
steps.waitThenAdvance(11).then(() => controller.close());

function enqueue(chunk) {
controller.enqueue(chunk);
Expand All @@ -190,80 +218,86 @@ promise_test(() => {

const chunksFinishedWriting = [];
const writableStartPromise = Promise.resolve();
let writeCalled = false;
const ws = recordingWritableStream({
start() {
return writableStartPromise;
},
write(chunk) {
return delay(350).then(() => {
const waitForStep = writeCalled ? 12 : 9;
writeCalled = true;
return steps.waitThenAdvance(waitForStep).then(() => {
chunksFinishedWriting.push(chunk);
});
}
});

return writableStartPromise.then(() => {
return Promise.all([
rs.pipeTo(ws).then(() => {
assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source');
assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks started writing');

assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream');
assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'],
'all chunks were written (and the WritableStream closed)');
}),
const pipePromise = rs.pipeTo(ws);
steps.waitThenAdvance(0);

delay(125).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at t = 125 ms, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at t = 125 ms, one chunk must have been written');
return Promise.all([
steps.waitThenAdvance(2).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at step 2, one chunk must have been written');

// When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request
// promise, leaving the queue empty.
assert_array_equals(desiredSizes, [1],
'at t = 125 ms, the desiredSize at the last enqueue (100 ms) must have been 1');
assert_equals(rs.controller.desiredSize, 1, 'at t = 125 ms, the current desiredSize must be 1');
'at step 2, the desiredSize at the last enqueue (step 1) must have been 1');
assert_equals(rs.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1');
}),

delay(225).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at t = 225 ms, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at t = 225 ms, one chunk must have been written');
steps.waitThenAdvance(4).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at step 4, one chunk must have been written');

// When 'b' was enqueued at 200 ms, the queue was also empty, since immediately after enqueuing 'a' at
// t = 100 ms, it was dequeued in order to fulfill the read() call that was made at time t = 0. Thus the queue
// When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at
// step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus the queue
// had size 1 (thus desiredSize of 0).
assert_array_equals(desiredSizes, [1, 0],
'at t = 225 ms, the desiredSize at the last enqueue (200 ms) must have been 0');
assert_equals(rs.controller.desiredSize, 0, 'at t = 225 ms, the current desiredSize must be 0');
'at step 4, the desiredSize at the last enqueue (step 3) must have been 0');
assert_equals(rs.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0');
}),

delay(325).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at t = 325 ms, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at t = 325 ms, one chunk must have been written');
steps.waitThenAdvance(6).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at step 6, one chunk must have been written');

// When 'c' was enqueued at 300 ms, the queue was not empty; it had 'b' in it, since 'b' will not be read until
// the first write completes at 450 ms. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of
// When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read until
// the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of
// -1.
assert_array_equals(desiredSizes, [1, 0, -1],
'at t = 325 ms, the desiredSize at the last enqueue (300 ms) must have been -1');
assert_equals(rs.controller.desiredSize, -1, 'at t = 325 ms, the current desiredSize must be -1');
'at step 6, the desiredSize at the last enqueue (step 5) must have been -1');
assert_equals(rs.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1');
}),

delay(425).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at t = 425 ms, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at t = 425 ms, one chunk must have been written');
steps.waitThenAdvance(8).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at step 8, one chunk must have been written');

// When 'd' was enqueued at 400 ms, the situation is the same as before, leading to a queue containing 'b', 'c',
// and 'd'. (Remember the first write will only finish at 100 ms + 350 ms = 450 ms.)
// When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing 'b', 'c',
// and 'd'.
assert_array_equals(desiredSizes, [1, 0, -1, -2],
'at t = 425 ms, the desiredSize at the last enqueue (400 ms) must have been -2');
assert_equals(rs.controller.desiredSize, -2, 'at t = 425 ms, the current desiredSize must be -2');
'at step 8, the desiredSize at the last enqueue (step 7) must have been -2');
assert_equals(rs.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2');
}),

delay(475).then(() => {
assert_array_equals(chunksFinishedWriting, ['a'], 'at t = 475 ms, one chunk must have finished writing');
steps.waitThenAdvance(10).then(() => {
assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing');
assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
'at t = 475 ms, two chunks must have been written');
'at step 10, two chunks must have been written');

assert_equals(rs.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1');
}),

pipePromise.then(() => {
assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source');
assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing');

assert_equals(rs.controller.desiredSize, -1, 'at t = 475 ms, the current desiredSize must be -1');
assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream');
assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'],
'all chunks were written (and the WritableStream closed)');
})
]);
});
Expand Down

0 comments on commit 2a9976d

Please sign in to comment.