diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 62661c04d7..0323b9ffac 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -78,8 +78,20 @@ export interface AbstractCursorOptions extends BSONSerializeOptions { session?: ClientSession; readPreference?: ReadPreferenceLike; readConcern?: ReadConcernLike; + /** + * Specifies the number of documents to return in each response from MongoDB + */ batchSize?: number; + /** + * When applicable `maxTimeMS` controls the amount of time the initial command + * that constructs a cursor should take. (ex. find, aggregate, listCollections) + */ maxTimeMS?: number; + /** + * When applicable `maxAwaitTimeMS` controls the amount of time subsequent getMores + * that a cursor uses to fetch more data should take. (ex. cursor.next()) + */ + maxAwaitTimeMS?: number; /** * Comment to apply to the operation. * @@ -89,7 +101,19 @@ export interface AbstractCursorOptions extends BSONSerializeOptions { * In server versions 4.4 and above, 'comment' can be any valid BSON type. */ comment?: unknown; + /** + * By default, MongoDB will automatically close a cursor when the + * client has exhausted all results in the cursor. However, for [capped collections](https://www.mongodb.com/docs/manual/core/capped-collections) + * you may use a Tailable Cursor that remains open after the client exhausts + * the results in the initial cursor. + */ tailable?: boolean; + /** + * If awaitData is set to true, when the cursor reaches the end of the capped collection, + * MongoDB blocks the query thread for a period of time waiting for new data to arrive. + * When new data is inserted into the capped collection, the blocked thread is signaled + * to wake up and return the next batch to the client. + */ awaitData?: boolean; noCursorTimeout?: boolean; } @@ -155,7 +179,7 @@ export abstract class AbstractCursor< } this[kClient] = client; this[kNamespace] = namespace; - this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230 + this[kDocuments] = []; this[kInitialized] = false; this[kClosed] = false; this[kKilled] = false; @@ -186,6 +210,10 @@ export abstract class AbstractCursor< this[kOptions].maxTimeMS = options.maxTimeMS; } + if (typeof options.maxAwaitTimeMS === 'number') { + this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS; + } + if (options.session instanceof ClientSession) { this[kSession] = options.session; } else { @@ -617,21 +645,8 @@ export abstract class AbstractCursor< /** @internal */ _getMore(batchSize: number, callback: Callback): void { - const cursorId = this[kId]; - const cursorNs = this[kNamespace]; - const server = this[kServer]; - - if (cursorId == null) { - callback(new MongoRuntimeError('Unable to iterate cursor with no id')); - return; - } - - if (server == null) { - callback(new MongoRuntimeError('Unable to iterate cursor without selected server')); - return; - } - - const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, { ...this[kOptions], session: this[kSession], batchSize diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index 2d999e5ce2..c86c3f3685 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -39,7 +39,7 @@ export class GetMoreOperation extends AbstractOperation { cursorId: Long; override options: GetMoreOptions; - constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) { + constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions) { super(options); this.options = options; @@ -63,6 +63,10 @@ export class GetMoreOperation extends AbstractOperation { ); } + if (this.cursorId == null || this.cursorId.isZero()) { + return callback(new MongoRuntimeError('Unable to iterate cursor with no id')); + } + const collection = this.ns.collection; if (collection == null) { // Cursors should have adopted the namespace returned by MongoDB diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index be8c155006..d7abc7a003 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1111,7 +1111,7 @@ describe('Change Streams', function () { changeStream.next((err, doc) => { expect(err).to.exist; expect(doc).to.not.exist; - expect(err.message).to.equal('ChangeStream is closed'); + expect(err?.message).to.equal('ChangeStream is closed'); changeStream.close(() => client.close(done)); }); }); @@ -1372,23 +1372,139 @@ describe('Change Streams', function () { ) .run(); + UnifiedTestSuiteBuilder.describe('entity.watch() server-side options') + .runOnRequirement({ + topologies: ['replicaset', 'sharded-replicaset', 'sharded', 'load-balanced'], + minServerVersion: '4.4.0' + }) + .createEntities([ + { client: { id: 'client0', observeEvents: ['commandStartedEvent'] } }, + { database: { id: 'db0', client: 'client0', databaseName: 'watchOpts' } }, + { collection: { id: 'collection0', database: 'db0', collectionName: 'watchOpts' } } + ]) + .test( + TestBuilder.it( + 'should use maxAwaitTimeMS option to set maxTimeMS on getMore and should not set maxTimeMS on aggregate' + ) + .operation({ + object: 'collection0', + name: 'createChangeStream', + saveResultAsEntity: 'changeStreamOnClient', + arguments: { maxAwaitTimeMS: 5000 } + }) + .operation({ + name: 'insertOne', + object: 'collection0', + arguments: { document: { a: 1 } }, + ignoreResultAndError: true + }) + .operation({ + object: 'changeStreamOnClient', + name: 'iterateUntilDocumentOrError', + ignoreResultAndError: true + }) + .expectEvents({ + client: 'client0', + events: [ + { + commandStartedEvent: { + commandName: 'aggregate', + command: { maxTimeMS: { $$exists: false } } + } + }, + { commandStartedEvent: { commandName: 'insert' } }, + { commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 5000 } } } + ] + }) + .toJSON() + ) + .test( + TestBuilder.it( + 'should use maxTimeMS option to set maxTimeMS on aggregate and not set maxTimeMS on getMore' + ) + .operation({ + object: 'collection0', + name: 'createChangeStream', + saveResultAsEntity: 'changeStreamOnClient', + arguments: { maxTimeMS: 5000 } + }) + .operation({ + name: 'insertOne', + object: 'collection0', + arguments: { document: { a: 1 } }, + ignoreResultAndError: true + }) + .operation({ + object: 'changeStreamOnClient', + name: 'iterateUntilDocumentOrError', + ignoreResultAndError: true + }) + .expectEvents({ + client: 'client0', + ignoreExtraEvents: true, // Sharded clusters have extra getMores + events: [ + { commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } }, + { commandStartedEvent: { commandName: 'insert' } }, + { + commandStartedEvent: { + commandName: 'getMore', + command: { maxTimeMS: { $$exists: false } } + } + } + ] + }) + .toJSON() + ) + .test( + TestBuilder.it( + 'should use maxTimeMS option to set maxTimeMS on aggregate and maxAwaitTimeMS option to set maxTimeMS on getMore' + ) + .operation({ + object: 'collection0', + name: 'createChangeStream', + saveResultAsEntity: 'changeStreamOnClient', + arguments: { maxTimeMS: 5000, maxAwaitTimeMS: 6000 } + }) + .operation({ + name: 'insertOne', + object: 'collection0', + arguments: { document: { a: 1 } }, + ignoreResultAndError: true + }) + .operation({ + object: 'changeStreamOnClient', + name: 'iterateUntilDocumentOrError', + ignoreResultAndError: true + }) + .expectEvents({ + client: 'client0', + ignoreExtraEvents: true, // Sharded clusters have extra getMores + events: [ + { commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } }, + { commandStartedEvent: { commandName: 'insert' } }, + { commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 6000 } } } + ] + }) + .toJSON() + ) + .run(); + describe('BSON Options', function () { let client: MongoClient; let db: Db; let collection: Collection; let cs: ChangeStream; + beforeEach(async function () { client = await this.configuration.newClient({ monitorCommands: true }).connect(); db = client.db('db'); collection = await db.createCollection('collection'); }); + afterEach(async function () { await db.dropCollection('collection'); await cs.close(); await client.close(); - client = undefined; - db = undefined; - collection = undefined; }); context('promoteLongs', () => { @@ -1452,7 +1568,7 @@ describe('Change Streams', function () { it('does not send invalid options on the aggregate command', { metadata: { requires: { topology: '!single' } }, test: async function () { - const started = []; + const started: CommandStartedEvent[] = []; client.on('commandStarted', filterForCommands(['aggregate'], started)); const doc = { invalidBSONOption: true }; @@ -1473,7 +1589,7 @@ describe('Change Streams', function () { it('does not send invalid options on the getMore command', { metadata: { requires: { topology: '!single' } }, test: async function () { - const started = []; + const started: CommandStartedEvent[] = []; client.on('commandStarted', filterForCommands(['aggregate'], started)); const doc = { invalidBSONOption: true }; @@ -1503,7 +1619,7 @@ describe('ChangeStream resumability', function () { const changeStreamResumeOptions: ChangeStreamOptions = { fullDocument: 'updateLookup', collation: { locale: 'en', maxVariable: 'punct' }, - maxAwaitTimeMS: 20000, + maxAwaitTimeMS: 2000, batchSize: 200 }; diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts new file mode 100644 index 0000000000..8bc0de587d --- /dev/null +++ b/test/integration/crud/maxTimeMS.test.ts @@ -0,0 +1,219 @@ +import { expect } from 'chai'; +import { inspect } from 'util'; + +import { + Collection, + CommandStartedEvent, + FindCursor, + MongoClient, + MongoCursorExhaustedError, + MongoServerError +} from '../../../src'; +import { getSymbolFrom } from '../../tools/utils'; + +describe('MaxTimeMS', function () { + let client: MongoClient; + let commandStartedEvents: CommandStartedEvent[]; + + beforeEach(async function () { + client = this.configuration.newClient({ monitorCommands: true }); + commandStartedEvents = []; + client.on('commandStarted', ev => commandStartedEvents.push(ev)); + }); + + afterEach(async function () { + commandStartedEvents = []; + await client.close(); + }); + + it('should correctly respect the maxTimeMS property on count', async function () { + const col = client.db().collection('max_time_ms'); + await col.insertMany([{ agg_pipe: 1 }], { writeConcern: { w: 1 } }); + const cursor = col.find({ $where: 'sleep(100) || true' }).maxTimeMS(50); + const kBuiltOptions = getSymbolFrom(cursor, 'builtOptions'); + expect(cursor[kBuiltOptions]).to.have.property('maxTimeMS', 50); + + const error = await cursor.count().catch(error => error); + expect(error).to.be.instanceOf(MongoServerError); + + const countCommandEvent = commandStartedEvents.find(ev => ev.commandName === 'count'); + expect(countCommandEvent).to.have.nested.property('command.maxTimeMS', 50); + }); + + it('should correctly respect the maxTimeMS property on toArray', async function () { + const col = client.db().collection('max_time_ms'); + await col.insertMany([{ agg_pipe: 1 }], { writeConcern: { w: 1 } }); + const cursor = col.find({ $where: 'sleep(100) || true' }).maxTimeMS(50); + const kBuiltOptions = getSymbolFrom(cursor, 'builtOptions'); + expect(cursor[kBuiltOptions]).to.have.property('maxTimeMS', 50); + + const error = await cursor.toArray().catch(error => error); + expect(error).to.be.instanceOf(MongoServerError); + + const findCommandEvent = commandStartedEvents.find(ev => ev.commandName === 'find'); + expect(findCommandEvent).to.have.nested.property('command.maxTimeMS', 50); + }); + + it('should correctly fail with maxTimeMS error', async function () { + const admin = client.db().admin(); + const col = client.db().collection('max_time_ms_5'); + + await col.insertMany([{ agg_pipe: 10 }], { writeConcern: { w: 1 } }); + + try { + const res = await admin.command({ + configureFailPoint: 'maxTimeAlwaysTimeOut', + mode: 'alwaysOn' + }); + expect(res).to.have.property('ok', 1); + + const error = await col + .find({}) + .maxTimeMS(10) + .toArray() + .catch(error => error); + + expect(error).to.be.instanceOf(MongoServerError); + } finally { + const res = await admin.command({ configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'off' }); + expect(res).to.have.property('ok', 1); + } + }); + + describe('awaitData, tailable, maxTimeMS, and maxAwaitTimeMS on cursors', () => { + const insertedDocs = [{ _id: 1 }]; + let client: MongoClient; + let cappedCollection: Collection<{ _id: number }>; + let cursor: FindCursor<{ _id: number }>; + let events: CommandStartedEvent[]; + + beforeEach(async function () { + client = this.configuration.newClient({ monitorCommands: true }); + await client + .db() + .dropCollection('cappedAt3') + .catch(() => null); + cappedCollection = await client + .db() + .createCollection('cappedAt3', { capped: true, size: 4096, max: 3 }); + cappedCollection.insertMany(insertedDocs); + + events = []; + client.on('commandStarted', event => + ['getMore', 'find'].includes(event.commandName) ? events.push(event) : null + ); + }); + + afterEach(async function () { + events = []; + await cursor?.close(); + await client?.close(); + }); + + const tailableValues = [true, false, undefined]; + const awaitDataValues = [true, false, undefined]; + const maxTimeMSValues = [100, 0, undefined]; + const maxAwaitTimeMSValues = [100, 0, undefined]; + + const tests = tailableValues.flatMap(tailable => + awaitDataValues.flatMap(awaitData => + maxAwaitTimeMSValues.flatMap(maxAwaitTimeMS => + maxTimeMSValues.flatMap(maxTimeMS => { + const awaitDataSet = Boolean(awaitData) === true; + const tailableSet = Boolean(tailable) === true; + const timeIsSetOnGetMore = typeof maxAwaitTimeMS === 'number'; + return [ + { + // Use JSON to drop explicit undefined + options: JSON.parse( + JSON.stringify({ tailable, awaitData, maxAwaitTimeMS, maxTimeMS }) + ), + outcome: { + // Cannot set 'awaitData' without also setting 'tailable' + isFindError: awaitDataSet && !tailableSet, + // cannot set maxTimeMS on getMore command for a non-awaitData cursor + isGetMoreError: timeIsSetOnGetMore && !awaitDataSet + } + } + ]; + }) + ) + ) + ); + + it('meta test: should setup test table correctly', () => { + expect(tests).to.have.lengthOf(81); + expect(tests.filter(t => t.outcome.isFindError)).to.have.lengthOf(18); + expect(tests.filter(t => t.outcome.isGetMoreError)).to.have.lengthOf(36); + expect( + tests.filter(t => { + return !t.outcome.isFindError && !t.outcome.isGetMoreError; + }) + ).to.have.lengthOf(27); + }); + + const metadata = { requires: { mongodb: '>=5', topology: ['replicaset'] as const } }; + for (const { options, outcome } of tests) { + let optionsString = inspect(options, { breakLength: Infinity }); + optionsString = optionsString.slice(1, optionsString.length - 1).trim(); + optionsString = optionsString === '' ? 'nothing set' : optionsString; + + // Each test runs the same find operation, but asserts different outcomes + const operation = async () => { + cursor = cappedCollection.find({ _id: { $gt: 0 } }, { ...options, batchSize: 1 }); + const findDocOrError: { _id: number } | Error = await cursor.next().catch(error => error); + const exhaustedByFind = !!cursor.id?.isZero(); + const getMoreDocOrError: { _id: number } | Error | null = await cursor + .tryNext() + .catch(error => error); + expect(events).to.have.length.of.at.least(1); // At least find must be sent + return { findDocOrError, exhaustedByFind, getMoreDocOrError }; + }; + + if (outcome.isFindError) { + it(`should error on find due to setting ${optionsString}`, metadata, async () => { + const { findDocOrError } = await operation(); + expect(findDocOrError).to.be.instanceOf(MongoServerError); + }); + } else if (outcome.isGetMoreError) { + it(`should error on getMore due to setting ${optionsString}`, metadata, async () => { + const { exhaustedByFind, getMoreDocOrError } = await operation(); + if (exhaustedByFind) { + expect(getMoreDocOrError).to.be.instanceOf(MongoCursorExhaustedError); + } else { + expect(getMoreDocOrError).to.be.instanceOf(MongoServerError); + } + }); + } else { + it(`should create find cursor with ${optionsString}`, metadata, async () => { + const { findDocOrError: findDoc, getMoreDocOrError: getMoreDoc } = await operation(); + + expect(findDoc).to.not.be.instanceOf(Error); + expect(getMoreDoc).to.not.be.instanceOf(Error); + + expect(findDoc).to.have.property('_id', 1); + + expect(events[0].command).to.be.an('object').that.has.a.property('find'); + const findCommand = events[0].command; + + if (typeof options.maxTimeMS === 'number') { + expect(findCommand).to.have.property('maxTimeMS', options.maxTimeMS); + } else { + expect(findCommand).to.not.have.property('maxTimeMS'); + } + + expect(getMoreDoc).to.be.null; + + expect(events[1].command).to.be.an('object').that.has.a.property('getMore'); + const getMoreCommand = events[1].command; + + if (typeof options.maxAwaitTimeMS === 'number') { + expect(getMoreCommand).to.have.property('maxTimeMS', options.maxAwaitTimeMS); + } else { + expect(getMoreCommand).to.not.have.property('maxTimeMS'); + } + }); + } + } + }); +}); diff --git a/test/integration/crud/maxtimems.test.js b/test/integration/crud/maxtimems.test.js deleted file mode 100644 index f7ca1acb4b..0000000000 --- a/test/integration/crud/maxtimems.test.js +++ /dev/null @@ -1,120 +0,0 @@ -'use strict'; -const { assert: test, setupDatabase } = require('../shared'); -const { expect } = require('chai'); - -describe('MaxTimeMS', function () { - before(function () { - return setupDatabase(this.configuration); - }); - - it('Should Correctly respect the maxtimeMs property on count', function (done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - var db = client.db(configuration.db); - var col = db.collection('max_time_ms'); - - // Insert a couple of docs - var docs_1 = [{ agg_pipe: 1 }]; - - // Simple insert - col.insert(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - // Execute a find command - col - .find({ $where: 'sleep(100) || true' }) - .maxTimeMS(50) - .count(function (err) { - test.ok(err != null); - client.close(done); - }); - }); - }); - }); - - it('Should Correctly respect the maxtimeMs property on toArray', { - metadata: { - requires: { - topology: ['single', 'replicaset'] - } - }, - - test: function (done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - var db = client.db(configuration.db); - var col = db.collection('max_time_ms_2'); - - // Insert a couple of docs - var docs_1 = [{ agg_pipe: 1 }]; - - // Simple insert - col.insert(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - // Execute a find command - col - .find({ $where: 'sleep(100) || true' }) - .maxTimeMS(50) - .toArray(function (err) { - test.ok(err != null); - client.close(done); - }); - }); - }); - } - }); - - it('Should Correctly fail with maxTimeMS error', { - // Add a tag that our runner can trigger on - // in this case we are setting that node needs to be higher than 0.10.X to run - metadata: { - requires: { - topology: ['single', 'replicaset'] - } - }, - - test: function (done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - var db = client.db(configuration.db); - var col = db.collection('max_time_ms_5'); - - // Insert a couple of docs - var docs_1 = [{ agg_pipe: 10 }]; - - // Simple insert - col.insert(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - db.admin().command( - { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'alwaysOn' }, - function (err, result) { - expect(err).to.not.exist; - test.equal(1, result.ok); - - col - .find({}) - .maxTimeMS(10) - .toArray(function (err) { - test.ok(err != null); - - db.admin().command( - { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'off' }, - function (err, result) { - expect(err).to.not.exist; - test.equal(1, result.ok); - client.close(done); - } - ); - }); - } - ); - }); - }); - } - }); -}); diff --git a/test/unit/operations/get_more.test.ts b/test/unit/operations/get_more.test.ts index c150aae800..c4802104c1 100644 --- a/test/unit/operations/get_more.test.ts +++ b/test/unit/operations/get_more.test.ts @@ -192,6 +192,54 @@ describe('GetMoreOperation', function () { } }); }); + + context('error cases', () => { + const server = new Server(new Topology([], {} as any), new ServerDescription(''), {} as any); + sinon.stub(server, 'command').yieldsRight(); + + it('should throw if the cursorId is undefined', async () => { + const getMoreOperation = new GetMoreOperation( + ns('db.collection'), + // @ts-expect-error: Testing undefined cursorId + undefined, + server, + options + ); + const error = await promisify(getMoreOperation.execute.bind(getMoreOperation))( + server, + undefined + ).catch(error => error); + expect(error).to.be.instanceOf(MongoRuntimeError); + }); + + it('should throw if the collection is undefined', async () => { + const getMoreOperation = new GetMoreOperation( + ns('db'), + Long.fromNumber(1), + server, + options + ); + const error = await promisify(getMoreOperation.execute.bind(getMoreOperation))( + server, + undefined + ).catch(error => error); + expect(error).to.be.instanceOf(MongoRuntimeError); + }); + + it('should throw if the cursorId is zero', async () => { + const getMoreOperation = new GetMoreOperation( + ns('db.collection'), + Long.fromNumber(0), + server, + options + ); + const error = await promisify(getMoreOperation.execute.bind(getMoreOperation))( + server, + undefined + ).catch(error => error); + expect(error).to.be.instanceOf(MongoRuntimeError); + }); + }); }); describe('#hasAspect', function () {