Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(compression): implement wire protocol compression support
Browse files Browse the repository at this point in the history
NODE-1016, NODE-1027

* core work for getting compression message to travel

* Inital compression message test with mock server

* Renaming test

* typo in runner.js

* only emit fullsetup event once

* Extending tests

* Can decompress snappy messages

* Adding zlib support

* Reducing console logging verbosity

* Debugging mock

* Fixing mock to be able to send OP_COMPRESSED

* Added mock tests for various OP_COMPRESSED reception cases

* Edit no compression test to include doc insertion

* Compression tests attempt inserting

* Commenting out unneeded test

* Added test to check for invalid compressor args

* Allowing sending of outbound OP_COMPRESSED

* Don't compress uncompressible messages

* Removing changes to server_tests.js

* Refactoring

* Increasing test verbosity

* (WIP) receiving compressed messages in the mock

* (WIP) convering compressed mock msgs to original messages

* Mock server can receive compressed messages

* CRUD tests for mock server w/ compression

* Mock test bug fixes

* Added tests to mock to compress commands

* Testing that uncompressible commands aren't compressed

* Note in responses whether the message was originally compressed

* Starting integration testing

* Adding Wire Protocol Version detection to tests

* Renaming test

* Some auth tests in server_tests

* Removed erroneous test statements

* refactor(test-runner): don't check mongodb version if skip specified

* Use actual server type in standalone SDAM events:

Currently the server type inside the topologyDescriptionChanged event
for a single topology is 'Standalone', which is unexpected. According to
the SDAM specification a TopologyDescription has a list of
ServerDescriptions, which had a type field indicating the type of server
it is and does not state that the server type never changes if the
topology is single.

My expectation is that a single connection to a replica set primary, for
example, would give me a topology type of 'Single' and a server type of
'RSPrimary', not 'Standalone' like it currently does.

The serverDescriptionChanged event properly has the type set.

* test(sdam): add test for emitting correct SDAM server type

* Allow running tests with a particular version of mongod using the -v flag

* Tidying comments

* Create facility to spawn new mongod with options

* Create compression test

* Tidying snappy compression test implementation

* Removing unneeded import

* Tidying tests

* Switch to using deflate/inflate instead of zip/unzip

* Implementing async compression of outbound OP_COMPRESSED

* Changing clashing port number

* Using strict mode

* Switching to use forEach

* Tidying requires()

* Removing version flag

Use environment variable instead

* Tidying up

* Fixing typo

* [NODE-1021] [NODE-1023] OP_COMPRESSED reception support and mock testing

* core work for getting compression message to travel

* Inital compression message test with mock server

* Renaming test

* typo in runner.js

* Extending tests

* Can decompress snappy messages

* Adding zlib support

* Reducing console logging verbosity

* Debugging mock

* Fixing mock to be able to send OP_COMPRESSED

* Added mock tests for various OP_COMPRESSED reception cases

* Edit no compression test to include doc insertion

* Compression tests attempt inserting

* Commenting out unneeded test

* Added test to check for invalid compressor args

* Add snappy to package.json

* Removing use of "in" and debugging statements

* Changing magic number for a constant

* Remove extraneous console log

* Fixing typo

* Refactoring and bailing early

* Fixing typo

* chore(travis): only run CI for node LTS versions 4 & 6

* chore(travis): use containerized trusty builds

* Changing OP_COMMAND to OP_COMPRESSED

* Small refactor for reading a header

* (WIP) refactoring decompression to be async

* WIP Async decompression

* Tweaking test to use config port

* WIP Making decompression async

* Validate compressorId

* Async decompression now working

* Updating tests (copying in from NODE-1027)

* Adding strict mode

So we can use "let", etc… on older versions of Node

* Moving snappy to devDependencies

* Simplifying code

* Reducing reliance on constants

* Fixing mis-named variable

* Removing unneeded and unused function

* Defining OP_COMMAND constant once

* Improving code readability

* Making snappy optional

* Require_Optional Snappy

* Making decompression code clearer

* Making error message more informative

* Removing debug message

* Reordering function arguments so that "self" is first

* Removing incorrect environment

This environment has not yet been created in this branch

* Using constants to decide compression mechanism

* Removing unused code

* Using !== instead of !=

* Using pre-release require_master

* Tidying up

* Tidying use of constants

* Standardising use of compressorID and simplifying deciding which ID to use

* Tidying up

* Removing unneeded requires

* Updating Travis Node version, require_optional version, and creating package_lock

* Tidying Response creation

* Removing MongoDB 2.4.x from Travis

* Moving OPCODE numbers to wireprotocol/shared.js

* core work for getting compression message to travel

* Inital compression message test with mock server

* Extending tests

* Can decompress snappy messages

* Adding zlib support

