From 7e942bacbf6bb69c0072fb59cdcaeced2c046a2e Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Wed, 27 May 2020 09:21:21 -0400 Subject: [PATCH] fix: always clear cancelled wait queue members during processing Each time the wait queue in the connection pool is processed it is currently gated by having available connections. This can result in a memory leak where many wait queue members are cancelled, but the pool is unable to clear them out before all connections are used for new operations. NODE-2413 --- lib/cmap/connection_pool.js | 14 +++++++-- test/unit/cmap/connection_pool.test.js | 39 +++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/lib/cmap/connection_pool.js b/lib/cmap/connection_pool.js index 9de52c4008..e384efc0b2 100644 --- a/lib/cmap/connection_pool.js +++ b/lib/cmap/connection_pool.js @@ -190,6 +190,10 @@ class ConnectionPool extends EventEmitter { return this[kConnections].length; } + get waitQueueSize() { + return this[kWaitQueue].length; + } + /** * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or @@ -287,7 +291,7 @@ class ConnectionPool extends EventEmitter { this[kCancellationToken].emit('cancel'); // drain the wait queue - while (this[kWaitQueue].length) { + while (this.waitQueueSize) { const waitQueueMember = this[kWaitQueue].pop(); clearTimeout(waitQueueMember.timer); if (!waitQueueMember[kCancelled]) { @@ -441,13 +445,17 @@ function processWaitQueue(pool) { return; } - while (pool[kWaitQueue].length && pool.availableConnectionCount) { + while (pool.waitQueueSize) { const waitQueueMember = pool[kWaitQueue].peekFront(); if (waitQueueMember[kCancelled]) { pool[kWaitQueue].shift(); continue; } + if (!pool.availableConnectionCount) { + break; + } + const connection = pool[kConnections].shift(); const isStale = connectionIsStale(pool, connection); const isIdle = connectionIsIdle(pool, connection); @@ -464,7 +472,7 @@ function processWaitQueue(pool) { } const maxPoolSize = pool.options.maxPoolSize; - if (pool[kWaitQueue].length && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) { + if (pool.waitQueueSize && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) { createConnection(pool, (err, connection) => { const waitQueueMember = pool[kWaitQueue].shift(); if (waitQueueMember == null) { diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 9c6ba3f6b5..df62258438 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -3,10 +3,11 @@ const util = require('util'); const loadSpecTests = require('../../spec').loadSpecTests; const ConnectionPool = require('../../../lib/cmap/connection_pool').ConnectionPool; +const WaitQueueTimeoutError = require('../../../lib/cmap/errors').WaitQueueTimeoutError; const EventEmitter = require('events').EventEmitter; const mock = require('mongodb-mock-server'); const cmapEvents = require('../../../lib/cmap/events'); - +const sinon = require('sinon'); const chai = require('chai'); chai.use(require('../../functional/spec-runner/matcher').default); const expect = chai.expect; @@ -113,6 +114,42 @@ describe('Connection Pool', function() { ); }); + it('should clear timed out wait queue members if no connections are available', function(done) { + server.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(mock.DEFAULT_ISMASTER_36); + } + }); + + const pool = new ConnectionPool( + Object.assign({ bson: new BSON(), maxPoolSize: 1, waitQueueTimeoutMS: 200 }, server.address()) + ); + + pool.checkOut((err, conn) => { + expect(err).to.not.exist; + expect(conn).to.exist; + + pool.checkOut(err => { + expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError); + + // We can only process the wait queue with `checkIn` and `checkOut`, so we + // force the pool here to think there are no available connections, even though + // we are checking the connection back in. This simulates a slow leak where + // incoming requests outpace the ability of the queue to fully process cancelled + // wait queue members + sinon.stub(pool, 'availableConnectionCount').get(() => 0); + pool.checkIn(conn); + + expect(pool) + .property('waitQueueSize') + .to.equal(0); + + done(); + }); + }); + }); + describe('withConnection', function() { it('should manage a connection for a successful operation', function(done) { server.setMessageHandler(request => {