Skip to content

Commit

Permalink
zlib: fix interaction of flushing and needDrain
Browse files Browse the repository at this point in the history
Backport-PR-URL: #14571
Backport-Reviewed-By: James M Snell <[email protected]>

Fixes: #14523
PR-URL: #14527
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Colin Ihrig <[email protected]>
  • Loading branch information
addaleax committed Aug 2, 2017
1 parent b1fef05 commit e529914
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 2 deletions.
30 changes: 28 additions & 2 deletions lib/zlib.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ function Zlib(opts, mode) {
this._flushFlag = opts.flush || constants.Z_NO_FLUSH;
this._finishFlushFlag = opts.finishFlush !== undefined ?
opts.finishFlush : constants.Z_FINISH;
this._scheduledFlushFlag = constants.Z_NO_FLUSH;

if (opts.chunkSize !== undefined) {
if (opts.chunkSize < constants.Z_MIN_CHUNK) {
Expand Down Expand Up @@ -300,6 +301,23 @@ Zlib.prototype._flush = function _flush(callback) {
this._transform(Buffer.alloc(0), '', callback);
};

// If a flush is scheduled while another flush is still pending, a way to figure
// out which one is the "stronger" flush is needed.
// Roughly, the following holds:
// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH <
// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH
const flushiness = [];
let i = 0;
for (const flushFlag of [constants.Z_NO_FLUSH, constants.Z_BLOCK,
constants.Z_PARTIAL_FLUSH, constants.Z_SYNC_FLUSH,
constants.Z_FULL_FLUSH, constants.Z_FINISH]) {
flushiness[flushFlag] = i++;
}

function maxFlush(a, b) {
return flushiness[a] > flushiness[b] ? a : b;
}

Zlib.prototype.flush = function flush(kind, callback) {
var ws = this._writableState;

Expand All @@ -315,13 +333,21 @@ Zlib.prototype.flush = function flush(kind, callback) {
if (callback)
this.once('end', callback);
} else if (ws.needDrain) {
if (callback) {
const drainHandler = () => this.flush(kind, callback);
const alreadyHadFlushScheduled =
this._scheduledFlushFlag !== constants.Z_NO_FLUSH;
this._scheduledFlushFlag = maxFlush(kind, this._scheduledFlushFlag);

// If a callback was passed, always register a new `drain` + flush handler,
// mostly because that’s simpler and flush callbacks piling up is a rare
// thing anyway.
if (!alreadyHadFlushScheduled || callback) {
const drainHandler = () => this.flush(this._scheduledFlushFlag, callback);
this.once('drain', drainHandler);
}
} else {
this._flushFlag = kind;
this.write(Buffer.alloc(0), '', callback);
this._scheduledFlushFlag = constants.Z_NO_FLUSH;
}
};

Expand Down
27 changes: 27 additions & 0 deletions test/parallel/test-zlib-flush-drain-longblock.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';

// Regression test for https://github.com/nodejs/node/issues/14523.
// Checks that flushes interact properly with writableState.needDrain,
// even if no flush callback was passed.

const common = require('../common');
const assert = require('assert');
const zlib = require('zlib');

const zipper = zlib.createGzip({ highWaterMark: 16384 });
const unzipper = zlib.createGunzip();
zipper.pipe(unzipper);

zipper.write('A'.repeat(17000));
zipper.flush();

let received = 0;
unzipper.on('data', common.mustCall((d) => {
received += d.length;
}, 2));

// Properly `.end()`ing the streams would interfere with checking that
// `.flush()` works.
process.on('exit', () => {
assert.strictEqual(received, 17000);
});
41 changes: 41 additions & 0 deletions test/parallel/test-zlib-flush-multiple-scheduled.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const zlib = require('zlib');

const {
Z_PARTIAL_FLUSH, Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH
} = zlib.constants;

common.crashOnUnhandledRejection();

async function getOutput(...sequenceOfFlushes) {
const zipper = zlib.createGzip({ highWaterMark: 16384 });

zipper.write('A'.repeat(17000));
for (const flush of sequenceOfFlushes) {
zipper.flush(flush);
}

const data = [];

return new Promise((resolve) => {
zipper.on('data', common.mustCall((d) => {
data.push(d);
if (data.length === 2) resolve(Buffer.concat(data));
}, 2));
});
}

(async function() {
assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH),
await getOutput(Z_SYNC_FLUSH, Z_PARTIAL_FLUSH));
assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH),
await getOutput(Z_PARTIAL_FLUSH, Z_SYNC_FLUSH));

assert.deepStrictEqual(await getOutput(Z_FINISH),
await getOutput(Z_FULL_FLUSH, Z_FINISH));
assert.deepStrictEqual(await getOutput(Z_FINISH),
await getOutput(Z_SYNC_FLUSH, Z_FINISH));
})();

0 comments on commit e529914

Please sign in to comment.