Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
[NODE-1021] [NODE-1023] OP_COMPRESSED reception support and mock testing
Browse files Browse the repository at this point in the history
* core work for getting compression message to travel

* Inital compression message test with mock server

* Renaming test

* typo in runner.js

* Extending tests

* Can decompress snappy messages

* Adding zlib support

* Reducing console logging verbosity

* Debugging mock

* Fixing mock to be able to send OP_COMPRESSED

* Added mock tests for various OP_COMPRESSED reception cases

* Edit no compression test to include doc insertion

* Compression tests attempt inserting

* Commenting out unneeded test

* Added test to check for invalid compressor args

* Add snappy to package.json

* Removing use of "in" and debugging statements

* Changing magic number for a constant

* Remove extraneous console log

* Fixing typo

* Refactoring and bailing early

* Fixing typo

* chore(travis): only run CI for node LTS versions 4 & 6

* chore(travis): use containerized trusty builds

* Changing OP_COMMAND to OP_COMPRESSED

* Small refactor for reading a header

* (WIP) refactoring decompression to be async

* WIP Async decompression

* Tweaking test to use config port

* WIP Making decompression async

* Validate compressorId

* Async decompression now working

* Updating tests (copying in from NODE-1027)

* Adding strict mode

So we can use "let", etc… on older versions of Node

* Moving snappy to devDependencies

* Simplifying code

* Reducing reliance on constants

* Fixing mis-named variable

* Removing unneeded and unused function

* Defining OP_COMMAND constant once

* Improving code readability

* Making snappy optional

* Require_Optional Snappy

* Making decompression code clearer

* Making error message more informative

* Removing debug message

* Reordering function arguments so that "self" is first

* Removing incorrect environment

This environment has not yet been created in this branch

* Using constants to decide compression mechanism

* Removing unused code

* Using !== instead of !=

* Using pre-release require_master

* Tidying up

* Tidying use of constants

* Standardising use of compressorID and simplifying deciding which ID to use

* Tidying up

* Removing unneeded requires

* Updating Travis Node version, require_optional version, and creating package_lock

* Tidying Response creation

* Removing MongoDB 2.4.x from Travis

* Moving OPCODE numbers to wireprotocol/shared.js
  • Loading branch information
malexandert authored and mbroadst committed Aug 6, 2017
1 parent 7a96db2 commit 966ff6d
Show file tree
Hide file tree
Showing 14 changed files with 3,684 additions and 105 deletions.
12 changes: 5 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
os: linux
sudo: required
dist: trusty
language: node_js

node_js:
- "0.10"
- "0.12"
- "4"
- "6"
- "7"
- "8"
sudo: true
#after_success: npm run coverage

env:
- MONGODB_VERSION=2.4.x
- MONGODB_VERSION=2.6.x
- MONGODB_VERSION=3.0.x
- MONGODB_VERSION=3.2.x
- MONGODB_VERSION=3.4.x
- MONGODB_VERSION=3.6.x

89 changes: 32 additions & 57 deletions lib/connection/commands.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
"use strict";

var retrieveBSON = require('../connection/utils').retrieveBSON;
var retrieveBSON = require('./utils').retrieveBSON;
var BSON = retrieveBSON();
var Long = BSON.Long;

// Incrementing request id
var _requestId = 0;

// Wire command operation ids
var OP_QUERY = 2004;
var OP_GETMORE = 2005;
var OP_KILL_CURSORS = 2007;
var opcodes = require('../wireprotocol/shared').opcodes;

