Skip to content

Commit

Permalink
fix(NODE-3515): do proper opTime merging in bulk results (#3012)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran authored Oct 20, 2021
1 parent ee7f095 commit 43300c3
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 33 deletions.
73 changes: 40 additions & 33 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { PromiseProvider } from '../promise_provider';
import { Long, ObjectId, Document, BSONSerializeOptions, resolveBSONOptions } from '../bson';
import {
Long,
ObjectId,
Document,
BSONSerializeOptions,
resolveBSONOptions,
Timestamp
} from '../bson';
import {
MongoWriteConcernError,
AnyError,
Expand Down Expand Up @@ -433,8 +440,13 @@ export class WriteError {
}
}

/** Converts the number to a Long or returns it. */
function longOrConvert(value: number | Long | Timestamp): Long | Timestamp {
return typeof value === 'number' ? Long.fromNumber(value) : value;
}

/** Merges results into shared data structure */
function mergeBatchResults(
export function mergeBatchResults(
batch: Batch,
bulkResult: BulkResult,
err?: AnyError,
Expand Down Expand Up @@ -469,42 +481,37 @@ function mergeBatchResults(
return;
}

// Deal with opTime if available
// The server write command specification states that lastOp is an optional
// mongod only field that has a type of timestamp. Across various scarce specs
// where opTime is mentioned, it is an "opaque" object that can have a "ts" and
// "t" field with Timestamp and Long as their types respectively.
// The "lastOp" field of the bulk write result is never mentioned in the driver
// specifications or the bulk write spec, so we should probably just keep its
// value consistent since it seems to vary.
// See: https://github.com/mongodb/specifications/blob/master/source/driver-bulk-update.rst#results-object
if (result.opTime || result.lastOp) {
const opTime = result.lastOp || result.opTime;
let lastOpTS = null;
let lastOpT = null;

// We have a time stamp
if (opTime && opTime._bsontype === 'Timestamp') {
if (bulkResult.opTime == null) {
bulkResult.opTime = opTime;
} else if (opTime.greaterThan(bulkResult.opTime)) {
bulkResult.opTime = opTime;
}
} else {
// Existing TS
if (bulkResult.opTime) {
lastOpTS =
typeof bulkResult.opTime.ts === 'number'
? Long.fromNumber(bulkResult.opTime.ts)
: bulkResult.opTime.ts;
lastOpT =
typeof bulkResult.opTime.t === 'number'
? Long.fromNumber(bulkResult.opTime.t)
: bulkResult.opTime.t;
}
let opTime = result.lastOp || result.opTime;

// Current OpTime TS
const opTimeTS = typeof opTime.ts === 'number' ? Long.fromNumber(opTime.ts) : opTime.ts;
const opTimeT = typeof opTime.t === 'number' ? Long.fromNumber(opTime.t) : opTime.t;
// If the opTime is a Timestamp, convert it to a consistent format to be
// able to compare easily. Converting to the object from a timestamp is
// much more straightforward than the other direction.
if (opTime._bsontype === 'Timestamp') {
opTime = { ts: opTime, t: Long.ZERO };
}

// Compare the opTime's
if (bulkResult.opTime == null) {
bulkResult.opTime = opTime;
} else if (opTimeTS.greaterThan(lastOpTS)) {
// If there's no lastOp, just set it.
if (!bulkResult.opTime) {
bulkResult.opTime = opTime;
} else {
// First compare the ts values and set if the opTimeTS value is greater.
const lastOpTS = longOrConvert(bulkResult.opTime.ts);
const opTimeTS = longOrConvert(opTime.ts);
if (opTimeTS.greaterThan(lastOpTS)) {
bulkResult.opTime = opTime;
} else if (opTimeTS.equals(lastOpTS)) {
// If the ts values are equal, then compare using the t values.
const lastOpT = longOrConvert(bulkResult.opTime.t);
const opTimeT = longOrConvert(opTime.t);
if (opTimeT.greaterThan(lastOpT)) {
bulkResult.opTime = opTime;
}
Expand Down
129 changes: 129 additions & 0 deletions test/unit/bulk/common.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
'use strict';

const { expect } = require('chai');
const { mergeBatchResults } = require('../../../src/bulk/common');
const { Timestamp, Long } = require('../../../src/bson');

describe('bulk/common', function () {
describe('#mergeBatchResults', function () {
let opTime;
let lastOp;
const bulkResult = {
ok: 1,
writeErrors: [],
writeConcernErrors: [],
insertedIds: [],
nInserted: 0,
nUpserted: 0,
nMatched: 0,
nModified: 0,
nRemoved: 1,
upserted: []
};
const result = {
n: 8,
nModified: 8,
electionId: '7fffffff0000000000000028',
ok: 1,
$clusterTime: {
clusterTime: '7020546605669417498',
signature: {
hash: 'AAAAAAAAAAAAAAAAAAAAAAAAAAA=',
keyId: 0
}
},
operationTime: '7020546605669417498'
};
const batch = [];

context('when lastOp is an object', function () {
context('when the opTime is a Timestamp', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = Timestamp.fromNumber(8020546605669417496);
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('replaces the opTime with the properly formatted object', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal({ ts: opTime, t: Long.ZERO });
});
});

context('when the opTime is an object', function () {
context('when the ts is greater', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417497, t: 10 };
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('replaces the opTime with the new opTime', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal(opTime);
});
});

context('when the ts is equal', function () {
context('when the t is greater', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 20 };
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('replaces the opTime with the new opTime', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal(opTime);
});
});

context('when the t is equal', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 10 };
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('does not replace the opTime with the new opTime', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal(lastOp);
});
});

context('when the t is less', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 5 };
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('does not replace the opTime with the new opTime', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal(lastOp);
});
});
});

context('when the ts is less', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417495, t: 10 };
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('does not replace the opTime with the new opTime', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal(lastOp);
});
});
});
});
});
});

0 comments on commit 43300c3

Please sign in to comment.