* Reducing console logging verbosity

* Debugging mock

* Fixing mock to be able to send OP_COMPRESSED

* Added mock tests for various OP_COMPRESSED reception cases

* Compression tests attempt inserting

* Commenting out unneeded test

* Allowing sending of outbound OP_COMPRESSED

* Don't compress uncompressible messages

* Removing changes to server_tests.js

* Refactoring

* (WIP) receiving compressed messages in the mock

* (WIP) convering compressed mock msgs to original messages

* Mock server can receive compressed messages

* CRUD tests for mock server w/ compression

* Mock test bug fixes

* Added tests to mock to compress commands

* Starting integration testing

* Adding Wire Protocol Version detection to tests

* Renaming test

* Some auth tests in server_tests

* Removed erroneous test statements

* Allow running tests with a particular version of mongod using the -v flag

* Tidying comments

* Create facility to spawn new mongod with options

* Create compression test

* Tidying snappy compression test implementation

* Tidying tests

* Switch to using deflate/inflate instead of zip/unzip

* Implementing async compression of outbound OP_COMPRESSED

* Switching to use forEach

* Removing version flag

Use environment variable instead

* Tidying up

* Post-rebase tidying

* Tidying code

* Melding branches together

* Fixing comment

* Adding test topologies

* Improving how zlib compression level is set

There is no need to tell zlib to use the default compression level.

* Fixing compatibility with Node v4

Node v4 seems to have difficulty supporting Buffer.fill(someBuffer).

* Moving parseHeader

* Moving compressorIDs

* Moving uncompressibleCommands

* Moving compress and decompress

* Moving operation construction out of promise

* Removing use of promises in command serialization

* Fixing typo

* Refactoring hasUncompressibleCommands
  • Loading branch information
Sebastian Hallum Clarke authored and mbroadst committed Aug 6, 2017
1 parent 966ff6d commit 2356ffb
Show file tree
Hide file tree
Showing 13 changed files with 475 additions and 126 deletions.
23 changes: 3 additions & 20 deletions lib/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ var inherits = require('util').inherits
, crypto = require('crypto')
, f = require('util').format
, debugOptions = require('./utils').debugOptions
, parseHeader = require('./utils').parseHeader
, compressorIDs = require('./utils').compressorIDs
, parseHeader = require('../wireprotocol/shared').parseHeader
, decompress = require('../wireprotocol/compression').decompress
, Response = require('./commands').Response
, MongoError = require('../error')
, Logger = require('./logger')
Expand Down Expand Up @@ -82,7 +82,7 @@ var Connection = function(messageHandler, options) {
this.host = options.host || 'localhost';
this.family = typeof options.family == 'number' ? options.family : 4;
this.keepAlive = typeof options.keepAlive == 'boolean' ? options.keepAlive : true;
this.keepAliveInitialDelay = typeof options.keepAliveInitialDelay == 'number'
this.keepAliveInitialDelay = typeof options.keepAliveInitialDelay == 'number'
? options.keepAliveInitialDelay : 300000;
this.noDelay = typeof options.noDelay == 'boolean' ? options.noDelay : true;
this.connectionTimeout = typeof options.connectionTimeout == 'number'
Expand Down Expand Up @@ -221,23 +221,6 @@ var closeHandler = function(self) {
}
}

// Decompress a message using the given compressor
var decompress = function(compressorID, compressedData, callback) {
if (compressorID < 0 || compressorID > compressorIDs.length) {
throw new Error('Server sent message compressed using an unsupported compressor. (Received compressor ID ' + compressorID + ')');
}
switch (compressorID) {
case compressorIDs.snappy:
Snappy.uncompress(compressedData, callback);
break;
case compressorIDs.zlib:
zlib.inflate(compressedData, callback);
break;
default:
callback(null, compressedData);
}
}

