Skip to content

Commit

Permalink
[feature]: zlib deflate concurrency limit
Browse files Browse the repository at this point in the history
  • Loading branch information
STRML committed Sep 11, 2017
1 parent 80445e7 commit 4ec7b28
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
56 changes: 53 additions & 3 deletions lib/PerMessageDeflate.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,21 @@ const safeBuffer = require('safe-buffer');
const zlib = require('zlib');

const bufferUtil = require('./BufferUtil');
const Limiter = require('async-limiter');

const Buffer = safeBuffer.Buffer;

const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
const EMPTY_BLOCK = Buffer.from([0x00]);

// We limit zlib concurrency, which prevents severe memory fragmentation
// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
// and https://github.com/websockets/ws/issues/1202
//
// Intentionally global; it's the global thread pool that's
// an issue.
let zlibLimiter;

/**
* Per-message Deflate implementation.
*/
Expand All @@ -25,6 +34,13 @@ class PerMessageDeflate {
this._inflate = null;

this.params = null;

if (!zlibLimiter) {
const concurrency = this._options.compressConcurrencyLimit !== undefined
? this._options.compressConcurrencyLimit
: 10;
zlibLimiter = new Limiter({ concurrency });
}
}

static get extensionName () {
Expand Down Expand Up @@ -249,14 +265,48 @@ class PerMessageDeflate {
}

/**
* Decompress data.
* Decompress data. Concurrency limited by async-limiter.
*
* @param {Buffer} data Compressed data
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @public
*/
decompress (data, fin, callback) {
zlibLimiter.push((done) => {
this._decompress(data, fin, function (err, result) {
done();
callback(err, result);
});
});
}

/**
* Compress data. Concurrency limited by async-limiter.
*
* @param {Buffer} data Data to compress
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @public
*/
compress (data, fin, callback) {
zlibLimiter.push((done) => {
this._compress(data, fin, function (err, result) {
done();
callback(err, result);
});
});
}

/**
* Decompress data.
*
* @param {Buffer} data Compressed data
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @private
*/
_decompress (data, fin, callback) {
const endpoint = this._isServer ? 'client' : 'server';

if (!this._inflate) {
Expand Down Expand Up @@ -322,9 +372,9 @@ class PerMessageDeflate {
* @param {Buffer} data Data to compress
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @public
* @private
*/
compress (data, fin, callback) {
_compress (data, fin, callback) {
if (!data || data.length === 0) {
process.nextTick(callback, null, EMPTY_BLOCK);
return;
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"lint": "eslint ."
},
"dependencies": {
"async-limiter": "^1.0.0",
"safe-buffer": "~5.1.0",
"ultron": "~1.1.0"
},
Expand Down

0 comments on commit 4ec7b28

Please sign in to comment.