Skip to content

Commit

Permalink
refactor(pool): allow reconnect during connect flow
Browse files Browse the repository at this point in the history
This allows us to attempt server selection during the initial call
to `connect`, by respecting the `auto_reconnect` property for the
connection pool's `connect` method. In the process a number of
parts of the pool were cleaned up, and code was deduplicated
including:

 - remove unused code paths in creating connections
 - simplify connect logic
 - reuse `createConnection` in `attemptReconnect`

NODE-2121
  • Loading branch information
mbroadst committed Oct 9, 2019
1 parent 21aa117 commit 8153065
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 143 deletions.
227 changes: 87 additions & 140 deletions lib/core/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ var Pool = function(topology, options) {
// Operation work queue
this.queue = [];

// Contains the reconnect connection
this.reconnectConnection = null;

// Number of consecutive timeouts caught
this.numberOfConsecutiveTimeouts = 0;
// Current pool Index
Expand Down Expand Up @@ -214,7 +211,6 @@ function resetPoolState(pool) {
pool.availableConnections = [];
pool.connectingConnections = 0;
pool.executing = false;
pool.reconnectConnection = null;
pool.numberOfConsecutiveTimeouts = 0;
pool.connectionIndex = 0;
pool.retriesLeft = pool.options.reconnectTries;
Expand Down Expand Up @@ -299,68 +295,45 @@ function connectionFailureHandler(pool, event, err, conn) {
// Do we need to do anything to maintain the minimum pool size
const totalConnections = totalConnectionCount(pool);
if (totalConnections < pool.minSize) {
_createConnection(pool);
createConnection(pool);
}
}

function attemptReconnect(self) {
function attemptReconnect(pool, callback) {
return function() {
self.emit('attemptReconnect', self);
if (self.state === DESTROYED || self.state === DESTROYING) return;
pool.emit('attemptReconnect', pool);

if (pool.state === DESTROYED || pool.state === DESTROYING) {
if (typeof callback === 'function') {
callback(new MongoError('Cannot create connection when pool is destroyed'));
}

// We are connected do not try again
if (self.isConnected()) {
self.reconnectId = null;
return;
}

self.connectingConnections++;
connect(self.options, (err, connection) => {
self.connectingConnections--;

if (err) {
if (self.logger.isDebug()) {
self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
}
pool.retriesLeft = pool.retriesLeft - 1;
if (pool.retriesLeft <= 0) {
pool.destroy();

self.retriesLeft = self.retriesLeft - 1;
if (self.retriesLeft <= 0) {
self.destroy();
self.emit(
'reconnectFailed',
new MongoNetworkError(
f(
'failed to reconnect after %s attempts with interval %s ms',
self.options.reconnectTries,
self.options.reconnectInterval
)
)
);
} else {
self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
}
const error = new MongoNetworkError(
`failed to reconnect after ${pool.options.reconnectTries} attempts with interval ${
pool.options.reconnectInterval
} ms`
);

return;
pool.emit('reconnectFailed', error);
if (typeof callback === 'function') {
callback(error);
}

if (self.state === DESTROYED || self.state === DESTROYING) {
return connection.destroy();
}
return;
}

self.reconnectId = null;
handlers.forEach(event => connection.removeAllListeners(event));
connection.on('error', self._connectionErrorHandler);
connection.on('close', self._connectionCloseHandler);
connection.on('timeout', self._connectionTimeoutHandler);
connection.on('parseError', self._connectionParseErrorHandler);
connection.on('message', self._messageHandler);

self.retriesLeft = self.options.reconnectTries;
self.availableConnections.push(connection);
self.reconnectConnection = null;
self.emit('reconnect', self);
_execute(self)();
});
// clear the reconnect id on retry
pool.reconnectId = null;

// now retry creating a connection
createConnection(pool, callback);
};
}

Expand Down Expand Up @@ -564,64 +537,26 @@ Pool.prototype.connect = function() {
throw new MongoError('connection in unlawful state ' + this.state);
}

const self = this;
stateTransition(this, CONNECTING);

self.connectingConnections++;
connect(self.options, (err, connection) => {
self.connectingConnections--;

createConnection(this, (err, conn) => {
if (err) {
if (self.logger.isDebug()) {
self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
}

if (self.state === CONNECTING) {
self.emit('error', err);
if (this.state === CONNECTING) {
this.emit('error', err);
}

this.destroy();
return;
}

if (self.state === DESTROYED || self.state === DESTROYING) {
return self.destroy();
}
stateTransition(this, CONNECTED);
this.emit('connect', this, conn);

// attach event handlers
connection.on('error', self._connectionErrorHandler);
connection.on('close', self._connectionCloseHandler);
connection.on('timeout', self._connectionTimeoutHandler);
connection.on('parseError', self._connectionParseErrorHandler);
connection.on('message', self._messageHandler);

// If we are in a topology, delegate the auth to it
// This is to avoid issues where we would auth against an
// arbiter
if (self.options.inTopology) {
stateTransition(self, CONNECTED);
self.availableConnections.push(connection);
return self.emit('connect', self, connection);
}

if (self.state === DESTROYED || self.state === DESTROYING) {
return self.destroy();
}

if (err) {
self.destroy();
return self.emit('error', err);
}

stateTransition(self, CONNECTED);
self.availableConnections.push(connection);

if (self.minSize) {
for (let i = 0; i < self.minSize; i++) {
_createConnection(self);
// create min connections
if (this.minSize) {
for (let i = 0; i < this.minSize; i++) {
createConnection(this);
}
}

self.emit('connect', self, connection);
});
};

Expand Down Expand Up @@ -718,12 +653,6 @@ Pool.prototype.destroy = function(force, callback) {
clearTimeout(this.reconnectId);
}

// If we have a reconnect connection running, close
// immediately
if (this.reconnectConnection) {
this.reconnectConnection.destroy();
}

// Wait for the operations to drain before we close the pool
function checkStatus() {
flushMonitoringOperations(self.queue);
Expand Down Expand Up @@ -782,7 +711,7 @@ Pool.prototype.reset = function(callback) {
resetPoolState(this);

// create an initial connection, and kick off execution again
_createConnection(this);
createConnection(this);

if (typeof callback === 'function') {
callback(null, null);
Expand Down Expand Up @@ -996,55 +925,73 @@ function removeConnection(self, connection) {
if (remove(connection, self.inUseConnections)) return;
}

const handlers = ['close', 'message', 'error', 'timeout', 'parseError', 'connect'];
function _createConnection(self) {
if (self.state === DESTROYED || self.state === DESTROYING) {
function createConnection(pool, callback) {
if (pool.state === DESTROYED || pool.state === DESTROYING) {
if (typeof callback === 'function') {
callback(new MongoError('Cannot create connection when pool is destroyed'));
}

return;
}

self.connectingConnections++;
connect(self.options, (err, connection) => {
self.connectingConnections--;
pool.connectingConnections++;
connect(pool.options, (err, connection) => {
pool.connectingConnections--;

if (err) {
if (self.logger.isDebug()) {
self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
if (pool.logger.isDebug()) {
pool.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
}

if (!self.reconnectId && self.options.reconnect) {
self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
if (!pool.reconnectId && pool.options.reconnect) {
pool.reconnectId = setTimeout(
attemptReconnect(pool, callback),
pool.options.reconnectInterval
);

return;
}

if (typeof callback === 'function') {
callback(err);
}

return;
}

if (self.state === DESTROYED || self.state === DESTROYING) {
removeConnection(self, connection);
return connection.destroy();
// the pool might have been closed since we started creating the connection
if (pool.state === DESTROYED || pool.state === DESTROYING) {
if (typeof callback === 'function') {
callback(new MongoError('Pool was destroyed after connection creation'));
}

connection.destroy();
return;
}

connection.on('error', self._connectionErrorHandler);
connection.on('close', self._connectionCloseHandler);
connection.on('timeout', self._connectionTimeoutHandler);
connection.on('parseError', self._connectionParseErrorHandler);
connection.on('message', self._messageHandler);
// otherwise, connect relevant event handlers and add it to our available connections
connection.on('error', pool._connectionErrorHandler);
connection.on('close', pool._connectionCloseHandler);
connection.on('timeout', pool._connectionTimeoutHandler);
connection.on('parseError', pool._connectionParseErrorHandler);
connection.on('message', pool._messageHandler);

if (self.state === DESTROYED || self.state === DESTROYING) {
return connection.destroy();
}
pool.availableConnections.push(connection);

// Remove the connection from the connectingConnections list
removeConnection(self, connection);
// if there is a reconnect in progress, reset state and emit event
if (pool.reconnectId) {
pool.reconnectId = null;
pool.retriesLeft = pool.options.reconnectTries;
pool.emit('reconnect', pool);
}

// Handle error
if (err) {
return connection.destroy();
// if a callback was provided, return the connection
if (typeof callback === 'function') {
callback(null, connection);
}

// Push to available
self.availableConnections.push(connection);
// Execute any work waiting
_execute(self)();
// immediately execute any waiting work
_execute(pool)();
});
}

Expand Down Expand Up @@ -1146,7 +1093,7 @@ function _execute(self) {
// Attempt to grow the pool if it's not yet maxsize
if (totalConnections < self.options.size && self.queue.length > 0) {
// Create a new connection
_createConnection(self);
createConnection(self);
}

// Re-execute the operation
Expand All @@ -1166,7 +1113,7 @@ function _execute(self) {
// Lets put the workItem back on the list
self.queue.unshift(workItem);
// Create a new connection
_createConnection(self);
createConnection(self);
// Break from the loop
break;
}
Expand Down
1 change: 1 addition & 0 deletions lib/core/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ function selectServers(topology, selector, timeout, start, callback) {

const iterationTimer = setTimeout(() => {
topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler);

callback(
new MongoTimeoutError(
`Server selection timed out after ${timeout} ms`,
Expand Down
6 changes: 3 additions & 3 deletions test/functional/scram_sha_256_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ describe('SCRAM-SHA-256 auth', function() {
},
authSource: this.configuration.db,
authMechanism: 'SCRAM-SHA-1',
serverSelectionTimeoutMS: 10
serverSelectionTimeoutMS: 2000
};

return withClient(
Expand All @@ -207,7 +207,7 @@ describe('SCRAM-SHA-256 auth', function() {
password: 'pencil'
},
authSource: 'admin',
serverSelectionTimeoutMS: 1000
serverSelectionTimeoutMS: 2000
};

const badPasswordOptions = {
Expand All @@ -216,7 +216,7 @@ describe('SCRAM-SHA-256 auth', function() {
password: 'pencil'
},
authSource: 'admin',
serverSelectionTimeoutMS: 1000
serverSelectionTimeoutMS: 2000
};

const getErrorMsg = options =>
Expand Down

0 comments on commit 8153065

Please sign in to comment.