// Handle a message once it is recieved
var emitMessageHandler = function (self, message) {
var msgHeader = parseHeader(message);
Expand Down
136 changes: 99 additions & 37 deletions lib/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ var inherits = require('util').inherits,
f = require('util').format,
Query = require('./commands').Query,
CommandResult = require('./command_result'),
assign = require('../utils').assign;
assign = require('../utils').assign,
Snappy = require('./utils').retrieveSnappy(),
zlib = require('zlib'),
MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE,
opcodes = require('../wireprotocol/shared').opcodes,
compress = require('../wireprotocol/compression').compress,
compressorIDs = require('../wireprotocol/compression').compressorIDs,
uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands;

var MongoCR = require('../auth/mongocr')
, X509 = require('../auth/x509')
Expand Down Expand Up @@ -918,6 +925,56 @@ Pool.prototype.destroy = function(force) {
checkStatus();
}

// Prepare the buffer that Pool.prototype.write() uses to send to the server
var serializeCommands = function(self, commands, result, callback) {
// Base case when there are no more commands to serialize
if (commands.length === 0) return callback(null, result);

// Pop off the zeroth command and serialize it
var thisCommand = commands.shift();
var originalCommandBuffer = thisCommand.toBin();

// Check whether we and the server have agreed to use a compressor
if (self.options.agreedCompressor && !hasUncompressibleCommands(thisCommand)) {
// Transform originalCommandBuffer into OP_COMPRESSED
var concatenatedOriginalCommandBuffer = Buffer.concat(originalCommandBuffer);
var messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);

// Extract information needed for OP_COMPRESSED from the uncompressed message
var originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);

// Compress the message body
compress(self, messageToBeCompressed, function(err, compressedMessage) {
if (err) return callback(err, null);

// Create the msgHeader of OP_COMPRESSED
var msgHeader = new Buffer(MESSAGE_HEADER_SIZE);
msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + 9 + compressedMessage.length, 0); // messageLength
msgHeader.writeInt32LE(thisCommand.requestId, 4); // requestID
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode

// Create the compression details of OP_COMPRESSED
var compressionDetails = new Buffer(9);
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
compressionDetails.writeUInt8(compressorIDs[self.options.agreedCompressor], 8); // compressorID

// Push the concatenation of the OP_COMPRESSED message onto results
result.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));

// Continue recursing through the commands array
serializeCommands(self, commands, result, callback);
})
} else {
// Push the serialization of the command onto results
result.push(originalCommandBuffer);

// Continue recursing through the commands array
serializeCommands(self, commands, result, callback);
}
}

