Skip to content

Commit

Permalink
feat(withSession): add top level helper for session lifetime
Browse files Browse the repository at this point in the history
`withSession` allows users to ignore resource management while
using sessions. A provided method/operation will be provided with
an implicitly created session, which will be cleaned up for them
automatically once the operation is complete.

NODE-1418
  • Loading branch information
mbroadst committed Apr 19, 2018
1 parent f5a7227 commit 9976b86
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 1 deletion.
42 changes: 42 additions & 0 deletions lib/mongo_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const shallowClone = require('./utils').shallowClone;
const authenticate = require('./authenticate');
const ServerSessionPool = require('mongodb-core').Sessions.ServerSessionPool;
const executeOperation = require('./utils').executeOperation;
const isPromiseLike = require('./utils').isPromiseLike;

/**
* @fileOverview The **MongoClient** class is a class that allows for making Connections to MongoDB.
Expand Down Expand Up @@ -504,6 +505,47 @@ MongoClient.prototype.startSession = function(options) {
return this.topology.startSession(options, this.s.options);
};

/**
* Runs a given operation with an implicitly created session. The lifetime of the session
* will be handled without the need for user interaction.
*
* NOTE: presently the operation MUST return a Promise (either explicit or implicity as an async function)
*
* @param {Function} operation An operation to execute with an implicitly created session. The signature of this MUST be `(session) => {}`
* @param {Object} [options] Optional settings to be appled to implicitly created session
* @return {Promise} returns Promise if no callback passed
*/
MongoClient.prototype.withSession = function(operation, options, callback) {
if (typeof options === 'function') (callback = options), (options = undefined);
const session = this.startSession(options);

const cleanupHandler = (err, result, opts) => {
opts = Object.assign({ throw: true }, opts);
session.endSession();

if (typeof callback === 'function') {
return err ? callback(err, null) : callback(null, result);
} else {
if (err) {
if (opts.throw) throw err;
return Promise.reject(err);
}
return result;
}
};

try {
const result = operation(session);
const promise = isPromiseLike(result) ? result : Promise.resolve(result);

return promise
.then(result => cleanupHandler(null, result))
.catch(err => cleanupHandler(err, null, { throw: true }));
} catch (err) {
return cleanupHandler(err, null, { throw: false });
}
};

var mergeOptions = function(target, source, flatten) {
for (var name in source) {
if (source[name] && typeof source[name] === 'object' && flatten) {
Expand Down
11 changes: 11 additions & 0 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,16 @@ function applyWriteConcern(target, sources, options) {
return target;
}

/**
* Checks if a given value is a Promise
*
* @param {*} maybePromise
* @return true if the provided value is a Promise
*/
function isPromiseLike(maybePromise) {
return maybePromise && typeof maybePromise.then === 'function';
}

exports.filterOptions = filterOptions;
exports.mergeOptions = mergeOptions;
exports.translateOptions = translateOptions;
Expand All @@ -514,3 +524,4 @@ exports.mergeOptionsAndWriteConcern = mergeOptionsAndWriteConcern;
exports.translateReadPreference = translateReadPreference;
exports.executeOperation = executeOperation;
exports.applyWriteConcern = applyWriteConcern;
exports.isPromiseLike = isPromiseLike;
132 changes: 131 additions & 1 deletion test/functional/sessions_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe('Sessions', function() {

it('should send endSessions for multiple sessions', {
metadata: {
requires: { topology: ['single'], mongodb: '>3.6.0-rc0' },
requires: { topology: ['single'], mongodb: '>3.6.0' },
// Skipping session leak tests b/c these are explicit sessions
sessions: { skipLeakTests: true }
},
Expand All @@ -49,4 +49,134 @@ describe('Sessions', function() {
});
}
});

describe.only('withSession', {
metadata: { requires: { mongodb: '>3.6.0' } },
test: function() {
[
{
description: 'should support operations that return promises',
operation: client => session => {
return client
.db('test')
.collection('foo')
.find({}, { session })
.toArray();
}
},
{
nodeVersion: '>=8.x',
description: 'should support async operations',
operation: client => session =>
async function() {
await client
.db('test')
.collection('foo')
.find({}, { session })
.toArray();
}
},
{
description: 'should support operations that return rejected promises',
operation: (/* client */) => (/* session */) => {
return Promise.reject(new Error('something awful'));
}
},
{
description: "should support operations that don't return promises",
operation: (/* client */) => (/* session */) => {
setTimeout(() => {});
}
},
{
description: 'should support operations that throw exceptions',
operation: (/* client */) => (/* session */) => {
throw new Error('something went wrong!');
}
},
{
description: 'should support operations that return promises with a callback',
operation: client => session => {
return client
.db('test')
.collection('foo')
.find({}, { session })
.toArray();
},
callback: resolve => (err, res) => {
expect(err).to.not.exist;
expect(res).to.exist;
resolve();
}
},
{
description: 'should support operations that return rejected promises and a callback',
operation: (/* client */) => (/* session */) => {
return Promise.reject(new Error('something awful'));
},
callback: resolve => (err, res) => {
expect(err).to.exist;
expect(res).to.not.exist;
resolve();
}
},
{
description: "should support operations that don't return promises with a callback",
operation: (/* client */) => (/* session */) => {
setTimeout(() => {});
},
callback: resolve => (err, res) => {
expect(err).to.exist;
expect(res).to.not.exist;
resolve();
}
},
{
description: 'should support operations that throw exceptions with a callback',
operation: (/* client */) => (/* session */) => {
throw new Error('something went wrong!');
},
callback: resolve => (err, res) => {
expect(err).to.exist;
expect(res).to.not.exist;
resolve();
}
}
].forEach(testCase => {
const metadata = {};
if (testCase.nodeVersion) metadata.requires = { node: testCase.nodeVersion };
it(testCase.description, {
metadata: metadata,
test: function() {
const client = this.configuration.newClient(
{ w: 1 },
{ poolSize: 1, auto_reconnect: false }
);

return client.connect().then(client => {
let promise;
if (testCase.callback) {
promise = new Promise(resolve => {
client.withSession(testCase.operation(client), {}, testCase.callback(resolve));
});
} else {
promise = client.withSession(testCase.operation(client));
}

return promise
.catch(() => expect(client.topology.s.sessionPool.sessions).to.have.length(1))
.then(() => expect(client.topology.s.sessionPool.sessions).to.have.length(1))
.then(() => client.close())
.then(() => {
// verify that the `endSessions` command was sent
const lastCommand = test.commands.started[test.commands.started.length - 1];
expect(lastCommand.commandName).to.equal('endSessions');
expect(client.topology.s.sessionPool.sessions).to.have.length(0);
});
});
}
});
});
}
});
});

0 comments on commit 9976b86

Please sign in to comment.