// Query flags
var OPTS_TAILABLE_CURSOR = 2;
Expand Down Expand Up @@ -184,10 +182,10 @@ Query.prototype.toBin = function() {
index = index + 4;

// Write header information OP_QUERY
header[index + 3] = (OP_QUERY >> 24) & 0xff;
header[index + 2] = (OP_QUERY >> 16) & 0xff;
header[index + 1] = (OP_QUERY >> 8) & 0xff;
header[index] = (OP_QUERY) & 0xff;
header[index + 3] = (opcodes.OP_QUERY >> 24) & 0xff;
header[index + 2] = (opcodes.OP_QUERY >> 16) & 0xff;
header[index + 1] = (opcodes.OP_QUERY >> 8) & 0xff;
header[index] = (opcodes.OP_QUERY) & 0xff;
index = index + 4;

// Write header information flags
Expand Down Expand Up @@ -267,10 +265,10 @@ GetMore.prototype.toBin = function() {
index = index + 4;

// index = write32bit(index, _buffer, OP_GETMORE);
_buffer[index + 3] = (OP_GETMORE >> 24) & 0xff;
_buffer[index + 2] = (OP_GETMORE >> 16) & 0xff;
_buffer[index + 1] = (OP_GETMORE >> 8) & 0xff;
_buffer[index] = (OP_GETMORE) & 0xff;
_buffer[index + 3] = (opcodes.OP_GETMORE >> 24) & 0xff;
_buffer[index + 2] = (opcodes.OP_GETMORE >> 16) & 0xff;
_buffer[index + 1] = (opcodes.OP_GETMORE >> 8) & 0xff;
_buffer[index] = (opcodes.OP_GETMORE) & 0xff;
index = index + 4;

// index = write32bit(index, _buffer, 0);
Expand Down Expand Up @@ -351,10 +349,10 @@ KillCursor.prototype.toBin = function() {
index = index + 4;

// index = write32bit(index, _buffer, OP_KILL_CURSORS);
_buffer[index + 3] = (OP_KILL_CURSORS >> 24) & 0xff;
_buffer[index + 2] = (OP_KILL_CURSORS >> 16) & 0xff;
_buffer[index + 1] = (OP_KILL_CURSORS >> 8) & 0xff;
_buffer[index] = (OP_KILL_CURSORS) & 0xff;
_buffer[index + 3] = (opcodes.OP_KILL_CURSORS >> 24) & 0xff;
_buffer[index + 2] = (opcodes.OP_KILL_CURSORS >> 16) & 0xff;
_buffer[index + 1] = (opcodes.OP_KILL_CURSORS >> 8) & 0xff;
_buffer[index] = (opcodes.OP_KILL_CURSORS) & 0xff;
index = index + 4;

// index = write32bit(index, _buffer, 0);
Expand Down Expand Up @@ -394,53 +392,26 @@ KillCursor.prototype.toBin = function() {
return _buffer;
}

var Response = function(bson, data, opts) {
var Response = function(bson, message, msgHeader, msgBody, opts) {
opts = opts || {promoteLongs: true, promoteValues: true, promoteBuffers: false};
this.parsed = false;

//
// Parse Header
//
this.index = 0;
this.raw = data;
this.data = data;
this.raw = message;
this.data = msgBody;
this.bson = bson;
this.opts = opts;

// Read the message length
this.length = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
this.index = this.index + 4;

// Fetch the request id for this reply
this.requestId = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
this.index = this.index + 4;

// Fetch the id of the request that triggered the response
this.responseTo = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
this.index = this.index + 4;
// Read the message header
this.length = msgHeader.length;
this.requestId = msgHeader.requestId;
this.responseTo = msgHeader.responseTo;
this.opCode = msgHeader.opCode;
this.fromCompressed = msgHeader.fromCompressed;

// Skip op-code field
this.index = this.index + 4;

// Unpack flags
this.responseFlags = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
this.index = this.index + 4;

// Unpack the cursor
var lowBits = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
this.index = this.index + 4;
var highBits = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
this.index = this.index + 4;
// Create long object
this.cursorId = new Long(lowBits, highBits);

// Unpack the starting from
this.startingFrom = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
this.index = this.index + 4;

// Unpack the number of objects returned
this.numberReturned = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
this.index = this.index + 4;
// Read the message body
this.responseFlags = msgBody.readInt32LE(0);
this.cursorId = new Long(msgBody.readInt32LE(4), msgBody.readInt32LE(8));
this.startingFrom = msgBody.readInt32LE(12);
this.numberReturned = msgBody.readInt32LE(16);

// Preallocate document array
this.documents = new Array(this.numberReturned);
Expand Down Expand Up @@ -485,6 +456,10 @@ Response.prototype.parse = function(options) {
promoteBuffers: promoteBuffers
};

// Position within OP_REPLY at which documents start
// (See https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#wire-op-reply)
this.index = 20;

//
// Single document and documentsReturnedIn set
//
Expand Down
62 changes: 56 additions & 6 deletions lib/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ var inherits = require('util').inherits
, crypto = require('crypto')
, f = require('util').format
, debugOptions = require('./utils').debugOptions
, parseHeader = require('./utils').parseHeader
, compressorIDs = require('./utils').compressorIDs
, Response = require('./commands').Response
, MongoError = require('../error')
, Logger = require('./logger');
, Logger = require('./logger')
, zlib = require('zlib')
, Snappy = require('./utils').retrieveSnappy()
, OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED
, MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;

var _id = 0;
var debugFields = ['host', 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay'
Expand Down Expand Up @@ -215,6 +221,49 @@ var closeHandler = function(self) {
}
}

// Decompress a message using the given compressor
var decompress = function(compressorID, compressedData, callback) {
if (compressorID < 0 || compressorID > compressorIDs.length) {
throw new Error('Server sent message compressed using an unsupported compressor. (Received compressor ID ' + compressorID + ')');
}
switch (compressorID) {
case compressorIDs.snappy:
Snappy.uncompress(compressedData, callback);
break;
case compressorIDs.zlib:
zlib.inflate(compressedData, callback);
break;
default:
callback(null, compressedData);
}
}

// Handle a message once it is recieved
var emitMessageHandler = function (self, message) {
var msgHeader = parseHeader(message);
if (msgHeader.opCode == OP_COMPRESSED) {
msgHeader.fromCompressed = true;
var index = MESSAGE_HEADER_SIZE;
msgHeader.opCode = message.readInt32LE(index);
index += 4;
msgHeader.length = message.readInt32LE(index);
index += 4;
var compressorID = message[index];
index++;
decompress(compressorID, message.slice(index), function(err, decompressedMsgBody) {
if (err) {
throw err;
}
if (decompressedMsgBody.length !== msgHeader.length) {
throw new Error('Decompressing a compressed message from the server failed. The message is corrupt.')
}
self.messageHandler(new Response(self.bson, message, msgHeader, decompressedMsgBody, self.responseOptions), self);
})
} else {
self.messageHandler(new Response(self.bson, message, msgHeader, message.slice(MESSAGE_HEADER_SIZE), self.responseOptions), self);
}
}

var dataHandler = function(self) {
return function(data) {
// Parse until we are done with the data
Expand Down Expand Up @@ -246,8 +295,9 @@ var dataHandler = function(self) {
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
// Emit the buffer
self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);

emitMessageHandler(self, emitBuffer);

} catch(err) {
var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
sizeOfMessage:self.sizeOfMessage,
Expand Down Expand Up @@ -328,7 +378,7 @@ var dataHandler = function(self) {
// Exit parsing loop
data = new Buffer(0);
// Emit the message
self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
emitMessageHandler(self, emitBuffer);
} catch (err) {
self.emit("parseError", err, self);
}
Expand Down Expand Up @@ -358,7 +408,7 @@ var dataHandler = function(self) {
// Copy rest of message
data = data.slice(sizeOfMessage);
// Emit the message
self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
emitMessageHandler(self, emitBuffer);
}
} else {
// Create a buffer that contains the space for the non-complete message
Expand Down Expand Up @@ -539,7 +589,7 @@ Connection.prototype.write = function(buffer) {
// Iterate over all buffers and write them in order to the socket
for(i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary');
return true;
}
}

// Connection is destroyed return write failed
return false;
Expand Down
35 changes: 35 additions & 0 deletions lib/connection/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,44 @@ var retrieveBSON = function() {
return BSON;
}

// Throw an error if an attempt to use Snappy is made when Snappy is not installed
var noSnappyWarning = function() {
throw new Error('Attempted to use Snappy compression, but Snappy is not installed. Install or disable Snappy compression and try again.')
}

// Facilitate loading Snappy optionally
var retrieveSnappy = function() {
var snappy = require_optional('snappy');
if (!snappy) {
snappy = {
compress: noSnappyWarning,
uncompress: noSnappyWarning,
compressSync: noSnappyWarning,
uncompressSync: noSnappyWarning
}
}
return snappy;
}

// Parses the header of a wire protocol message
var parseHeader = function(message) {
return {
length: message.readInt32LE(0),
requestId: message.readInt32LE(4),
responseTo: message.readInt32LE(8),
opCode: message.readInt32LE(12)
}
}

exports.compressorIDs = {
snappy: 1,
zlib: 2
}
exports.setProperty = setProperty;
exports.getProperty = getProperty;
exports.getSingleProperty = getSingleProperty;
exports.copy = copy;
exports.debugOptions = debugOptions;
exports.retrieveBSON = retrieveBSON;
exports.retrieveSnappy = retrieveSnappy;
exports.parseHeader = parseHeader;
9 changes: 6 additions & 3 deletions lib/topologies/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ var inherits = require('util').inherits,
BasicCursor = require('../cursor'),
sdam = require('./shared'),
assign = require('../utils').assign,
createClientInfo = require('./shared').createClientInfo;
createClientInfo = require('./shared').createClientInfo,
createCompressionInfo = require('./shared').createCompressionInfo;

// Used for filtering out fields for loggin
var debugFields = ['reconnect', 'reconnectTries', 'reconnectInterval', 'emitError', 'cursorFactory', 'host'
Expand Down Expand Up @@ -112,7 +113,8 @@ var Server = function(options) {
? options.monitoringInterval
: 5000,
// Topology id
topologyId: -1
topologyId: -1,
compression: {compressors: createCompressionInfo(options)}
}

// Curent ismaster
Expand Down Expand Up @@ -247,7 +249,8 @@ var eventHandler = function(self, event) {
// Query options
var queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true };
// Create a query instance
var query = new Query(self.s.bson, 'admin.$cmd', {ismaster:true, client: self.clientInfo}, queryOptions);
var compressors = (self.s.compression && self.s.compression.compressors) ? self.s.compression.compressors : [];
var query = new Query(self.s.bson, 'admin.$cmd', {ismaster:true, client: self.clientInfo, compression: compressors}, queryOptions);
// Get start time
var start = new Date().getTime();
// Execute the ismaster query
Expand Down
16 changes: 16 additions & 0 deletions lib/topologies/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ function createClientInfo(options) {
return clientInfo;
}

function createCompressionInfo(options) {
if (!options.compression || !options.compression.compressors) {
return [];
}

// Check that all supplied compressors are valid
options.compression.compressors.forEach(function(compressor) {
if (compressor !== 'snappy' && compressor !== 'zlib') {
throw new Error('compressors must be at least one of snappy or zlib')
}
})

return options.compression.compressors
}

function clone(object) {
return JSON.parse(JSON.stringify(object));
}
Expand Down Expand Up @@ -322,6 +337,7 @@ module.exports.emitServerDescriptionChanged = emitServerDescriptionChanged;
module.exports.emitTopologyDescriptionChanged = emitTopologyDescriptionChanged;
module.exports.cloneOptions = cloneOptions;
module.exports.createClientInfo = createClientInfo;
module.exports.createCompressionInfo = createCompressionInfo;
module.exports.clone = clone;
module.exports.diff = diff;
module.exports.Interval = Interval;
Expand Down
Loading

0 comments on commit 966ff6d

Please sign in to comment.