/**
* Write a message to MongoDB
* @method
Expand All @@ -933,6 +990,11 @@ Pool.prototype.write = function(commands, options, cb) {
// Always have options
options = options || {};

// We need to have a callback function unless the message returns no response
if(!(typeof cb == 'function') && !options.noResponse) {
throw new MongoError('write method must provide a callback');
}

// Pool was destroyed error out
if(this.state == DESTROYED || this.state == DESTROYING) {
// Callback with an error
Expand Down Expand Up @@ -968,25 +1030,6 @@ Pool.prototype.write = function(commands, options, cb) {
cb: cb, raw: false, promoteLongs: true, promoteValues: true, promoteBuffers: false, fullResult: false
};

var buffer = null

if(Array.isArray(commands)) {
buffer = [];

for(var i = 0; i < commands.length; i++) {
buffer.push(commands[i].toBin());
}

// Get the requestId
operation.requestId = commands[commands.length - 1].requestId;
} else {
operation.requestId = commands.requestId;
buffer = commands.toBin();
}

// Set the buffers
operation.buffer = buffer;

// Set the options for the parsing
operation.promoteLongs = typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true;
operation.promoteValues = typeof options.promoteValues == 'boolean' ? options.promoteValues : true;
Expand All @@ -997,7 +1040,6 @@ Pool.prototype.write = function(commands, options, cb) {
operation.command = typeof options.command == 'boolean' ? options.command : false;
operation.fullResult = typeof options.fullResult == 'boolean' ? options.fullResult : false;
operation.noResponse = typeof options.noResponse == 'boolean' ? options.noResponse : false;
// operation.requestId = options.requestId;

// Optional per operation socketTimeout
operation.socketTimeout = options.socketTimeout;
Expand All @@ -1007,25 +1049,45 @@ Pool.prototype.write = function(commands, options, cb) {
operation.socketTimeout = options.socketTimeout;
}

// We need to have a callback function unless the message returns no response
if(!(typeof cb == 'function') && !options.noResponse) {
throw new MongoError('write method must provide a callback');
// Ensure commands is an array
if (!Array.isArray(commands)) {
commands = [commands];
}

// If we have a monitoring operation schedule as the very first operation
// Otherwise add to back of queue
if(options.monitoring) {
this.queue.unshift(operation);
} else {
this.queue.push(operation);
}
// Get the requestId
operation.requestId = commands[commands.length - 1].requestId;

// Attempt to execute the operation
if(!self.executing) {
process.nextTick(function() {
_execute(self)();
});
}
// Prepare the operation buffer
serializeCommands(self, commands, [], function(err, serializedCommands) {
if (err) throw err;

// Set the operation's buffer to the serialization of the commands
operation.buffer = serializedCommands;

// If we have a monitoring operation schedule as the very first operation
// Otherwise add to back of queue
if(options.monitoring) {
self.queue.unshift(operation);
} else {
self.queue.push(operation);
}

// Attempt to execute the operation
if(!self.executing) {
process.nextTick(function() {
_execute(self)();
});
}
})

}

// Return whether a command contains an uncompressible command term
// Will return true if command contains no uncompressible command terms
var hasUncompressibleCommands = function(command) {
return uncompressibleCommands.some(function(cmd) {
return command.query.hasOwnProperty(cmd);
});
}

// Remove connection method
Expand Down
15 changes: 0 additions & 15 deletions lib/connection/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,10 @@ var retrieveSnappy = function() {
return snappy;
}

// Parses the header of a wire protocol message
var parseHeader = function(message) {
return {
length: message.readInt32LE(0),
requestId: message.readInt32LE(4),
responseTo: message.readInt32LE(8),
opCode: message.readInt32LE(12)
}
}

exports.compressorIDs = {
snappy: 1,
zlib: 2
}
exports.setProperty = setProperty;
exports.getProperty = getProperty;
exports.getSingleProperty = getSingleProperty;
exports.copy = copy;
exports.debugOptions = debugOptions;
exports.retrieveBSON = retrieveBSON;
exports.retrieveSnappy = retrieveSnappy;
exports.parseHeader = parseHeader;
14 changes: 14 additions & 0 deletions lib/topologies/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,20 @@ var eventHandler = function(self, event) {
return;
}

// Determine whether the server is instructing us to use a compressor
if (result.result && result.result.compression) {
for (var i = 0; i < self.s.compression.compressors.length; i++) {
if (result.result.compression.indexOf(self.s.compression.compressors[i]) > -1) {
self.s.pool.options.agreedCompressor = self.s.compression.compressors[i]
break;
}
}

if (self.s.compression.zlibCompressionLevel) {
self.s.pool.options.zlibCompressionLevel = self.s.compression.zlibCompressionLevel;
}
}

// Ensure no error emitted after initial connect when reconnecting
self.initalConnect = false;
// Save the ismaster
Expand Down
4 changes: 2 additions & 2 deletions lib/topologies/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ function createCompressionInfo(options) {
// Check that all supplied compressors are valid
options.compression.compressors.forEach(function(compressor) {
if (compressor !== 'snappy' && compressor !== 'zlib') {
throw new Error('compressors must be at least one of snappy or zlib')
throw new Error('compressors must be at least one of snappy or zlib');
}
})

return options.compression.compressors
return options.compression.compressors;
}

function clone(object) {
Expand Down
64 changes: 64 additions & 0 deletions lib/wireprotocol/compression.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
var Snappy = require('../connection/utils').retrieveSnappy(),
zlib = require('zlib');

var compressorIDs = {
snappy: 1,
zlib: 2
}

var uncompressibleCommands = [
'ismaster',
'saslStart',
'saslContinue',
'getnonce',
'authenticate',
'createUser',
'updateUser',
'copydbSaslStart',
'copydbgetnonce',
'copydb'
];


// Facilitate compressing a message using an agreed compressor
var compress = function(self, dataToBeCompressed, callback) {
switch (self.options.agreedCompressor) {
case 'snappy':
Snappy.compress(dataToBeCompressed, callback);
break;
case 'zlib':
// Determine zlibCompressionLevel
var zlibOptions = {};
if (self.options.zlibCompressionLevel) {
zlibOptions.level = self.options.zlibCompressionLevel
}
zlib.deflate(dataToBeCompressed, zlibOptions, callback);
break;
default:
throw new Error('Attempt to compress message using unknown compressor \"' + self.options.agreedCompressor + '\".');
}
}

// Decompress a message using the given compressor
var decompress = function(compressorID, compressedData, callback) {
if (compressorID < 0 || compressorID > compressorIDs.length) {
throw new Error('Server sent message compressed using an unsupported compressor. (Received compressor ID ' + compressorID + ')');
}
switch (compressorID) {
case compressorIDs.snappy:
Snappy.uncompress(compressedData, callback);
break;
case compressorIDs.zlib:
zlib.inflate(compressedData, callback);
break;
default:
callback(null, compressedData);
}
}

module.exports = {
compressorIDs: compressorIDs,
uncompressibleCommands: uncompressibleCommands,
compress: compress,
decompress: decompress
}
13 changes: 12 additions & 1 deletion lib/wireprotocol/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,19 @@ var getReadPreference = function(cmd, options) {
return readPreference;
}

// Parses the header of a wire protocol message
var parseHeader = function(message) {
return {
length: message.readInt32LE(0),
requestId: message.readInt32LE(4),
responseTo: message.readInt32LE(8),
opCode: message.readInt32LE(12)
}
}

module.exports = {
getReadPreference: getReadPreference,
MESSAGE_HEADER_SIZE: MESSAGE_HEADER_SIZE,
opcodes: opcodes
opcodes: opcodes,
parseHeader: parseHeader
}
Loading

0 comments on commit 2356ffb

Please sign in to comment.