From 3133fc375ee6c9adcece7fb8499634bc19482d6c Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Fri, 14 Sep 2018 21:39:44 -0400 Subject: [PATCH] Revert "feat(connection): Implement fast fallback" This reverts commits: 956d5a2765edee0d1a8fabafe9cb43d26ff04ce5. 60646a70cff899c68378776e785afc56f76deeb9. 41e3d5e8ef8865963521160292449e4c1a328110. 68a6656041bb2c5e6ae361268587942184ebbfb4. 721ca6f479ffab5d39f082be65633762197b1d47. 6d821515ad4aed360b212b9d5abb329c11c8fcde. 719781531cec43ab01e8208c3fd0aacb8e0ff234. 6dff3266a701470499fa7a63e689f6de9c9a5355. b0fb2e32e6e2b3c3ff2cb6a024108c3e3c2051e6. t622394a571855c146a165416c53e7e3f3bd2c657. --- lib/connection/connection.js | 398 +++++++++--------- test/tests/functional/pool_tests.js | 40 ++ .../functional/rs_mocks/add_remove_tests.js | 2 +- 3 files changed, 230 insertions(+), 210 deletions(-) diff --git a/lib/connection/connection.js b/lib/connection/connection.js index 88b20bf09..d102f341d 100644 --- a/lib/connection/connection.js +++ b/lib/connection/connection.js @@ -228,60 +228,60 @@ function addConnection(id, connection) { // // Connection handlers -var errorHandler = function(conn) { +var errorHandler = function(self) { return function(err) { - if (connectionAccounting) deleteConnection(conn.id); + if (connectionAccounting) deleteConnection(self.id); // Debug information - if (conn.logger.isDebug()) - conn.logger.debug( + if (self.logger.isDebug()) + self.logger.debug( f( 'connection %s for [%s:%s] errored out with [%s]', - conn.id, - conn.host, - conn.port, + self.id, + self.host, + self.port, JSON.stringify(err) ) ); // Emit the error - if (conn.listeners('error').length > 0) conn.emit('error', new MongoNetworkError(err), conn); + if (self.listeners('error').length > 0) self.emit('error', new MongoNetworkError(err), self); }; }; -var timeoutHandler = function(conn) { +var timeoutHandler = function(self) { return function() { - if (connectionAccounting) deleteConnection(conn.id); + if (connectionAccounting) deleteConnection(self.id); // Debug information - if (conn.logger.isDebug()) - conn.logger.debug(f('connection %s for [%s:%s] timed out', conn.id, conn.host, conn.port)); + if (self.logger.isDebug()) + self.logger.debug(f('connection %s for [%s:%s] timed out', self.id, self.host, self.port)); // Emit timeout error - conn.emit( + self.emit( 'timeout', - new MongoNetworkError(f('connection %s to %s:%s timed out', conn.id, conn.host, conn.port)), - conn + new MongoNetworkError(f('connection %s to %s:%s timed out', self.id, self.host, self.port)), + self ); }; }; -var closeHandler = function(conn) { +var closeHandler = function(self) { return function(hadError) { - if (connectionAccounting) deleteConnection(conn.id); + if (connectionAccounting) deleteConnection(self.id); // Debug information - if (conn.logger.isDebug()) - conn.logger.debug(f('connection %s with for [%s:%s] closed', conn.id, conn.host, conn.port)); + if (self.logger.isDebug()) + self.logger.debug(f('connection %s with for [%s:%s] closed', self.id, self.host, self.port)); // Emit close event if (!hadError) { - conn.emit( + self.emit( 'close', - new MongoNetworkError(f('connection %s to %s:%s closed', conn.id, conn.host, conn.port)), - conn + new MongoNetworkError(f('connection %s to %s:%s closed', self.id, self.host, self.port)), + self ); } }; }; // Handle a message once it is recieved -var emitMessageHandler = function(conn, message) { +var emitMessageHandler = function(self, message) { var msgHeader = parseHeader(message); if (msgHeader.opCode === OP_COMPRESSED) { msgHeader.fromCompressed = true; @@ -301,98 +301,98 @@ var emitMessageHandler = function(conn, message) { 'Decompressing a compressed message from the server failed. The message is corrupt.' ); } - conn.messageHandler( - new Response(conn.bson, message, msgHeader, decompressedMsgBody, conn.responseOptions), - conn + self.messageHandler( + new Response(self.bson, message, msgHeader, decompressedMsgBody, self.responseOptions), + self ); }); } else { - conn.messageHandler( + self.messageHandler( new Response( - conn.bson, + self.bson, message, msgHeader, message.slice(MESSAGE_HEADER_SIZE), - conn.responseOptions + self.responseOptions ), - conn + self ); } }; -var dataHandler = function(conn) { +var dataHandler = function(self) { return function(data) { // Parse until we are done with the data while (data.length > 0) { // If we still have bytes to read on the current message - if (conn.bytesRead > 0 && conn.sizeOfMessage > 0) { + if (self.bytesRead > 0 && self.sizeOfMessage > 0) { // Calculate the amount of remaining bytes - var remainingBytesToRead = conn.sizeOfMessage - conn.bytesRead; + var remainingBytesToRead = self.sizeOfMessage - self.bytesRead; // Check if the current chunk contains the rest of the message if (remainingBytesToRead > data.length) { // Copy the new data into the exiting buffer (should have been allocated when we know the message size) - data.copy(conn.buffer, conn.bytesRead); + data.copy(self.buffer, self.bytesRead); // Adjust the number of bytes read so it point to the correct index in the buffer - conn.bytesRead = conn.bytesRead + data.length; + self.bytesRead = self.bytesRead + data.length; // Reset state of buffer data = Buffer.alloc(0); } else { // Copy the missing part of the data into our current buffer - data.copy(conn.buffer, conn.bytesRead, 0, remainingBytesToRead); + data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead); // Slice the overflow into a new buffer that we will then re-parse data = data.slice(remainingBytesToRead); // Emit current complete message try { - var emitBuffer = conn.buffer; + var emitBuffer = self.buffer; // Reset state of buffer - conn.buffer = null; - conn.sizeOfMessage = 0; - conn.bytesRead = 0; - conn.stubBuffer = null; + self.buffer = null; + self.sizeOfMessage = 0; + self.bytesRead = 0; + self.stubBuffer = null; - emitMessageHandler(conn, emitBuffer); + emitMessageHandler(self, emitBuffer); } catch (err) { var errorObject = { err: 'socketHandler', trace: err, - bin: conn.buffer, + bin: self.buffer, parseState: { - sizeOfMessage: conn.sizeOfMessage, - bytesRead: conn.bytesRead, - stubBuffer: conn.stubBuffer + sizeOfMessage: self.sizeOfMessage, + bytesRead: self.bytesRead, + stubBuffer: self.stubBuffer } }; // We got a parse Error fire it off then keep going - conn.emit('parseError', errorObject, conn); + self.emit('parseError', errorObject, self); } } } else { // Stub buffer is kept in case we don't get enough bytes to determine the // size of the message (< 4 bytes) - if (conn.stubBuffer != null && conn.stubBuffer.length > 0) { + if (self.stubBuffer != null && self.stubBuffer.length > 0) { // If we have enough bytes to determine the message size let's do it - if (conn.stubBuffer.length + data.length > 4) { + if (self.stubBuffer.length + data.length > 4) { // Prepad the data - var newData = Buffer.alloc(conn.stubBuffer.length + data.length); - conn.stubBuffer.copy(newData, 0); - data.copy(newData, conn.stubBuffer.length); + var newData = Buffer.alloc(self.stubBuffer.length + data.length); + self.stubBuffer.copy(newData, 0); + data.copy(newData, self.stubBuffer.length); // Reassign for parsing data = newData; // Reset state of buffer - conn.buffer = null; - conn.sizeOfMessage = 0; - conn.bytesRead = 0; - conn.stubBuffer = null; + self.buffer = null; + self.sizeOfMessage = 0; + self.bytesRead = 0; + self.stubBuffer = null; } else { // Add the the bytes to the stub buffer - var newStubBuffer = Buffer.alloc(conn.stubBuffer.length + data.length); + var newStubBuffer = Buffer.alloc(self.stubBuffer.length + data.length); // Copy existing stub buffer - conn.stubBuffer.copy(newStubBuffer, 0); + self.stubBuffer.copy(newStubBuffer, 0); // Copy missing part of the data - data.copy(newStubBuffer, conn.stubBuffer.length); + data.copy(newStubBuffer, self.stubBuffer.length); // Exit parsing loop data = Buffer.alloc(0); } @@ -402,59 +402,59 @@ var dataHandler = function(conn) { // var sizeOfMessage = data.readUInt32LE(0); var sizeOfMessage = data[0] | (data[1] << 8) | (data[2] << 16) | (data[3] << 24); // If we have a negative sizeOfMessage emit error and return - if (sizeOfMessage < 0 || sizeOfMessage > conn.maxBsonMessageSize) { + if (sizeOfMessage < 0 || sizeOfMessage > self.maxBsonMessageSize) { errorObject = { err: 'socketHandler', trace: '', - bin: conn.buffer, + bin: self.buffer, parseState: { sizeOfMessage: sizeOfMessage, - bytesRead: conn.bytesRead, - stubBuffer: conn.stubBuffer + bytesRead: self.bytesRead, + stubBuffer: self.stubBuffer } }; // We got a parse Error fire it off then keep going - conn.emit('parseError', errorObject, conn); + self.emit('parseError', errorObject, self); return; } // Ensure that the size of message is larger than 0 and less than the max allowed if ( sizeOfMessage > 4 && - sizeOfMessage < conn.maxBsonMessageSize && + sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage > data.length ) { - conn.buffer = Buffer.alloc(sizeOfMessage); + self.buffer = Buffer.alloc(sizeOfMessage); // Copy all the data into the buffer - data.copy(conn.buffer, 0); + data.copy(self.buffer, 0); // Update bytes read - conn.bytesRead = data.length; + self.bytesRead = data.length; // Update sizeOfMessage - conn.sizeOfMessage = sizeOfMessage; + self.sizeOfMessage = sizeOfMessage; // Ensure stub buffer is null - conn.stubBuffer = null; + self.stubBuffer = null; // Exit parsing loop data = Buffer.alloc(0); } else if ( sizeOfMessage > 4 && - sizeOfMessage < conn.maxBsonMessageSize && + sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage === data.length ) { try { emitBuffer = data; // Reset state of buffer - conn.buffer = null; - conn.sizeOfMessage = 0; - conn.bytesRead = 0; - conn.stubBuffer = null; + self.buffer = null; + self.sizeOfMessage = 0; + self.bytesRead = 0; + self.stubBuffer = null; // Exit parsing loop data = Buffer.alloc(0); // Emit the message - emitMessageHandler(conn, emitBuffer); + emitMessageHandler(self, emitBuffer); } catch (err) { - conn.emit('parseError', err, conn); + self.emit('parseError', err, self); } - } else if (sizeOfMessage <= 4 || sizeOfMessage > conn.maxBsonMessageSize) { + } else if (sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonMessageSize) { errorObject = { err: 'socketHandler', trace: null, @@ -467,32 +467,32 @@ var dataHandler = function(conn) { } }; // We got a parse Error fire it off then keep going - conn.emit('parseError', errorObject, conn); + self.emit('parseError', errorObject, self); // Clear out the state of the parser - conn.buffer = null; - conn.sizeOfMessage = 0; - conn.bytesRead = 0; - conn.stubBuffer = null; + self.buffer = null; + self.sizeOfMessage = 0; + self.bytesRead = 0; + self.stubBuffer = null; // Exit parsing loop data = Buffer.alloc(0); } else { emitBuffer = data.slice(0, sizeOfMessage); // Reset state of buffer - conn.buffer = null; - conn.sizeOfMessage = 0; - conn.bytesRead = 0; - conn.stubBuffer = null; + self.buffer = null; + self.sizeOfMessage = 0; + self.bytesRead = 0; + self.stubBuffer = null; // Copy rest of message data = data.slice(sizeOfMessage); // Emit the message - emitMessageHandler(conn, emitBuffer); + emitMessageHandler(self, emitBuffer); } } else { // Create a buffer that contains the space for the non-complete message - conn.stubBuffer = Buffer.alloc(data.length); + self.stubBuffer = Buffer.alloc(data.length); // Copy the data to the stub buffer - data.copy(conn.stubBuffer, 0); + data.copy(self.stubBuffer, 0); // Exit parsing loop data = Buffer.alloc(0); } @@ -529,122 +529,97 @@ function merge(options1, options2) { } } -function prepareConnectionOptions(conn, _options) { - let options; - if (conn.ssl) { - options = { - socket: conn.connection, - rejectUnauthorized: conn.rejectUnauthorized - }; - - // Merge in options - merge(options, conn.options); - merge(options, _options); - - // Set options for ssl - if (conn.ca) options.ca = conn.ca; - if (conn.crl) options.crl = conn.crl; - if (conn.cert) options.cert = conn.cert; - if (conn.key) options.key = conn.key; - if (conn.passphrase) options.passphrase = conn.passphrase; - - // Override checkServerIdentity behavior - if (conn.checkServerIdentity === false) { - // Skip the identiy check by returning undefined as per node documents - // https://nodejs.org/api/tls.html#tls_tls_connect_options_callback - options.checkServerIdentity = function() { - return undefined; - }; - } else if (typeof conn.checkServerIdentity === 'function') { - options.checkServerIdentity = conn.checkServerIdentity; - } - - // Set default sni servername to be the same as host - if (options.servername == null) { - options.servername = conn.host; - } +function makeSSLConnection(self, _options) { + let sslOptions = { + socket: self.connection, + rejectUnauthorized: self.rejectUnauthorized + }; - options = Object.assign({}, options, { host: conn.host, port: conn.port }); - } else { - if (conn.domainSocket) { - options = { path: conn.host }; - } else { - options = { port: conn.port, host: conn.host }; - } + // Merge in options + merge(sslOptions, self.options); + merge(sslOptions, _options); + + // Set options for ssl + if (self.ca) sslOptions.ca = self.ca; + if (self.crl) sslOptions.crl = self.crl; + if (self.cert) sslOptions.cert = self.cert; + if (self.key) sslOptions.key = self.key; + if (self.passphrase) sslOptions.passphrase = self.passphrase; + + // Override checkServerIdentity behavior + if (self.checkServerIdentity === false) { + // Skip the identiy check by retuning undefined as per node documents + // https://nodejs.org/api/tls.html#tls_tls_connect_options_callback + sslOptions.checkServerIdentity = function() { + return undefined; + }; + } else if (typeof self.checkServerIdentity === 'function') { + sslOptions.checkServerIdentity = self.checkServerIdentity; } - return options; -} - -function makeConnection(conn, options, callback) { - const netModule = options.ssl ? tls : net; + // Set default sni servername to be the same as host + if (sslOptions.servername == null) { + sslOptions.servername = self.host; + } - const connection = netModule.connect(options, function() { - if (conn.ssl) { - // Error on auth or skip - if (connection.authorizationError && conn.rejectUnauthorized) { - return conn.emit('error', connection.authorizationError, conn, { ssl: true }); - } + // Attempt SSL connection + const connection = tls.connect(self.port, self.host, sslOptions, function() { + // Error on auth or skip + if (connection.authorizationError && self.rejectUnauthorized) { + return self.emit('error', connection.authorizationError, self, { ssl: true }); } - // Set socket timeout instead of connection timeout - connection.setTimeout(conn.socketTimeout); - return callback(null, connection); + // Set socket timeout instead of connection timeout + connection.setTimeout(self.socketTimeout); + // We are done emit connect + self.emit('connect', self); }); // Set the options for the connection - connection.setKeepAlive(conn.keepAlive, conn.keepAliveInitialDelay); - connection.setTimeout(conn.connectionTimeout); - connection.setNoDelay(conn.noDelay); + connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay); + connection.setTimeout(self.connectionTimeout); + connection.setNoDelay(self.noDelay); - // Add handlers for events - connection.once('error', err => callback(err, null)); + return connection; } -function normalConnect(conn, family, _options, callback) { - const options = prepareConnectionOptions(conn, _options); - makeConnection(conn, Object.assign({ family }, options), (err, connection) => { - if (err) return callback(err, null); - callback(null, connection); - }); -} +function makeUnsecureConnection(self, family) { + // Create new connection instance + let connection_options; + if (self.domainSocket) { + connection_options = { path: self.host }; + } else { + connection_options = { port: self.port, host: self.host }; + connection_options.family = family; + } -function fastFallbackConnect(conn, _options, callback) { - const options = prepareConnectionOptions(conn, _options); - - let errors = []; - let connection; - const connectionHandler = (err, _connection) => { - if (err) { - if (errors.length > 0) { - // an error occurred for the second time, we have officially failed - // return mongo error to be emitted - return callback(err, null); - } + const connection = net.createConnection(connection_options); - // otherwise push the error, and wait for subsequent connects - errors.push(err); - return; - } + // Set the options for the connection + connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay); + connection.setTimeout(self.connectionTimeout); + connection.setNoDelay(self.noDelay); - if (_connection) { - if (connection) { - _connection.removeAllListeners('error'); - _connection.unref(); - return; - } + connection.once('connect', function() { + // Set socket timeout instead of connection timeout + connection.setTimeout(self.socketTimeout); + // Emit connect event + self.emit('connect', self); + }); - connection = _connection; - return callback(null, connection); - } - }; + return connection; +} - makeConnection(conn, Object.assign({ family: 6 }, options), connectionHandler); +function doConnect(self, family, _options, _errorHandler) { + self.connection = self.ssl + ? makeSSLConnection(self, _options) + : makeUnsecureConnection(self, family); - // IPv4 attempts to connect 250ms after IPv6 to give IPv6 preference - setTimeout(() => { - makeConnection(conn, Object.assign({ family: 4 }, options), connectionHandler); - }, 250); + // Add handlers for events + self.connection.once('error', _errorHandler); + self.connection.once('timeout', timeoutHandler(self)); + self.connection.once('close', closeHandler(self)); + self.connection.on('data', dataHandler(self)); } /** @@ -662,29 +637,34 @@ Connection.prototype.connect = function(_options) { this.responseOptions.promoteBuffers = _options.promoteBuffers; } - const connectHandler = (err, connection) => { - const connectionErrorHandler = errorHandler(this); - - if (err) { - connectionErrorHandler(err); - return; - } - - // Add handlers for events - connection.once('error', connectionErrorHandler); - connection.once('timeout', timeoutHandler(this)); - connection.once('close', closeHandler(this)); - connection.on('data', dataHandler(this)); - this.connection = connection; - this.emit('connect', this); - return; - }; + const _errorHandler = errorHandler(this); if (this.family !== void 0) { - return normalConnect(this, this.family, _options, connectHandler); + return doConnect(this, this.family, _options, _errorHandler); } - return fastFallbackConnect(this, _options, connectHandler); + return doConnect(this, 6, _options, err => { + if (this.logger.isDebug()) { + this.logger.debug( + f( + 'connection %s for [%s:%s] errored out with [%s]', + this.id, + this.host, + this.port, + JSON.stringify(err) + ) + ); + } + + // clean up existing event handlers + this.connection.removeAllListeners('error'); + this.connection.removeAllListeners('timeout'); + this.connection.removeAllListeners('close'); + this.connection.removeAllListeners('data'); + this.connection = undefined; + + return doConnect(this, 4, _options, _errorHandler); + }); }; /** @@ -695,9 +675,9 @@ Connection.prototype.connect = function(_options) { Connection.prototype.unref = function() { if (this.connection) this.connection.unref(); else { - var conn = this; + var self = this; this.once('connect', function() { - conn.connection.unref(); + self.connection.unref(); }); } }; diff --git a/test/tests/functional/pool_tests.js b/test/tests/functional/pool_tests.js index 6c94a0237..28006c59a 100644 --- a/test/tests/functional/pool_tests.js +++ b/test/tests/functional/pool_tests.js @@ -48,6 +48,46 @@ describe('Pool tests', function() { } }); + it('Should only listen on connect once', { + metadata: { requires: { topology: 'single' } }, + + test: function(done) { + // Attempt to connect + var pool = new Pool(null, { + host: this.configuration.host, + port: this.configuration.port, + bson: new Bson(), + messageHandler: function() {} + }); + + let connection; + + // Add event listeners + pool.on('connect', function() { + var connections = pool.allConnections(); + + process.nextTick(() => { + // Now that we are in next tick, connection should still exist, but there + // should be no connect listeners + expect(connection.connection.listenerCount('connect')).to.equal(0); + expect(connections).to.have.lengthOf(1); + + pool.destroy(); + done(); + }); + }); + + expect(pool.allConnections()).to.have.lengthOf(0); + + // Start connection + pool.connect(); + + expect(pool.allConnections()).to.have.lengthOf(1); + connection = pool.allConnections()[0]; + expect(connection.connection.listenerCount('connect')).to.equal(1); + } + }); + it('should correctly write ismaster operation to the server', { metadata: { requires: { topology: 'single' } }, diff --git a/test/tests/functional/rs_mocks/add_remove_tests.js b/test/tests/functional/rs_mocks/add_remove_tests.js index b05864429..c02647c74 100644 --- a/test/tests/functional/rs_mocks/add_remove_tests.js +++ b/test/tests/functional/rs_mocks/add_remove_tests.js @@ -602,7 +602,7 @@ describe('ReplSet Add Remove (mocks)', function() { server.destroy(); done(); }, 1000); - }, 630); // This connection implementation slows down conneciton to nodes, need to fix this + }, 500); server.on('left', function(_type, _server) { if (_type === 'secondary' && _server.name === 'localhost:32003') {