Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add withReadConcern builder to AbstractCursor #2645

Merged
merged 1 commit into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/reference/content/tutorials/crud.md
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,8 @@ collection.find({}).max(10) // Set the cursor
collection.find({}).maxTimeMS(1000) // Set the cursor maxTimeMS
collection.find({}).min(100) // Set the cursor min
collection.find({}).returnKey(10) // Set the cursor returnKey
collection.find({}).setReadPreference(ReadPreference.PRIMARY) // Set the cursor readPreference
collection.find({}).withReadPreference(ReadPreference.PRIMARY) // Set the cursor readPreference
collection.find({}).withReadConcern('majority') // Set the cursor readConcern
collection.find({}).showRecordId(true) // Set the cursor showRecordId
collection.find({}).sort([['a', 1]]) // Sets the sort order of the cursor query
collection.find({}).hint('a_1') // Set the cursor hint
Expand Down
29 changes: 28 additions & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { Topology } from '../sdam/topology';
import { Readable, Transform } from 'stream';
import { EventEmitter } from 'events';
import type { ExecutionResult } from '../operations/execute_operation';
import { ReadConcern, ReadConcernLike } from '../read_concern';

const kId = Symbol('id');
const kDocuments = Symbol('documents');
Expand Down Expand Up @@ -50,6 +51,7 @@ export type CursorFlag = typeof CURSOR_FLAGS[number];
export interface AbstractCursorOptions extends BSONSerializeOptions {
session?: ClientSession;
readPreference?: ReadPreferenceLike;
readConcern?: ReadConcernLike;
batchSize?: number;
maxTimeMS?: number;
comment?: Document | string;
Expand All @@ -62,6 +64,7 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
export type InternalAbstractCursorOptions = Omit<AbstractCursorOptions, 'readPreference'> & {
// resolved
readPreference: ReadPreference;
readConcern?: ReadConcern;

// cursor flags, some are deprecated
oplogReplay?: boolean;
Expand Down Expand Up @@ -107,6 +110,11 @@ export abstract class AbstractCursor extends EventEmitter {
...pluckBSONSerializeOptions(options)
};

const readConcern = ReadConcern.fromOptions(options);
if (readConcern) {
this[kOptions].readConcern = readConcern;
}

if (typeof options.batchSize === 'number') {
this[kOptions].batchSize = options.batchSize;
}
Expand Down Expand Up @@ -144,6 +152,10 @@ export abstract class AbstractCursor extends EventEmitter {
return this[kOptions].readPreference;
}

get readConcern(): ReadConcern | undefined {
return this[kOptions].readConcern;
}

get session(): ClientSession | undefined {
return this[kSession];
}
Expand Down Expand Up @@ -434,7 +446,7 @@ export abstract class AbstractCursor extends EventEmitter {
*
* @param readPreference - The new read preference for the cursor.
*/
setReadPreference(readPreference: ReadPreferenceLike): this {
withReadPreference(readPreference: ReadPreferenceLike): this {
assertUninitialized(this);
if (readPreference instanceof ReadPreference) {
this[kOptions].readPreference = readPreference;
Expand All @@ -447,6 +459,21 @@ export abstract class AbstractCursor extends EventEmitter {
return this;
}

/**
* Set the ReadPreference for the cursor.
*
* @param readPreference - The new read preference for the cursor.
*/
withReadConcern(readConcern: ReadConcernLike): this {
assertUninitialized(this);
const resolvedReadConcern = ReadConcern.fromOptions({ readConcern });
if (resolvedReadConcern) {
this[kOptions].readConcern = resolvedReadConcern;
}

return this;
}

/**
* Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
*
Expand Down
2 changes: 1 addition & 1 deletion src/gridfs-stream/download.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ function init(stream: GridFSBucketReadStream): void {
stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 });

if (stream.s.readPreference) {
stream.s.cursor.setReadPreference(stream.s.readPreference);
stream.s.cursor.withReadPreference(stream.s.readPreference);
}

stream.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
Expand Down
4 changes: 2 additions & 2 deletions src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ export class FindOperation extends CommandOperation<FindOptions, Document> {
findCommand.maxTimeMS = options.maxTimeMS;
}

if (this.readConcern && (!this.session || !this.session.inTransaction())) {
findCommand.readConcern = this.readConcern;
if (this.readConcern) {
findCommand.readConcern = this.readConcern.toJSON();
}

if (options.max) {
Expand Down
6 changes: 6 additions & 0 deletions src/read_concern.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { Document } from './bson';

/** @public */
export enum ReadConcernLevel {
local = 'local',
Expand Down Expand Up @@ -78,4 +80,8 @@ export class ReadConcern {
static get SNAPSHOT(): string {
return ReadConcernLevel.snapshot;
}

toJSON(): Document {
return { level: this.level };
}
}
4 changes: 2 additions & 2 deletions test/functional/apm.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ describe('APM', function () {
.batchSize(2)
.comment('some comment')
.maxTimeMS(5000)
.setReadPreference(ReadPreference.PRIMARY)
.withReadPreference(ReadPreference.PRIMARY)
.addCursorFlag('noCursorTimeout', true)
.toArray();
})
Expand Down Expand Up @@ -445,7 +445,7 @@ describe('APM', function () {
.batchSize(2)
.comment('some comment')
.maxTimeMS(5000)
.setReadPreference(ReadPreference.PRIMARY)
.withReadPreference(ReadPreference.PRIMARY)
.addCursorFlag('noCursorTimeout', true)
.toArray();
})
Expand Down
4 changes: 2 additions & 2 deletions test/functional/buffering_proxy.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ describe.skip('Buffering Proxy', function () {

db.collection('test')
.find({})
.setReadPreference(new ReadPreference(ReadPreference.SECONDARY))
.withReadPreference(new ReadPreference(ReadPreference.SECONDARY))
.toArray(function (err) {
expect(err).to.not.exist;
results.push('find');
Expand Down Expand Up @@ -439,7 +439,7 @@ describe.skip('Buffering Proxy', function () {

db.collection('test')
.find({})
.setReadPreference(new ReadPreference(ReadPreference.SECONDARY))
.withReadPreference(new ReadPreference(ReadPreference.SECONDARY))
.toArray(function (err) {
expect(err).to.not.exist;
results.push('find');
Expand Down
22 changes: 20 additions & 2 deletions test/functional/cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2321,19 +2321,37 @@ describe('Cursor', function () {
try {
db.collection('shouldFailToSetReadPreferenceOnCursor')
.find()
.setReadPreference('notsecondary');
.withReadPreference('notsecondary');
test.ok(false);
} catch (err) {} // eslint-disable-line

db.collection('shouldFailToSetReadPreferenceOnCursor')
.find()
.setReadPreference('secondary');
.withReadPreference('secondary');

done();
});
}
});

it('should allow setting the cursors readConcern through a builder', {
metadata: { requires: { mongodb: '>=3.2' } },
test: withMonitoredClient(['find'], function (client, events, done) {
const db = client.db(this.configuration.db);
const cursor = db.collection('foo').find().withReadConcern('local');
expect(cursor).property('readConcern').to.have.property('level').equal('local');

cursor.toArray(err => {
expect(err).to.not.exist;

expect(events).to.have.length(1);
const findCommand = events[0];
expect(findCommand).nested.property('command.readConcern').to.eql({ level: 'local' });
done();
});
})
});

it('shouldNotFailDueToStackOverflowEach', {
// Add a tag that our runner can trigger on
// in this case we are setting that node needs to be higher than 0.10.X to run
Expand Down
2 changes: 1 addition & 1 deletion test/functional/max_staleness.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ describe('Max Staleness', function () {
// Get a db with a new readPreference
db.collection('test')
.find({})
.setReadPreference(readPreference)
.withReadPreference(readPreference)
.toArray(function (err) {
expect(err).to.not.exist;
expect(test.checkCommand).to.eql({
Expand Down
16 changes: 8 additions & 8 deletions test/functional/readpreference.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -478,15 +478,15 @@ describe('ReadPreference', function () {
})
});

it('should set hedge using [.setReadPreference & empty hedge] ', {
it('should set hedge using [.withReadPreference & empty hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function (client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, { hedge: {} });
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.withReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: {} };
Expand All @@ -496,15 +496,15 @@ describe('ReadPreference', function () {
})
});

it('should set hedge using [.setReadPreference & enabled hedge] ', {
it('should set hedge using [.withReadPreference & enabled hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function (client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, { hedge: { enabled: true } });
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.withReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: { enabled: true } };
Expand All @@ -514,7 +514,7 @@ describe('ReadPreference', function () {
})
});

it('should set hedge using [.setReadPreference & disabled hedge] ', {
it('should set hedge using [.withReadPreference & disabled hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function (client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, {
Expand All @@ -524,7 +524,7 @@ describe('ReadPreference', function () {
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.withReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: { enabled: false } };
Expand All @@ -534,15 +534,15 @@ describe('ReadPreference', function () {
})
});

it('should set hedge using [.setReadPreference & undefined hedge] ', {
it('should set hedge using [.withReadPreference & undefined hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function (client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null);
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.withReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY };
Expand Down