Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4103): respect BSON options when creating change streams #3247

Merged
merged 9 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,13 +598,11 @@ export class ChangeStream<
}
const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline];

const cursorOptions: ChangeStreamCursorOptions = filterOptions(options, CURSOR_OPTIONS);

const changeStreamCursor = new ChangeStreamCursor<TSchema, TChange>(
getTopology(this.parent),
this.namespace,
pipeline,
cursorOptions
options
);

for (const event of CHANGE_STREAM_EVENTS) {
Expand Down
99 changes: 98 additions & 1 deletion test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { strict as assert } from 'assert';
import { expect } from 'chai';
import * as crypto from 'crypto';
import { once } from 'events';
import * as sinon from 'sinon';
import { PassThrough, Transform } from 'stream';

import {
ChangeStream,
ChangeStreamOptions,
Collection,
Db,
Long,
MongoClient,
MongoNetworkError,
Expand All @@ -22,7 +24,7 @@ import {
TestBuilder,
UnifiedTestSuiteBuilder
} from '../../tools/utils';
import { delay, setupDatabase, withClient, withCursor } from '../shared';
import { delay, filterForCommands, setupDatabase, withClient, withCursor } from '../shared';

function withChangeStream(
callback: (collection: Collection, changeStream: ChangeStream, done: Mocha.Done) => void
Expand Down Expand Up @@ -1990,4 +1992,99 @@ describe('Change Streams', function () {
.toJSON()
)
.run();

describe('BSON Options', function () {
let client: MongoClient;
let db: Db;
let collection: Collection;
let cs: ChangeStream;
beforeEach(async function () {
client = await this.configuration
.newClient({ maxPoolSize: 1, monitorCommands: true })
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
.connect();
db = client.db('db');
collection = db.collection('collection');
});
afterEach(async function () {
await cs.close();
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});
it('Should remain type long', async function () {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
cs = collection.watch([], { promoteLongs: false });

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

const change = await willBeChange;
expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf(Long);
});

it('Should convert to number if promotelongs is true', async function () {
cs = collection.watch([], { promoteLongs: true });

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

const change = await willBeChange;

expect(typeof change.fullDocument.a).to.equal('number');
expect.fail('TEst 2 failing');
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
});

it('Should convert to number if no options passed', async function () {
cs = collection.watch([]);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

const change = await willBeChange;
expect(typeof change.fullDocument.a).to.equal('number');
});

it('Should filter invalid options on aggregate command', async function () {
const started = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { sdfsdf: true };
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
cs = collection.watch([], doc);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

await willBeChange;
expect(started[0].command).not.to.haveOwnProperty('sdfsdf');
});

it('Should filter invalid options on getMore command', async function () {
const started = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { sdfsdf: true };
cs = collection.watch([], doc);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

await willBeChange;
expect(started[0].command).not.to.haveOwnProperty('sdfsdf');
});
});
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
});