Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: only store topology on MongoClient #2594

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type { Callback } from './utils';
import type { Document } from './bson';
import type { CommandOperationOptions } from './operations/command';
import type { Db } from './db';
import { getTopology } from './sdam/topology';

/** @internal */
export interface AdminPrivate {
Expand Down Expand Up @@ -80,7 +81,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new RunCommandOperation(this.s.db, command, options),
callback
);
Expand Down Expand Up @@ -204,7 +205,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new AddUserOperation(this.s.db, username, password, options),
callback
);
Expand All @@ -230,7 +231,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new RemoveUserOperation(this.s.db, username, options),
callback
);
Expand Down Expand Up @@ -260,7 +261,7 @@ export class Admin {
options = options || {};

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new ValidateCollectionOperation(this, collectionName, options),
callback
);
Expand All @@ -284,7 +285,7 @@ export class Admin {
options = options || {};

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new ListDatabasesOperation(this.s.db, options),
callback
);
Expand Down
7 changes: 4 additions & 3 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PromiseProvider } from '../promise_provider';
import { Long, ObjectId, Document } from '../bson';
import { MongoError, MongoWriteConcernError, AnyError } from '../error';
import { MongoError, MongoWriteConcernError, AnyError, MongoClientClosedError } from '../error';
import {
applyWriteConcern,
applyRetryableWrites,
Expand All @@ -16,7 +16,7 @@ import { UpdateOperation } from '../operations/update';
import { DeleteOperation } from '../operations/delete';
import { WriteConcern } from '../write_concern';
import type { Collection } from '../collection';
import type { Topology } from '../sdam/topology';
import { getTopology, Topology } from '../sdam/topology';
import type { CommandOperationOptions } from '../operations/command';
import type { CollationOptions } from '../cmap/wire_protocol/write_command';
import type { Hint } from '../operations/operation';
Expand Down Expand Up @@ -908,7 +908,8 @@ export abstract class BulkOperationBase {
// determine whether bulkOperation is ordered or unordered
this.isOrdered = isOrdered;

const topology = collection.s.topology;
const topology = getTopology(collection);
if (!topology) throw new MongoClientClosedError();
options = options == null ? {} : options;
// TODO Bring from driver information in isMaster
// Get the namespace for the write operations
Expand Down
13 changes: 7 additions & 6 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError } from './error';
import { MongoError, AnyError, isResumableError, MongoClientClosedError } from './error';
import { Cursor, CursorOptions, CursorStream, CursorStreamOptions } from './cursor/cursor';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import { loadCollection, loadDb, loadMongoClient } from './dynamic_loaders';
Expand All @@ -15,7 +15,7 @@ import {
} from './utils';
import type { ReadPreference } from './read_preference';
import type { Timestamp, Document } from './bson';
import type { Topology } from './sdam/topology';
import { getTopology, Topology } from './sdam/topology';
import type { OperationParent } from './operations/command';
import type { CollationOptions } from './cmap/wire_protocol/write_command';
const kResumeQueue = Symbol('resumeQueue');
Expand Down Expand Up @@ -220,22 +220,23 @@ export class ChangeStream extends EventEmitter {

this.parent = parent;
this.namespace = parent.s.namespace;

if (parent instanceof Collection) {
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
this.topology = parent.s.db.s.topology;
} else if (parent instanceof Db) {
this.type = CHANGE_DOMAIN_TYPES.DATABASE;
this.topology = parent.s.topology;
} else if (parent instanceof MongoClient) {
this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.topology = parent.topology!;
} else {
throw new TypeError(
'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
);
}

const topology = getTopology(parent);
if (!topology) throw new MongoClientClosedError();
this.topology = topology;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should be able to use getTopology to remove the need to attach a topology property on ChangeStream as well


if (!this.options.readPreference && parent.readPreference) {
this.options.readPreference = parent.readPreference;
}
Expand Down
Loading