From 2356ffb2331a001135289847a24ee0190c749dc0 Mon Sep 17 00:00:00 2001 From: Sebastian Hallum Clarke Date: Wed, 5 Jul 2017 15:17:10 -0400 Subject: [PATCH] feat(compression): implement wire protocol compression support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NODE-1016, NODE-1027 * core work for getting compression message to travel * Inital compression message test with mock server * Renaming test * typo in runner.js * only emit fullsetup event once * Extending tests * Can decompress snappy messages * Adding zlib support * Reducing console logging verbosity * Debugging mock * Fixing mock to be able to send OP_COMPRESSED * Added mock tests for various OP_COMPRESSED reception cases * Edit no compression test to include doc insertion * Compression tests attempt inserting * Commenting out unneeded test * Added test to check for invalid compressor args * Allowing sending of outbound OP_COMPRESSED * Don't compress uncompressible messages * Removing changes to server_tests.js * Refactoring * Increasing test verbosity * (WIP) receiving compressed messages in the mock * (WIP) convering compressed mock msgs to original messages * Mock server can receive compressed messages * CRUD tests for mock server w/ compression * Mock test bug fixes * Added tests to mock to compress commands * Testing that uncompressible commands aren't compressed * Note in responses whether the message was originally compressed * Starting integration testing * Adding Wire Protocol Version detection to tests * Renaming test * Some auth tests in server_tests * Removed erroneous test statements * refactor(test-runner): don't check mongodb version if skip specified * Use actual server type in standalone SDAM events: Currently the server type inside the topologyDescriptionChanged event for a single topology is 'Standalone', which is unexpected. According to the SDAM specification a TopologyDescription has a list of ServerDescriptions, which had a type field indicating the type of server it is and does not state that the server type never changes if the topology is single. My expectation is that a single connection to a replica set primary, for example, would give me a topology type of 'Single' and a server type of 'RSPrimary', not 'Standalone' like it currently does. The serverDescriptionChanged event properly has the type set. * test(sdam): add test for emitting correct SDAM server type * Allow running tests with a particular version of mongod using the -v flag * Tidying comments * Create facility to spawn new mongod with options * Create compression test * Tidying snappy compression test implementation * Removing unneeded import * Tidying tests * Switch to using deflate/inflate instead of zip/unzip * Implementing async compression of outbound OP_COMPRESSED * Changing clashing port number * Using strict mode * Switching to use forEach * Tidying requires() * Removing version flag Use environment variable instead * Tidying up * Fixing typo * [NODE-1021] [NODE-1023] OP_COMPRESSED reception support and mock testing * core work for getting compression message to travel * Inital compression message test with mock server * Renaming test * typo in runner.js * Extending tests * Can decompress snappy messages * Adding zlib support * Reducing console logging verbosity * Debugging mock * Fixing mock to be able to send OP_COMPRESSED * Added mock tests for various OP_COMPRESSED reception cases * Edit no compression test to include doc insertion * Compression tests attempt inserting * Commenting out unneeded test * Added test to check for invalid compressor args * Add snappy to package.json * Removing use of "in" and debugging statements * Changing magic number for a constant * Remove extraneous console log * Fixing typo * Refactoring and bailing early * Fixing typo * chore(travis): only run CI for node LTS versions 4 & 6 * chore(travis): use containerized trusty builds * Changing OP_COMMAND to OP_COMPRESSED * Small refactor for reading a header * (WIP) refactoring decompression to be async * WIP Async decompression * Tweaking test to use config port * WIP Making decompression async * Validate compressorId * Async decompression now working * Updating tests (copying in from NODE-1027) * Adding strict mode So we can use "let", etc… on older versions of Node * Moving snappy to devDependencies * Simplifying code * Reducing reliance on constants * Fixing mis-named variable * Removing unneeded and unused function * Defining OP_COMMAND constant once * Improving code readability * Making snappy optional * Require_Optional Snappy * Making decompression code clearer * Making error message more informative * Removing debug message * Reordering function arguments so that "self" is first * Removing incorrect environment This environment has not yet been created in this branch * Using constants to decide compression mechanism * Removing unused code * Using !== instead of != * Using pre-release require_master * Tidying up * Tidying use of constants * Standardising use of compressorID and simplifying deciding which ID to use * Tidying up * Removing unneeded requires * Updating Travis Node version, require_optional version, and creating package_lock * Tidying Response creation * Removing MongoDB 2.4.x from Travis * Moving OPCODE numbers to wireprotocol/shared.js * core work for getting compression message to travel * Inital compression message test with mock server * Extending tests * Can decompress snappy messages * Adding zlib support * Reducing console logging verbosity * Debugging mock * Fixing mock to be able to send OP_COMPRESSED * Added mock tests for various OP_COMPRESSED reception cases * Compression tests attempt inserting * Commenting out unneeded test * Allowing sending of outbound OP_COMPRESSED * Don't compress uncompressible messages * Removing changes to server_tests.js * Refactoring * (WIP) receiving compressed messages in the mock * (WIP) convering compressed mock msgs to original messages * Mock server can receive compressed messages * CRUD tests for mock server w/ compression * Mock test bug fixes * Added tests to mock to compress commands * Starting integration testing * Adding Wire Protocol Version detection to tests * Renaming test * Some auth tests in server_tests * Removed erroneous test statements * Allow running tests with a particular version of mongod using the -v flag * Tidying comments * Create facility to spawn new mongod with options * Create compression test * Tidying snappy compression test implementation * Tidying tests * Switch to using deflate/inflate instead of zip/unzip * Implementing async compression of outbound OP_COMPRESSED * Switching to use forEach * Removing version flag Use environment variable instead * Tidying up * Post-rebase tidying * Tidying code * Melding branches together * Fixing comment * Adding test topologies * Improving how zlib compression level is set There is no need to tell zlib to use the default compression level. * Fixing compatibility with Node v4 Node v4 seems to have difficulty supporting Buffer.fill(someBuffer). * Moving parseHeader * Moving compressorIDs * Moving uncompressibleCommands * Moving compress and decompress * Moving operation construction out of promise * Removing use of promises in command serialization * Fixing typo * Refactoring hasUncompressibleCommands --- lib/connection/connection.js | 23 +- lib/connection/pool.js | 136 ++++++++--- lib/connection/utils.js | 15 -- lib/topologies/server.js | 14 ++ lib/topologies/shared.js | 4 +- lib/wireprotocol/compression.js | 64 ++++++ lib/wireprotocol/shared.js | 13 +- test/mock/lib/request.js | 10 +- test/mock/lib/server.js | 60 ++++- test/runner.js | 7 + .../single_topology_tests.js | 27 +-- test/tests/functional/server_tests.js | 211 ++++++++++++++++-- .../single_mocks/compression_tests.js | 17 +- 13 files changed, 475 insertions(+), 126 deletions(-) create mode 100644 lib/wireprotocol/compression.js diff --git a/lib/connection/connection.js b/lib/connection/connection.js index 2a16337ce..b6d9851a5 100644 --- a/lib/connection/connection.js +++ b/lib/connection/connection.js @@ -7,8 +7,8 @@ var inherits = require('util').inherits , crypto = require('crypto') , f = require('util').format , debugOptions = require('./utils').debugOptions - , parseHeader = require('./utils').parseHeader - , compressorIDs = require('./utils').compressorIDs + , parseHeader = require('../wireprotocol/shared').parseHeader + , decompress = require('../wireprotocol/compression').decompress , Response = require('./commands').Response , MongoError = require('../error') , Logger = require('./logger') @@ -82,7 +82,7 @@ var Connection = function(messageHandler, options) { this.host = options.host || 'localhost'; this.family = typeof options.family == 'number' ? options.family : 4; this.keepAlive = typeof options.keepAlive == 'boolean' ? options.keepAlive : true; - this.keepAliveInitialDelay = typeof options.keepAliveInitialDelay == 'number' + this.keepAliveInitialDelay = typeof options.keepAliveInitialDelay == 'number' ? options.keepAliveInitialDelay : 300000; this.noDelay = typeof options.noDelay == 'boolean' ? options.noDelay : true; this.connectionTimeout = typeof options.connectionTimeout == 'number' @@ -221,23 +221,6 @@ var closeHandler = function(self) { } } -// Decompress a message using the given compressor -var decompress = function(compressorID, compressedData, callback) { - if (compressorID < 0 || compressorID > compressorIDs.length) { - throw new Error('Server sent message compressed using an unsupported compressor. (Received compressor ID ' + compressorID + ')'); - } - switch (compressorID) { - case compressorIDs.snappy: - Snappy.uncompress(compressedData, callback); - break; - case compressorIDs.zlib: - zlib.inflate(compressedData, callback); - break; - default: - callback(null, compressedData); - } -} - // Handle a message once it is recieved var emitMessageHandler = function (self, message) { var msgHeader = parseHeader(message); diff --git a/lib/connection/pool.js b/lib/connection/pool.js index 07771db20..d7818347e 100644 --- a/lib/connection/pool.js +++ b/lib/connection/pool.js @@ -8,7 +8,14 @@ var inherits = require('util').inherits, f = require('util').format, Query = require('./commands').Query, CommandResult = require('./command_result'), - assign = require('../utils').assign; + assign = require('../utils').assign, + Snappy = require('./utils').retrieveSnappy(), + zlib = require('zlib'), + MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE, + opcodes = require('../wireprotocol/shared').opcodes, + compress = require('../wireprotocol/compression').compress, + compressorIDs = require('../wireprotocol/compression').compressorIDs, + uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands; var MongoCR = require('../auth/mongocr') , X509 = require('../auth/x509') @@ -918,6 +925,56 @@ Pool.prototype.destroy = function(force) { checkStatus(); } +// Prepare the buffer that Pool.prototype.write() uses to send to the server +var serializeCommands = function(self, commands, result, callback) { + // Base case when there are no more commands to serialize + if (commands.length === 0) return callback(null, result); + + // Pop off the zeroth command and serialize it + var thisCommand = commands.shift(); + var originalCommandBuffer = thisCommand.toBin(); + + // Check whether we and the server have agreed to use a compressor + if (self.options.agreedCompressor && !hasUncompressibleCommands(thisCommand)) { + // Transform originalCommandBuffer into OP_COMPRESSED + var concatenatedOriginalCommandBuffer = Buffer.concat(originalCommandBuffer); + var messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); + + // Extract information needed for OP_COMPRESSED from the uncompressed message + var originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); + + // Compress the message body + compress(self, messageToBeCompressed, function(err, compressedMessage) { + if (err) return callback(err, null); + + // Create the msgHeader of OP_COMPRESSED + var msgHeader = new Buffer(MESSAGE_HEADER_SIZE); + msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + 9 + compressedMessage.length, 0); // messageLength + msgHeader.writeInt32LE(thisCommand.requestId, 4); // requestID + msgHeader.writeInt32LE(0, 8); // responseTo (zero) + msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode + + // Create the compression details of OP_COMPRESSED + var compressionDetails = new Buffer(9); + compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode + compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader + compressionDetails.writeUInt8(compressorIDs[self.options.agreedCompressor], 8); // compressorID + + // Push the concatenation of the OP_COMPRESSED message onto results + result.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage])); + + // Continue recursing through the commands array + serializeCommands(self, commands, result, callback); + }) + } else { + // Push the serialization of the command onto results + result.push(originalCommandBuffer); + + // Continue recursing through the commands array + serializeCommands(self, commands, result, callback); + } +} + /** * Write a message to MongoDB * @method @@ -933,6 +990,11 @@ Pool.prototype.write = function(commands, options, cb) { // Always have options options = options || {}; + // We need to have a callback function unless the message returns no response + if(!(typeof cb == 'function') && !options.noResponse) { + throw new MongoError('write method must provide a callback'); + } + // Pool was destroyed error out if(this.state == DESTROYED || this.state == DESTROYING) { // Callback with an error @@ -968,25 +1030,6 @@ Pool.prototype.write = function(commands, options, cb) { cb: cb, raw: false, promoteLongs: true, promoteValues: true, promoteBuffers: false, fullResult: false }; - var buffer = null - - if(Array.isArray(commands)) { - buffer = []; - - for(var i = 0; i < commands.length; i++) { - buffer.push(commands[i].toBin()); - } - - // Get the requestId - operation.requestId = commands[commands.length - 1].requestId; - } else { - operation.requestId = commands.requestId; - buffer = commands.toBin(); - } - - // Set the buffers - operation.buffer = buffer; - // Set the options for the parsing operation.promoteLongs = typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true; operation.promoteValues = typeof options.promoteValues == 'boolean' ? options.promoteValues : true; @@ -997,7 +1040,6 @@ Pool.prototype.write = function(commands, options, cb) { operation.command = typeof options.command == 'boolean' ? options.command : false; operation.fullResult = typeof options.fullResult == 'boolean' ? options.fullResult : false; operation.noResponse = typeof options.noResponse == 'boolean' ? options.noResponse : false; - // operation.requestId = options.requestId; // Optional per operation socketTimeout operation.socketTimeout = options.socketTimeout; @@ -1007,25 +1049,45 @@ Pool.prototype.write = function(commands, options, cb) { operation.socketTimeout = options.socketTimeout; } - // We need to have a callback function unless the message returns no response - if(!(typeof cb == 'function') && !options.noResponse) { - throw new MongoError('write method must provide a callback'); + // Ensure commands is an array + if (!Array.isArray(commands)) { + commands = [commands]; } - // If we have a monitoring operation schedule as the very first operation - // Otherwise add to back of queue - if(options.monitoring) { - this.queue.unshift(operation); - } else { - this.queue.push(operation); - } + // Get the requestId + operation.requestId = commands[commands.length - 1].requestId; - // Attempt to execute the operation - if(!self.executing) { - process.nextTick(function() { - _execute(self)(); - }); - } + // Prepare the operation buffer + serializeCommands(self, commands, [], function(err, serializedCommands) { + if (err) throw err; + + // Set the operation's buffer to the serialization of the commands + operation.buffer = serializedCommands; + + // If we have a monitoring operation schedule as the very first operation + // Otherwise add to back of queue + if(options.monitoring) { + self.queue.unshift(operation); + } else { + self.queue.push(operation); + } + + // Attempt to execute the operation + if(!self.executing) { + process.nextTick(function() { + _execute(self)(); + }); + } + }) + +} + +// Return whether a command contains an uncompressible command term +// Will return true if command contains no uncompressible command terms +var hasUncompressibleCommands = function(command) { + return uncompressibleCommands.some(function(cmd) { + return command.query.hasOwnProperty(cmd); + }); } // Remove connection method diff --git a/lib/connection/utils.js b/lib/connection/utils.js index b95c8d19d..8e68e6962 100644 --- a/lib/connection/utils.js +++ b/lib/connection/utils.js @@ -97,20 +97,6 @@ var retrieveSnappy = function() { return snappy; } -// Parses the header of a wire protocol message -var parseHeader = function(message) { - return { - length: message.readInt32LE(0), - requestId: message.readInt32LE(4), - responseTo: message.readInt32LE(8), - opCode: message.readInt32LE(12) - } -} - -exports.compressorIDs = { - snappy: 1, - zlib: 2 -} exports.setProperty = setProperty; exports.getProperty = getProperty; exports.getSingleProperty = getSingleProperty; @@ -118,4 +104,3 @@ exports.copy = copy; exports.debugOptions = debugOptions; exports.retrieveBSON = retrieveBSON; exports.retrieveSnappy = retrieveSnappy; -exports.parseHeader = parseHeader; diff --git a/lib/topologies/server.js b/lib/topologies/server.js index 8fe7329f3..534a5b151 100644 --- a/lib/topologies/server.js +++ b/lib/topologies/server.js @@ -265,6 +265,20 @@ var eventHandler = function(self, event) { return; } + // Determine whether the server is instructing us to use a compressor + if (result.result && result.result.compression) { + for (var i = 0; i < self.s.compression.compressors.length; i++) { + if (result.result.compression.indexOf(self.s.compression.compressors[i]) > -1) { + self.s.pool.options.agreedCompressor = self.s.compression.compressors[i] + break; + } + } + + if (self.s.compression.zlibCompressionLevel) { + self.s.pool.options.zlibCompressionLevel = self.s.compression.zlibCompressionLevel; + } + } + // Ensure no error emitted after initial connect when reconnecting self.initalConnect = false; // Save the ismaster diff --git a/lib/topologies/shared.js b/lib/topologies/shared.js index 95f8ae003..02796c3b0 100644 --- a/lib/topologies/shared.js +++ b/lib/topologies/shared.js @@ -64,11 +64,11 @@ function createCompressionInfo(options) { // Check that all supplied compressors are valid options.compression.compressors.forEach(function(compressor) { if (compressor !== 'snappy' && compressor !== 'zlib') { - throw new Error('compressors must be at least one of snappy or zlib') + throw new Error('compressors must be at least one of snappy or zlib'); } }) - return options.compression.compressors + return options.compression.compressors; } function clone(object) { diff --git a/lib/wireprotocol/compression.js b/lib/wireprotocol/compression.js new file mode 100644 index 000000000..3e9f4b669 --- /dev/null +++ b/lib/wireprotocol/compression.js @@ -0,0 +1,64 @@ +var Snappy = require('../connection/utils').retrieveSnappy(), + zlib = require('zlib'); + +var compressorIDs = { + snappy: 1, + zlib: 2 +} + +var uncompressibleCommands = [ + 'ismaster', + 'saslStart', + 'saslContinue', + 'getnonce', + 'authenticate', + 'createUser', + 'updateUser', + 'copydbSaslStart', + 'copydbgetnonce', + 'copydb' +]; + + +// Facilitate compressing a message using an agreed compressor +var compress = function(self, dataToBeCompressed, callback) { + switch (self.options.agreedCompressor) { + case 'snappy': + Snappy.compress(dataToBeCompressed, callback); + break; + case 'zlib': + // Determine zlibCompressionLevel + var zlibOptions = {}; + if (self.options.zlibCompressionLevel) { + zlibOptions.level = self.options.zlibCompressionLevel + } + zlib.deflate(dataToBeCompressed, zlibOptions, callback); + break; + default: + throw new Error('Attempt to compress message using unknown compressor \"' + self.options.agreedCompressor + '\".'); + } +} + +// Decompress a message using the given compressor +var decompress = function(compressorID, compressedData, callback) { + if (compressorID < 0 || compressorID > compressorIDs.length) { + throw new Error('Server sent message compressed using an unsupported compressor. (Received compressor ID ' + compressorID + ')'); + } + switch (compressorID) { + case compressorIDs.snappy: + Snappy.uncompress(compressedData, callback); + break; + case compressorIDs.zlib: + zlib.inflate(compressedData, callback); + break; + default: + callback(null, compressedData); + } +} + +module.exports = { + compressorIDs: compressorIDs, + uncompressibleCommands: uncompressibleCommands, + compress: compress, + decompress: decompress +} diff --git a/lib/wireprotocol/shared.js b/lib/wireprotocol/shared.js index 6f1d54007..4b596b85f 100644 --- a/lib/wireprotocol/shared.js +++ b/lib/wireprotocol/shared.js @@ -37,8 +37,19 @@ var getReadPreference = function(cmd, options) { return readPreference; } +// Parses the header of a wire protocol message +var parseHeader = function(message) { + return { + length: message.readInt32LE(0), + requestId: message.readInt32LE(4), + responseTo: message.readInt32LE(8), + opCode: message.readInt32LE(12) + } +} + module.exports = { getReadPreference: getReadPreference, MESSAGE_HEADER_SIZE: MESSAGE_HEADER_SIZE, - opcodes: opcodes + opcodes: opcodes, + parseHeader: parseHeader } diff --git a/test/mock/lib/request.js b/test/mock/lib/request.js index cbaa5898c..6b272283e 100644 --- a/test/mock/lib/request.js +++ b/test/mock/lib/request.js @@ -2,7 +2,7 @@ var Long = require('bson').Long, Snappy = require('./../../../lib/connection/utils').retrieveSnappy(), zlib = require('zlib'), opcodes = require('../../../lib/wireprotocol/shared').opcodes, - compressorIDs = require('../../../lib/connection/utils').compressorIDs; + compressorIDs = require('../../../lib/wireprotocol/compression').compressorIDs; /* * Request class @@ -46,7 +46,6 @@ Request.prototype.reply = function(documents, options) { // Header field responseTo: this.response.requestId, requestId: this.response.requestId + 1, - originalOpCode: options.originalOpCode, compressorID: compressorIDs[options.compression.compressor] || 0 }) @@ -109,11 +108,12 @@ var Response = function(bson, documents, options) { } /** - * @ignore - * Preparing a compressed response of the OP_COMPRESSED type - */ +* @ignore +* Preparing a compressed response of the OP_COMPRESSED type +*/ var CompressedResponse = function(bson, uncompressedResponse, options) { this.bson = bson; + // Header this.requestId = options.requestId; this.responseTo = options.responseTo; diff --git a/test/mock/lib/server.js b/test/mock/lib/server.js index 8e45273c1..b767f0870 100644 --- a/test/mock/lib/server.js +++ b/test/mock/lib/server.js @@ -1,6 +1,11 @@ var net = require('net'), Long = require('bson').Long, BSON = require('bson'), + Snappy = require('snappy'), + zlib = require('zlib'), + MESSAGE_HEADER_SIZE = require('../../../lib/wireprotocol/shared').MESSAGE_HEADER_SIZE, + opcodes = require('../../../lib/wireprotocol/shared').opcodes, + compressorIDs = require('../../../lib/wireprotocol/compression').compressorIDs, Request = require('./request'), Query = require('./protocol').Query, GetMore = require('./protocol').GetMore, @@ -122,20 +127,61 @@ Server.prototype.receive = function() { var protocol = function(self, message) { var index = 0 - // Get the opCode for the message + self.isCompressed = false; + // Get the size for the message var size = message[index++] | message[index++] << 8 | message[index++] << 16 | message[index++] << 24; if(size != message.length) throw new Error('corrupt wire protocol message'); // Adjust to opcode index = 12; // Get the opCode for the message var type = message[index++] | message[index++] << 8 | message[index++] << 16 | message[index++] << 24; + + // Unpack and decompress if the message is OP_COMPRESSED + if(type == opcodes.OP_COMPRESSED) { + var requestID = message.readInt32LE(4) + var responseTo = message.readInt32LE(8) + var originalOpcode = message.readInt32LE(16) + var uncompressedSize = message.readInt32LE(20) + var compressorID = message.readUInt8(24) + + var compressedData = message.slice(25) + switch (compressorID) { + case compressorIDs.snappy: + var uncompressedData = Snappy.uncompressSync(compressedData) + break; + case compressorIDs.zlib: + var uncompressedData = zlib.inflateSync(compressedData) + break; + default: + var uncompressedData = compressedData; + } + + if (uncompressedData.length !== uncompressedSize) { + throw new Error('corrupt wire protocol message: uncompressed message is not the correct size') + } + + // Reconstruct the msgHeader of the uncompressed opcode + var newMsgHeader = Buffer(MESSAGE_HEADER_SIZE); + newMsgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + uncompressedData.length, 0) + newMsgHeader.writeInt32LE(requestID, 4) + newMsgHeader.writeInt32LE(responseTo, 8) + newMsgHeader.writeInt32LE(originalOpcode, 12) + + // Full uncompressed message + var message = Buffer.concat([newMsgHeader, uncompressedData]) + var type = originalOpcode + + // Compressed flag + self.isCompressed = true; + } + // Switch on type - if(type == 2001) return new Update(self.bson, message); - if(type == 2002) return new Insert(self.bson, message); - if(type == 2004) return new Query(self.bson, message); - if(type == 2005) return new GetMore(self.bson, message); - if(type == 2006) return new Delete(self.bson, message); - if(type == 2007) return new KillCursor(self.bson, message); + if(type == opcodes.OP_UPDATE) return new Update(self.bson, message); + if(type == opcodes.OP_INSERT) return new Insert(self.bson, message); + if(type == opcodes.OP_QUERY) return new Query(self.bson, message); + if(type == opcodes.OP_GETMORE) return new GetMore(self.bson, message); + if(type == opcodes.OP_DELETE) return new Delete(self.bson, message); + if(type == opcodes.OP_KILL_CURSORS) return new KillCursor(self.bson, message); throw new Error('unknown wire protocol message type'); } diff --git a/test/runner.js b/test/runner.js index 8a757e969..9ec6eee88 100644 --- a/test/runner.js +++ b/test/runner.js @@ -334,6 +334,13 @@ if(argv.t == 'functional') { }) } + if (argv.e == 'snappyCompression') { + config.manager = new ServerManager('mongod', { + dbpath: path.join(path.resolve('db'), f("data-%d", 27017)), + networkMessageCompressors: 'snappy' + }) + } + if(argv.e == 'replicaset') { config = { host: 'localhost', port: 31000, setName: 'rs' diff --git a/test/tests/functional/sdam_monitoring_mocks/single_topology_tests.js b/test/tests/functional/sdam_monitoring_mocks/single_topology_tests.js index 3398b2db9..1fad0c543 100644 --- a/test/tests/functional/sdam_monitoring_mocks/single_topology_tests.js +++ b/test/tests/functional/sdam_monitoring_mocks/single_topology_tests.js @@ -79,38 +79,31 @@ exports['Should correctly emit sdam monitoring events for single server'] = { // Add event listeners server.once('connect', function(_server) { - // console.log("!!!!!!!!!!!!!!!!!!!!!!!!!! connect") id = _server.id; _server.destroy({emitClose:true}); }); server.on('serverOpening', function(event) { - // console.log("------------- serverOpening") flags[0] = event; }); server.on('serverClosed', function(event) { - // console.log("------------- serverClosed") flags[1] = event; }); server.on('serverDescriptionChanged', function(event) { - // console.log("------------- serverDescriptionChanged") flags[2] = event; }); server.on('topologyOpening', function(event) { - // console.log("------------- topologyOpening") flags[3] = event; }); server.on('topologyClosed', function(event) { - // console.log("------------- topologyClosed") flags[4] = event; }); server.on('topologyDescriptionChanged', function(event) { - // console.log("------------- topologyDescriptionChanged") flags[5] = event; }); @@ -170,7 +163,7 @@ exports['Should correctly emit sdam monitoring events for single server'] = { }, 100); }); - setTimeout(function() { server.connect(); }, 100); + process.nextTick(function() { server.connect(); }); } } @@ -201,7 +194,7 @@ exports['Should correctly emit sdam monitoring events for single server, with co "maxWireVersion" : 3, "minWireVersion" : 0, "ok" : 1, - "hosts": [ "localhost:37008" ] // <-- this makes it an RSPrimary + "hosts": [ "a:27017", "b:27017" ] // <-- this makes it an RSPrimary } // Primary server states @@ -217,12 +210,12 @@ exports['Should correctly emit sdam monitoring events for single server, with co // Boot the mock var __server; co(function*() { - __server = yield mockupdb.createServer(37008, 'localhost'); + server = yield mockupdb.createServer(37008, 'localhost'); // Primary state machine co(function*() { while(running) { - var request = yield __server.receive(); + var request = yield server.receive(); // Get the document var doc = request.document; @@ -248,38 +241,31 @@ exports['Should correctly emit sdam monitoring events for single server, with co // Add event listeners server.once('connect', function(_server) { - // console.log("!!!!!!!!!!!!!!!!!!!!!!!!!! connect 1") id = _server.id; _server.destroy({emitClose:true}); }); server.on('serverOpening', function(event) { - // console.log("------------- serverOpening") flags[0] = event; }); server.on('serverClosed', function(event) { - // console.log("------------- serverClosed") flags[1] = event; }); server.on('serverDescriptionChanged', function(event) { - // console.log("------------- serverDescriptionChanged") flags[2] = event; }); server.on('topologyOpening', function(event) { - // console.log("------------- topologyOpening") flags[3] = event; }); server.on('topologyClosed', function(event) { - // console.log("------------- topologyClosed") flags[4] = event; }); server.on('topologyDescriptionChanged', function(event) { - // console.log("------------- topologyDescriptionChanged") flags[5] = event; }); @@ -332,13 +318,10 @@ exports['Should correctly emit sdam monitoring events for single server, with co ] } }, flags[5]); - running = false; - __server.destroy(); - test.done(); }, 100); }); - setTimeout(function() { server.connect(); }, 100); + process.nextTick(function() { server.connect(); }); } } diff --git a/test/tests/functional/server_tests.js b/test/tests/functional/server_tests.js index 0cb546fba..0eb750480 100644 --- a/test/tests/functional/server_tests.js +++ b/test/tests/functional/server_tests.js @@ -1,7 +1,11 @@ "use strict"; -var f = require('util').format - , Long = require('bson').Long; +let f = require('util').format + , Long = require('bson').Long + , locateAuthMethod = require('./shared').locateAuthMethod + , executeCommand = require('./shared').executeCommand; + +const WIRE_PROTOCOL_COMPRESSION_SUPPORT_MIN_VERSION = 5 exports['Should correctly connect server to single instance'] = { metadata: { requires: { topology: "single" } }, @@ -127,34 +131,32 @@ exports['Should correctly connect server to single instance and execute insert'] } } -exports['Should correctly connect server to single instance and execute insert (with compression if supported by the server)'] = { - metadata: { requires: { topology: ["single"] } }, +exports['Should correctly connect server to single instance and send an uncompressed message if an uncompressible command is specified'] = { + metadata: { requires: { topology: "single" } }, test: function(configuration, test) { var Server = require('../../../lib/topologies/server') - , bson = require('bson'); + , bson = require('bson') + , ReadPreference = configuration.require.ReadPreference; // Attempt to connect var server = new Server({ host: configuration.host , port: configuration.port , bson: new bson() - , compression: { compressors: ['snappy'] } + , compression: { compressors: ['snappy', 'zlib'] } }) // Add event listeners server.on('connect', function(server) { - server.insert('integration_tests.inserts', {a:1}, function(err, r) { + server.command("system.$cmd", {ismaster: true}, {readPreference: new ReadPreference('primary')}, function(err, result) { + if (err) { + console.log(err) + } test.equal(null, err); - test.equal(1, r.result.n); - server.insert('integration_tests.inserts', {a:1}, {ordered:false}, function(err, r) { - test.equal(null, err); - test.equal(1, r.result.n); - - server.destroy(); - test.done(); - }); + server.destroy(); + test.done(); }); }); @@ -768,7 +770,7 @@ exports['Should correctly promoteValues when calling getMore on queries'] = { var cursor = server.cursor(ns, { find: ns , query: {} - , limit: 102 + , limit: 102 }, { promoteValues: false }); @@ -784,6 +786,7 @@ exports['Should correctly promoteValues when calling getMore on queries'] = { test.equal(typeof doc.long, 'object'); test.equal(doc.long._bsontype, 'Long'); test.equal(typeof doc.double, 'object'); + test.equal(doc.double._bsontype, 'Double'); // Call next callNext(cursor); @@ -798,3 +801,179 @@ exports['Should correctly promoteValues when calling getMore on queries'] = { server.connect(); } } + +exports['Should error when invalid compressors are specified'] = { + metadata: { requires: { topology: "single" } }, + + test: function(configuration, test) { + var Server = require('../../../lib/topologies/server') + , bson = require('bson'); + + // Attempt to connect + try { + var server = new Server({ + host: configuration.host + , port: configuration.port + , bson: new bson() + , compression: { compressors: ['notACompressor', 'alsoNotACompressor', 'snappy'] } + }) + } catch(err) { + test.equal('compressors must be at least one of snappy or zlib', err.message); + test.done(); + } + } +} + +exports['Should correctly connect server specifying compression to single instance with authentication and insert documents'] = { + metadata: { requires: { topology: ["auth", "snappyCompression"] } }, + + test: function(configuration, test) { + var Server = require('../../../lib/topologies/server') + , Connection = require('../../../lib/connection/connection') + , bson = require('bson') + , Query = require('../../../lib/connection/commands').Query; + + + Connection.enableConnectionAccounting(); + + configuration.manager.restart(true).then(function() { + locateAuthMethod(configuration, function(err, method) { + test.equal(null, err); + + // Attempt to connect + executeCommand(configuration, 'admin', { + createUser: 'root', + pwd: "root", + roles: [ { role: "root", db: "admin" } ], + digestPassword: true + }, function(err, r) { + var server = new Server({ + host: configuration.host + , port: configuration.port + , bson: new bson() + , compression: { compressors: ['snappy', 'zlib'] } + }); + + // Add event listeners + server.on('connect', function(server) { + server.insert('integration_tests.inserts', {a:1}, function(err, r) { + test.equal(null, err); + test.equal(1, r.result.n); + + server.insert('integration_tests.inserts', {a:1}, {ordered:false}, function(err, r) { + test.equal(null, err); + test.equal(1, r.result.n); + + server.destroy(); + Connection.disableConnectionAccounting(); + test.done(); + }); + }); + }); + + server.connect({auth: [method, 'admin', 'root', 'root']}); + }); + }); + }); + } +} + +exports['Should fail to connect server specifying compression to single instance with incorrect authentication credentials'] = { + metadata: { requires: { topology: ["auth", "snappyCompression"] } }, + + test: function(configuration, test) { + var Server = require('../../../lib/topologies/server') + , Connection = require('../../../lib/connection/connection') + , bson = require('bson') + , Query = require('../../../lib/connection/commands').Query; + + + Connection.enableConnectionAccounting(); + + configuration.manager.restart(true).then(function() { + locateAuthMethod(configuration, function(err, method) { + test.equal(null, err); + + // Attempt to connect + executeCommand(configuration, 'admin', { + createUser: 'root', + pwd: "root", + roles: [ { role: "root", db: "admin" } ], + digestPassword: true + }, function(err, r) { + var server = new Server({ + host: configuration.host + , port: configuration.port + , bson: new bson() + , compression: { compressors: ['snappy', 'zlib'] } + }); + + // Add event listeners + server.on('error', function() { + test.equal(0, Object.keys(Connection.connections()).length); + Connection.disableConnectionAccounting(); + test.done(); + }); + + server.connect({auth: [method, 'admin', 'root2', 'root']}); + }); + }); + }); + } +} + +exports['Should correctly connect server to single instance and execute insert with snappy compression if supported by the server'] = { + metadata: { requires: { topology: ["single", "snappyCompression"] } }, + + test: function(configuration, test) { + var Server = require('../../../lib/topologies/server') + , bson = require('bson'); + + // Attempt to connect to server + var server = new Server({ + host: configuration.host + , port: configuration.port + , bson: new bson() + , compression: { + compressors: ['snappy', 'zlib'] + } + }) + + // Add event listeners + server.on('connect', function(server) { + let envShouldSupportCompression = configuration.manager.options.networkMessageCompressors == 'snappy' && server.ismaster.maxWireVersion >= WIRE_PROTOCOL_COMPRESSION_SUPPORT_MIN_VERSION; + + // Check compression has been negotiated + if (envShouldSupportCompression) { + test.equal('snappy', server.s.pool.options.agreedCompressor); + } + + server.insert('integration_tests.inserts', {a:1}, function(err, r) { + test.equal(null, err); + test.equal(1, r.result.n); + if (envShouldSupportCompression) { + test.equal(true, r.message.fromCompressed); + } else { + test.equal(true, r.message.fromCompressed == false || r.message.fromCompressed == undefined); + } + + server.insert('integration_tests.inserts', {a:2}, {ordered:false}, function(err, r) { + test.equal(null, err); + test.equal(1, r.result.n); + if (envShouldSupportCompression) { + test.equal(true, r.message.fromCompressed); + } else { + test.equal(true, r.message.fromCompressed == false || r.message.fromCompressed == undefined); + } + + server.destroy(); + test.done(); + }); + }); + }); + + // Start connection + server.connect(); + + } +} diff --git a/test/tests/functional/single_mocks/compression_tests.js b/test/tests/functional/single_mocks/compression_tests.js index 624409589..c04aad76a 100644 --- a/test/tests/functional/single_mocks/compression_tests.js +++ b/test/tests/functional/single_mocks/compression_tests.js @@ -40,6 +40,7 @@ exports['server should recieve list of client\'s supported compressors in handsh test.equal(request.response.documents[0].compression[0], 'snappy'); test.equal(request.response.documents[0].compression[1], 'zlib'); request.reply(serverResponse); + running = false } }); @@ -114,15 +115,19 @@ exports['should connect and insert document when server is responding with OP_CO if (currentStep == 0) { test.equal(request.response.documents[0].compression[0], 'snappy'); test.equal(request.response.documents[0].compression[1], 'zlib'); + test.equal(server.isCompressed, false); // Acknowledge connection using OP_COMPRESSED with no compression request.reply(serverResponse, { compression: { compressor: "no_compression"}}); } else if (currentStep == 1) { + test.equal(server.isCompressed, false); // Acknowledge insertion using OP_COMPRESSED with no compression request.reply({ok:1, n: doc.documents.length, lastOp: new Date()}, { compression: { compressor: "no_compression"}}); } else if (currentStep == 2 || currentStep == 3) { + test.equal(server.isCompressed, false); // Acknowledge update using OP_COMPRESSED with no compression request.reply({ok:1, n: 1}, { compression: { compressor: "no_compression"}}); } else if (currentStep == 4) { + test.equal(server.isCompressed, false); request.reply({ok:1}, { compression: {compressor: "no_compression"}}) } currentStep++; @@ -226,15 +231,19 @@ exports['should connect and insert document when server is responding with OP_CO if (currentStep == 0) { test.equal(request.response.documents[0].compression[0], 'snappy'); test.equal(request.response.documents[0].compression[1], 'zlib'); + test.equal(server.isCompressed, false); // Acknowledge connection using OP_COMPRESSED with snappy request.reply(serverResponse, { compression: { compressor: "snappy"}}); } else if (currentStep == 1) { + test.equal(server.isCompressed, true); // Acknowledge insertion using OP_COMPRESSED with snappy request.reply({ok:1, n: doc.documents.length, lastOp: new Date()}, { compression: { compressor: "snappy"}}); } else if (currentStep == 2 || currentStep == 3) { + test.equal(server.isCompressed, true); // Acknowledge update using OP_COMPRESSED with snappy request.reply({ok:1, n: 1}, { compression: { compressor: "snappy"}}); } else if (currentStep == 4) { + test.equal(server.isCompressed, true); request.reply({ok:1}, { compression: {compressor: "snappy"}}) } currentStep++; @@ -284,7 +293,6 @@ exports['should connect and insert document when server is responding with OP_CO }); }) }) - }) }); @@ -338,15 +346,19 @@ exports['should connect and insert document when server is responding with OP_CO if (currentStep == 0) { test.equal(request.response.documents[0].compression[0], 'snappy'); test.equal(request.response.documents[0].compression[1], 'zlib'); + test.equal(server.isCompressed, false); // Acknowledge connection using OP_COMPRESSED with zlib request.reply(serverResponse, { compression: { compressor: "zlib"}}); } else if (currentStep == 1) { + test.equal(server.isCompressed, true); // Acknowledge insertion using OP_COMPRESSED with zlib request.reply({ok:1, n: doc.documents.length, lastOp: new Date()}, { compression: { compressor: "zlib"}}); } else if (currentStep == 2 || currentStep == 3) { // Acknowledge update using OP_COMPRESSED with zlib + test.equal(server.isCompressed, true); request.reply({ok:1, n: 1}, { compression: { compressor: "zlib"}}); } else if (currentStep == 4) { + test.equal(server.isCompressed, true); request.reply({ok:1}, { compression: {compressor: "zlib"}}) } currentStep++; @@ -450,12 +462,15 @@ exports['should not compress uncompressible commands'] = { if (currentStep == 0) { test.equal(request.response.documents[0].compression[0], 'snappy'); test.equal(request.response.documents[0].compression[1], 'zlib'); + test.equal(server.isCompressed, false); // Acknowledge connection using OP_COMPRESSED with snappy request.reply(serverResponse, { compression: { compressor: "snappy"}}); } else if (currentStep == 1) { + test.equal(server.isCompressed, true); // Acknowledge ping using OP_COMPRESSED with snappy request.reply({ok:1}, { compression: {compressor: "snappy"}}) } else if (currentStep >= 2) { + test.equal(server.isCompressed, false); // Acknowledge further uncompressible commands using OP_COMPRESSED with snappy request.reply({ok:1}, { compression: {compressor: "snappy"}}) }