From ccb39f0b537f04682763f1ade025c3c7dc528c86 Mon Sep 17 00:00:00 2001 From: Kwabena Ampofo Date: Thu, 12 May 2022 13:11:35 -0400 Subject: [PATCH 1/8] fix(NODE-4103): collection.watch doesn't respect promoteLongs option --- src/change_stream.ts | 5 +- .../change-streams/change_stream.test.ts | 103 +++++++++++++++++- 2 files changed, 104 insertions(+), 4 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index d48f4ccddc..5cc4c351e8 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -598,13 +598,12 @@ export class ChangeStream< } const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline]; - const cursorOptions: ChangeStreamCursorOptions = filterOptions(options, CURSOR_OPTIONS); - + // const cursorOptions: ChangeStreamCursorOptions = filterOptions(options, CURSOR_OPTIONS); const changeStreamCursor = new ChangeStreamCursor( getTopology(this.parent), this.namespace, pipeline, - cursorOptions + options ); for (const event of CHANGE_STREAM_EVENTS) { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 139c0680ed..fdfd74629b 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1,13 +1,16 @@ import { strict as assert } from 'assert'; import { expect } from 'chai'; import * as crypto from 'crypto'; +import { once } from 'events'; import * as sinon from 'sinon'; import { PassThrough, Transform } from 'stream'; +import { promisify } from 'util'; import { ChangeStream, ChangeStreamOptions, Collection, + Db, Long, MongoClient, MongoNetworkError, @@ -22,7 +25,7 @@ import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/utils'; -import { delay, setupDatabase, withClient, withCursor } from '../shared'; +import { delay, filterForCommands, setupDatabase, withClient, withCursor } from '../shared'; function withChangeStream( callback: (collection: Collection, changeStream: ChangeStream, done: Mocha.Done) => void @@ -1990,4 +1993,102 @@ describe('Change Streams', function () { .toJSON() ) .run(); + + describe.only('BSON Options', function () { + let client: MongoClient; + let db: Db; + let collection: Collection; + let cs: ChangeStream; + beforeEach(async function () { + client = await this.configuration + .newClient({ maxPoolSize: 1, monitorCommands: true }) + .connect(); + db = client.db('db'); + collection = db.collection('collection'); + }); + afterEach(async function () { + await cs.close(); + await client.close(); + client = undefined; + db = undefined; + collection = undefined; + }); + it('Should remain type long', async function () { + cs = collection.watch([], { promoteLongs: false }); + + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); + + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + const change = await willBeChange; + expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf(Long); + }); + + it('Should convert to number if promotelongs is true', async function () { + cs = collection.watch([], { promoteLongs: true }); + + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); + + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + const change = await willBeChange; + + expect(typeof change.fullDocument.a).to.equal('number'); + expect.fail('TEst 2 failing'); + }); + + it('Should convert to number if no options passed', async function () { + cs = collection.watch([]); + + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); + + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + const change = await willBeChange; + const test = typeof change.fullDocument.a; + console.log(typeof test, test, 'KOBBY-2'); + expect(typeof change.fullDocument.a).to.equal('number'); + // expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf('number'); + }); + + it.only('Should filter invalid options on aggregate command', async function () { + const started = []; + + client.on('commandStarted', filterForCommands(['aggregate'], started)); + const doc = { sdfsdf: true }; + cs = collection.watch([], doc); + + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); + + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + const change = await willBeChange; + expect(started[0].command).not.to.haveOwnProperty('sdfsdf'); + }); + + it('Should filter invalid options on getMore command', async function () { + const started = []; + + client.on('commandStarted', filterForCommands(['aggregate'], started)); + const doc = { sdfsdf: true }; + cs = collection.watch([], doc); + + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); + + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + const change = await willBeChange; + expect(started[0].command).not.to.haveOwnProperty('sdfsdf'); + }); + }); }); From 926d57a9393bad3ca5bd7bf4fc68ddbef07b12a3 Mon Sep 17 00:00:00 2001 From: Kwabena Ampofo Date: Mon, 16 May 2022 15:16:24 -0400 Subject: [PATCH 2/8] Remove comments, and .only --- test/integration/change-streams/change_stream.test.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index fdfd74629b..311b38425f 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -4,7 +4,6 @@ import * as crypto from 'crypto'; import { once } from 'events'; import * as sinon from 'sinon'; import { PassThrough, Transform } from 'stream'; -import { promisify } from 'util'; import { ChangeStream, @@ -1994,7 +1993,7 @@ describe('Change Streams', function () { ) .run(); - describe.only('BSON Options', function () { + describe('BSON Options', function () { let client: MongoClient; let db: Db; let collection: Collection; @@ -2052,12 +2051,10 @@ describe('Change Streams', function () { const change = await willBeChange; const test = typeof change.fullDocument.a; - console.log(typeof test, test, 'KOBBY-2'); expect(typeof change.fullDocument.a).to.equal('number'); - // expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf('number'); }); - it.only('Should filter invalid options on aggregate command', async function () { + it('Should filter invalid options on aggregate command', async function () { const started = []; client.on('commandStarted', filterForCommands(['aggregate'], started)); From f9e1c06b19c77e78c02f4b0b4f18fcc03856ff3d Mon Sep 17 00:00:00 2001 From: Kwabena Ampofo Date: Mon, 16 May 2022 16:13:57 -0400 Subject: [PATCH 3/8] remove commented out code --- src/change_stream.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 5cc4c351e8..0a2568baa6 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -598,7 +598,6 @@ export class ChangeStream< } const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline]; - // const cursorOptions: ChangeStreamCursorOptions = filterOptions(options, CURSOR_OPTIONS); const changeStreamCursor = new ChangeStreamCursor( getTopology(this.parent), this.namespace, From d9945c21e9a560849741572f17e961aae84db756 Mon Sep 17 00:00:00 2001 From: Kwabena Ampofo Date: Mon, 16 May 2022 16:56:05 -0400 Subject: [PATCH 4/8] Fix all linting errors --- test/integration/change-streams/change_stream.test.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 311b38425f..32886f7e9a 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2050,7 +2050,6 @@ describe('Change Streams', function () { expect(result).to.exist; const change = await willBeChange; - const test = typeof change.fullDocument.a; expect(typeof change.fullDocument.a).to.equal('number'); }); @@ -2067,7 +2066,7 @@ describe('Change Streams', function () { const result = await collection.insertOne({ a: Long.fromNumber(0) }); expect(result).to.exist; - const change = await willBeChange; + await willBeChange; expect(started[0].command).not.to.haveOwnProperty('sdfsdf'); }); @@ -2084,7 +2083,7 @@ describe('Change Streams', function () { const result = await collection.insertOne({ a: Long.fromNumber(0) }); expect(result).to.exist; - const change = await willBeChange; + await willBeChange; expect(started[0].command).not.to.haveOwnProperty('sdfsdf'); }); }); From 84780bfc57a4b0c25d62eadeee93735953a17e6f Mon Sep 17 00:00:00 2001 From: Kwabena Ampofo Date: Tue, 17 May 2022 15:30:02 -0400 Subject: [PATCH 5/8] Address all comments --- .../change-streams/change_stream.test.ts | 120 ++++++++++-------- 1 file changed, 67 insertions(+), 53 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 32886f7e9a..643b6960a0 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1999,9 +1999,7 @@ describe('Change Streams', function () { let collection: Collection; let cs: ChangeStream; beforeEach(async function () { - client = await this.configuration - .newClient({ maxPoolSize: 1, monitorCommands: true }) - .connect(); + client = await this.configuration.newClient({ monitorCommands: true }).connect(); db = client.db('db'); collection = db.collection('collection'); }); @@ -2012,79 +2010,95 @@ describe('Change Streams', function () { db = undefined; collection = undefined; }); - it('Should remain type long', async function () { - cs = collection.watch([], { promoteLongs: false }); - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); + context('promoteLongs', () => { + context('when set to true', () => { + it('does not convert Longs to numbers', async function () { + cs = collection.watch([], { promoteLongs: true }); - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); - const change = await willBeChange; - expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf(Long); - }); + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; - it('Should convert to number if promotelongs is true', async function () { - cs = collection.watch([], { promoteLongs: true }); + const change = await willBeChange; - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); + expect(typeof change.fullDocument.a).to.equal('number'); + }); + }); - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; + context('when set to false', () => { + it('converts Long values to native numbers', async function () { + cs = collection.watch([], { promoteLongs: false }); - const change = await willBeChange; + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); - expect(typeof change.fullDocument.a).to.equal('number'); - expect.fail('TEst 2 failing'); - }); + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + const change = await willBeChange; + expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf(Long); + }); + }); - it('Should convert to number if no options passed', async function () { - cs = collection.watch([]); + context('when omitted', () => { + it('defaults to true', async function () { + cs = collection.watch([]); - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; - const change = await willBeChange; - expect(typeof change.fullDocument.a).to.equal('number'); + const change = await willBeChange; + expect(typeof change.fullDocument.a).to.equal('number'); + }); + }); }); - it('Should filter invalid options on aggregate command', async function () { - const started = []; + context('invalid options', function () { + it('does not send invalid options on the aggregate command', { + metadata: { requires: { topology: '!single' } }, + test: async function () { + const started = []; - client.on('commandStarted', filterForCommands(['aggregate'], started)); - const doc = { sdfsdf: true }; - cs = collection.watch([], doc); + client.on('commandStarted', filterForCommands(['aggregate'], started)); + const doc = { invalidBSONOption: true }; + cs = collection.watch([], doc); - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; - await willBeChange; - expect(started[0].command).not.to.haveOwnProperty('sdfsdf'); - }); + await willBeChange; + expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption'); + } + }); - it('Should filter invalid options on getMore command', async function () { - const started = []; + it('does not send invalid options on the getMore command', { + metadata: { requires: { topology: '!single' } }, + test: async function () { + const started = []; - client.on('commandStarted', filterForCommands(['aggregate'], started)); - const doc = { sdfsdf: true }; - cs = collection.watch([], doc); + client.on('commandStarted', filterForCommands(['aggregate'], started)); + const doc = { invalidBSONOption: true }; + cs = collection.watch([], doc); - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; - await willBeChange; - expect(started[0].command).not.to.haveOwnProperty('sdfsdf'); + await willBeChange; + expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption'); + } + }); }); }); }); From a89eb1b793f356e119e0e93532b838f2d3f818ac Mon Sep 17 00:00:00 2001 From: Kwabena Ampofo Date: Wed, 18 May 2022 10:36:06 -0400 Subject: [PATCH 6/8] Add metadata to all BSON Options tests --- .../change-streams/change_stream.test.ts | 57 +++++++++++-------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 643b6960a0..b0e448b8a4 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2013,48 +2013,57 @@ describe('Change Streams', function () { context('promoteLongs', () => { context('when set to true', () => { - it('does not convert Longs to numbers', async function () { - cs = collection.watch([], { promoteLongs: true }); + it('does not convert Longs to numbers', { + metadata: { requires: { topology: '!single' } }, + test: async function () { + cs = collection.watch([], { promoteLongs: true }); - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; - const change = await willBeChange; + const change = await willBeChange; - expect(typeof change.fullDocument.a).to.equal('number'); + expect(typeof change.fullDocument.a).to.equal('number'); + } }); }); context('when set to false', () => { - it('converts Long values to native numbers', async function () { - cs = collection.watch([], { promoteLongs: false }); + it('converts Long values to native numbers', { + metadata: { requires: { topology: '!single' } }, + test: async function () { + cs = collection.watch([], { promoteLongs: false }); - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; - const change = await willBeChange; - expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf(Long); + const change = await willBeChange; + expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf(Long); + } }); }); context('when omitted', () => { - it('defaults to true', async function () { - cs = collection.watch([]); + it('defaults to true', { + metadata: { requires: { topology: '!single' } }, + test: async function () { + cs = collection.watch([]); - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; - const change = await willBeChange; - expect(typeof change.fullDocument.a).to.equal('number'); + const change = await willBeChange; + expect(typeof change.fullDocument.a).to.equal('number'); + } }); }); }); From e41aeb0c4546298a83974218addd500ffd56df9f Mon Sep 17 00:00:00 2001 From: Kwabena Ampofo Date: Wed, 18 May 2022 13:49:25 -0400 Subject: [PATCH 7/8] Use db.createCollection to create collection, and db.dropCollection to remove collection after every test --- test/integration/change-streams/change_stream.test.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index b0e448b8a4..aee595ecf3 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1993,7 +1993,7 @@ describe('Change Streams', function () { ) .run(); - describe('BSON Options', function () { + describe.only('BSON Options', function () { let client: MongoClient; let db: Db; let collection: Collection; @@ -2001,9 +2001,10 @@ describe('Change Streams', function () { beforeEach(async function () { client = await this.configuration.newClient({ monitorCommands: true }).connect(); db = client.db('db'); - collection = db.collection('collection'); + collection = await db.createCollection('collection'); }); afterEach(async function () { + db.dropCollection('collection'); await cs.close(); await client.close(); client = undefined; From 21def293855a0b21aae4a2fd3b012ec92b5c9961 Mon Sep 17 00:00:00 2001 From: Kwabena Ampofo Date: Thu, 19 May 2022 13:01:38 -0400 Subject: [PATCH 8/8] Remove .only, await db.dropCollection --- test/integration/change-streams/change_stream.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index aee595ecf3..8dc15156bc 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1993,7 +1993,7 @@ describe('Change Streams', function () { ) .run(); - describe.only('BSON Options', function () { + describe('BSON Options', function () { let client: MongoClient; let db: Db; let collection: Collection; @@ -2004,7 +2004,7 @@ describe('Change Streams', function () { collection = await db.createCollection('collection'); }); afterEach(async function () { - db.dropCollection('collection'); + await db.dropCollection('collection'); await cs.close(); await client.close(); client = undefined;