Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: only store topology on MongoClient #2594

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from './operations/list_databases';
import { executeOperation } from './operations/execute_operation';
import { RunCommandOperation, RunCommandOptions } from './operations/run_command';
import type { Callback } from './utils';
import { Callback, getTopology } from './utils';
import type { Document } from './bson';
import type { CommandOperationOptions } from './operations/command';
import type { Db } from './db';
Expand Down Expand Up @@ -80,7 +80,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new RunCommandOperation(this.s.db, command, options),
callback
);
Expand Down Expand Up @@ -204,7 +204,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new AddUserOperation(this.s.db, username, password, options),
callback
);
Expand All @@ -230,7 +230,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new RemoveUserOperation(this.s.db, username, options),
callback
);
Expand Down Expand Up @@ -260,7 +260,7 @@ export class Admin {
options = options || {};

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new ValidateCollectionOperation(this, collectionName, options),
callback
);
Expand All @@ -284,7 +284,7 @@ export class Admin {
options = options || {};

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new ListDatabasesOperation(this.s.db, options),
callback
);
Expand Down
5 changes: 3 additions & 2 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import {
hasAtomicOperators,
Callback,
MongoDBNamespace,
maxWireVersion
maxWireVersion,
getTopology
} from '../utils';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
Expand Down Expand Up @@ -908,7 +909,7 @@ export abstract class BulkOperationBase {
// determine whether bulkOperation is ordered or unordered
this.isOrdered = isOrdered;

const topology = collection.s.topology;
const topology = getTopology(collection);
options = options == null ? {} : options;
// TODO Bring from driver information in isMaster
// Get the namespace for the write operations
Expand Down
27 changes: 11 additions & 16 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@ import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError } from './error';
import { Cursor, CursorOptions, CursorStream, CursorStreamOptions } from './cursor/cursor';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import { loadCollection, loadDb, loadMongoClient } from './dynamic_loaders';
import {
relayEvents,
maxWireVersion,
calculateDurationInMs,
now,
maybePromise,
MongoDBNamespace,
Callback
Callback,
getTopology
} from './utils';
import type { ReadPreference } from './read_preference';
import type { Timestamp, Document } from './bson';
import type { Topology } from './sdam/topology';
import type { OperationParent } from './operations/command';
import type { CollationOptions } from './cmap/wire_protocol/write_command';
import { MongoClient } from './mongo_client';
import { Db } from './db';
import { Collection } from './collection';

const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');

Expand Down Expand Up @@ -171,10 +175,9 @@ export class ChangeStreamStream extends CursorStream {
export class ChangeStream extends EventEmitter {
pipeline: Document[];
options: ChangeStreamOptions;
parent: OperationParent;
parent: MongoClient | Db | Collection;
namespace: MongoDBNamespace;
type: symbol;
topology: Topology;
cursor?: ChangeStreamCursor;
closed: boolean;
streamOptions?: CursorStreamOptions;
Expand Down Expand Up @@ -211,31 +214,23 @@ export class ChangeStream extends EventEmitter {
) {
super();

const Collection = loadCollection();
const Db = loadDb();
const MongoClient = loadMongoClient();

this.pipeline = pipeline;
this.options = options;

this.parent = parent;
this.namespace = parent.s.namespace;
if (parent instanceof Collection) {
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
this.topology = parent.s.db.s.topology;
} else if (parent instanceof Db) {
this.type = CHANGE_DOMAIN_TYPES.DATABASE;
this.topology = parent.s.topology;
} else if (parent instanceof MongoClient) {
this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.topology = parent.topology!;
} else {
throw new TypeError(
'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
);
}

this.parent = parent;
this.namespace = parent.s.namespace;
if (!this.options.readPreference && parent.readPreference) {
this.options.readPreference = parent.readPreference;
}
Expand Down Expand Up @@ -479,7 +474,7 @@ function createChangeStreamCursor(
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const changeStreamCursor = new ChangeStreamCursor(
self.topology,
getTopology(self.parent),
new AggregateOperation(self.parent, pipeline, options),
cursorOptions
);
Expand Down Expand Up @@ -589,7 +584,7 @@ function processNewChange(
}

function processError(changeStream: ChangeStream, error: AnyError, callback?: Callback) {
const topology = changeStream.topology;
const topology = getTopology(changeStream.parent);
const cursor = changeStream.cursor;

// If the change stream has been closed explicitly, do not process error.
Expand Down
Loading