diff --git a/lib/topologies/mongos.js b/lib/topologies/mongos.js index 725e5f6ad..d311da612 100644 --- a/lib/topologies/mongos.js +++ b/lib/topologies/mongos.js @@ -895,6 +895,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) { } // increment and assign txnNumber + options.willRetryWrite = true; options.session.incrementTransactionNumber(); server[op](ns, ops, options, (err, result) => { diff --git a/lib/topologies/replset.js b/lib/topologies/replset.js index 5bee16b7a..86bd06593 100644 --- a/lib/topologies/replset.js +++ b/lib/topologies/replset.js @@ -1231,6 +1231,7 @@ function executeWriteOperation(args, options, callback) { // increment and assign txnNumber if (willRetryWrite) { options.session.incrementTransactionNumber(); + options.willRetryWrite = willRetryWrite; } // optionally autostart transaction if requested diff --git a/lib/wireprotocol/3_2_support.js b/lib/wireprotocol/3_2_support.js index 447ac77c2..611490e71 100644 --- a/lib/wireprotocol/3_2_support.js +++ b/lib/wireprotocol/3_2_support.js @@ -18,20 +18,23 @@ var WireProtocol = function(legacyWireProtocol) { * * @param {Object} command the command to decorate * @param {ClientSession} session the session tracking transaction state + * @param {boolean} [isRetryableWrite=false] if true, will be decorated for retryable writes */ -function decorateWithTransactionsData(command, session) { +function decorateWithTransactionsData(command, session, isRetryableWrite) { if (!session) { return; } // first apply non-transaction-specific sessions data const serverSession = session.serverSession; - if (serverSession.txnNumber) { + const inTransaction = session.inTransaction(); + + if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) { command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber); } // now try to apply tansaction-specific data - if (!session.inTransaction()) { + if (!inTransaction) { return; } @@ -103,7 +106,7 @@ var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callba } // optionally decorate command with transactions data - decorateWithTransactionsData(writeCommand, options.session); + decorateWithTransactionsData(writeCommand, options.session, options.willRetryWrite); // Options object var opts = { command: true }; diff --git a/test/tests/unit/replset/transactions_feature_decoration_tests.js b/test/tests/unit/replset/transactions_feature_decoration_tests.js new file mode 100644 index 000000000..78706de39 --- /dev/null +++ b/test/tests/unit/replset/transactions_feature_decoration_tests.js @@ -0,0 +1,142 @@ +'use strict'; + +const expect = require('chai').expect; +const ReplSet = require('../../../../lib/topologies/replset'); +const mock = require('mongodb-mock-server'); +const ReplSetFixture = require('../common').ReplSetFixture; +const ClientSession = require('../../../../lib/sessions').ClientSession; +const ServerSessionPool = require('../../../../lib/sessions').ServerSessionPool; + +describe('Transaction Feature Decoration', function() { + let test; + const ns = 'db.foo'; + const noop = () => {}; + const ismaster = Object.assign({}, mock.DEFAULT_ISMASTER_36, { maxWireVersion: 7 }); + + before(() => (test = new ReplSetFixture())); + afterEach(() => mock.cleanup()); + beforeEach(() => test.setup({ ismaster })); + + class TestConfig { + constructor(config, flags) { + this.fnName = config.fnName; + this.cmd = config.cmd; + this.arg = config.arg(); + this.flags = flags; + this.retryWrites = flags.retryWrites; + this.session = flags.session; + this.transaction = flags.transaction; + } + + get shouldPass() { + if (this.session && this.transaction) { + return true; + } + + if (this.fnName === 'command') { + return false; + } + + return this.session && this.retryWrites; + } + + get description() { + const not = this.shouldPass ? '' : 'not '; + const flags = JSON.stringify(this.flags); + + return `should ${not}have a txnNumber when command ${this.cmd} is used with ${flags}`; + } + } + + [ + { fnName: 'insert', cmd: 'insert', arg: () => [{ foo: 1 }] }, + { fnName: 'update', cmd: 'update', arg: () => [{ foo: 1 }] }, + { fnName: 'remove', cmd: 'delete', arg: () => [{ foo: 1 }] }, + { fnName: 'command', cmd: 'fizzBuzz', arg: () => ({ fizzBuzz: 1 }) } + ] + .reduce((testConfigs, op) => { + for (let i = 0; i < 4; i += 1) { + const options = { + retryWrites: i % 2 === 1, + session: i >= 2 + }; + + testConfigs.push(new TestConfig(op, options)); + + if (options.session) { + testConfigs.push(new TestConfig(op, Object.assign({ transaction: true }, options))); + } + } + return testConfigs; + }, []) + .forEach(config => { + it(config.description, { + metadata: { requires: { topology: 'single', mongodb: '>=3.7.3' } }, + test: function(done) { + const replSet = new ReplSet( + [test.primaryServer.address(), test.firstSecondaryServer.address()], + { + setName: 'rs', + connectionTimeout: 3000, + socketTimeout: 0, + haInterval: 100, + size: 1 + } + ); + + function shutdown(err) { + replSet.destroy(); + done(err); + } + + test.primaryServer.setMessageHandler(request => { + try { + const doc = request.document; + + if (doc.ismaster) { + return request.reply(test.primaryStates[0]); + } + + if (doc[config.cmd]) { + if (config.shouldPass) { + expect(doc).to.have.property('txnNumber'); + } else { + expect(doc).to.not.have.property('txnNumber'); + } + + request.reply({ ok: 1 }); + + setTimeout(() => shutdown()); + } + } catch (e) { + return shutdown(e); + } + }); + + const sessionPool = new ServerSessionPool(replSet); + + replSet.on('connect', () => { + const options = {}; + + if (config.retryWrites) { + options.retryWrites = true; + } + + if (config.session) { + options.session = new ClientSession(replSet, sessionPool, {}, {}); + + if (config.transaction) { + options.session.startTransaction(); + } + } + + replSet[config.fnName](ns, config.arg, options, noop); + }); + + replSet.on('error', shutdown); + + replSet.connect(); + } + }); + }); +});