Skip to content

Commit

Permalink
refactor(pool): introduce draining state to account for late ops
Browse files Browse the repository at this point in the history
Sometime we request operations as fire-and-forget right before the
pool is destroyed (`endSessions` is a good example). In a graceful
destruction the pool still needs to account for these operations,
so a new state `draining` was introduced to prevent new operations
while allowing the pool to drain existing queued work.
  • Loading branch information
mbroadst committed Nov 5, 2019
1 parent b1e043f commit fd4f4ce
Showing 1 changed file with 27 additions and 21 deletions.
48 changes: 27 additions & 21 deletions lib/core/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ const makeStateMachine = require('../utils').makeStateMachine;
const DISCONNECTED = 'disconnected';
const CONNECTING = 'connecting';
const CONNECTED = 'connected';
const DRAINING = 'draining';
const DESTROYING = 'destroying';
const DESTROYED = 'destroyed';
const stateTransition = makeStateMachine({
[DISCONNECTED]: [CONNECTING, DESTROYING, DISCONNECTED],
[CONNECTING]: [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED],
[CONNECTED]: [CONNECTED, DISCONNECTED, DESTROYING],
[DISCONNECTED]: [CONNECTING, DRAINING, DISCONNECTED],
[CONNECTING]: [CONNECTING, CONNECTED, DRAINING, DISCONNECTED],
[CONNECTED]: [CONNECTED, DISCONNECTED, DRAINING],
[DRAINING]: [DRAINING, DESTROYING, DESTROYED],
[DESTROYING]: [DESTROYING, DESTROYED],
[DESTROYED]: [DESTROYED]
});
Expand Down Expand Up @@ -270,7 +272,7 @@ function connectionFailureHandler(pool, event, err, conn) {

// No more socket available propegate the event
if (pool.socketCount() === 0) {
if (pool.state !== DESTROYED && pool.state !== DESTROYING) {
if (pool.state !== DESTROYED && pool.state !== DESTROYING && pool.state !== DRAINING) {
stateTransition(pool, DISCONNECTED);
}

Expand Down Expand Up @@ -607,6 +609,8 @@ Pool.prototype.unref = function() {

// Destroy the connections
function destroy(self, connections, options, callback) {
stateTransition(self, DESTROYING);

eachAsync(
connections,
(conn, cb) => {
Expand Down Expand Up @@ -644,8 +648,8 @@ Pool.prototype.destroy = function(force, callback) {
return;
}

// Set state to destroyed
stateTransition(this, DESTROYING);
// Set state to draining
stateTransition(this, DRAINING);

// Are we force closing
if (force) {
Expand All @@ -672,6 +676,14 @@ Pool.prototype.destroy = function(force, callback) {

// Wait for the operations to drain before we close the pool
function checkStatus() {
if (self.state === DESTROYED || self.state === DESTROYING) {
if (typeof callback === 'function') {
callback();
}

return;
}

flushMonitoringOperations(self.queue);

if (self.queue.length === 0) {
Expand Down Expand Up @@ -795,17 +807,12 @@ Pool.prototype.write = function(command, options, cb) {

// Pool was destroyed error out
if (this.state === DESTROYED || this.state === DESTROYING) {
// Callback with an error
if (cb) {
try {
cb(new MongoError('pool destroyed'));
} catch (err) {
process.nextTick(function() {
throw err;
});
}
}
cb(new MongoError('pool destroyed'));
return;
}

if (this.state === DRAINING) {
cb(new MongoError('pool is draining, new operations prohibited'));
return;
}

Expand Down Expand Up @@ -938,7 +945,7 @@ function removeConnection(self, connection) {
}

function createConnection(pool, callback) {
if (pool.state === DESTROYED) {
if (pool.state === DESTROYED || pool.state === DESTROYING) {
if (typeof callback === 'function') {
callback(new MongoError('Cannot create connection when pool is destroyed'));
}
Expand Down Expand Up @@ -979,7 +986,7 @@ function createConnection(pool, callback) {
}

// the pool might have been closed since we started creating the connection
if (pool.state === DESTROYED) {
if (pool.state === DESTROYED || pool.state === DESTROYING) {
if (typeof callback === 'function') {
callback(new MongoError('Pool was destroyed after connection creation'));
}
Expand Down Expand Up @@ -1032,7 +1039,6 @@ function _execute(self) {
// operations
if (self.connectingConnections > 0) {
self.executing = false;
setTimeout(() => _execute(self)(), 10);
return;
}

Expand All @@ -1047,8 +1053,8 @@ function _execute(self) {
// Flush any monitoring operations
flushMonitoringOperations(self.queue);

// attempt to grow the pool
if (totalConnections < self.options.size) {
// Try to create a new connection to execute stuck operation
if (totalConnections < self.options.size && self.queue.length > 0) {
createConnection(self);
}

Expand Down

0 comments on commit fd4f4ce

Please sign in to comment.