Skip to content

Commit

Permalink
implement changes
Browse files Browse the repository at this point in the history
  • Loading branch information
HanaPearlman committed Nov 5, 2020
1 parent 884c551 commit 80d7565
Show file tree
Hide file tree
Showing 19 changed files with 115 additions and 181 deletions.
3 changes: 1 addition & 2 deletions src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import {
} from './operations/list_databases';
import { executeOperation } from './operations/execute_operation';
import { RunCommandOperation, RunCommandOptions } from './operations/run_command';
import type { Callback } from './utils';
import { Callback, getTopology } from './utils';
import type { Document } from './bson';
import type { CommandOperationOptions } from './operations/command';
import type { Db } from './db';
import { getTopology } from './sdam/topology';

/** @internal */
export interface AdminPrivate {
Expand Down
8 changes: 4 additions & 4 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import { PromiseProvider } from '../promise_provider';
import { Long, ObjectId, Document } from '../bson';
import { MongoError, MongoWriteConcernError, AnyError, MongoClientClosedError } from '../error';
import { MongoError, MongoWriteConcernError, AnyError } from '../error';
import {
applyWriteConcern,
applyRetryableWrites,
executeLegacyOperation,
hasAtomicOperators,
Callback,
MongoDBNamespace,
maxWireVersion
maxWireVersion,
getTopology
} from '../utils';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
import { UpdateOperation } from '../operations/update';
import { DeleteOperation } from '../operations/delete';
import { WriteConcern } from '../write_concern';
import type { Collection } from '../collection';
import { getTopology, Topology } from '../sdam/topology';
import type { Topology } from '../sdam/topology';
import type { CommandOperationOptions } from '../operations/command';
import type { CollationOptions } from '../cmap/wire_protocol/write_command';
import type { Hint } from '../operations/operation';
Expand Down Expand Up @@ -909,7 +910,6 @@ export abstract class BulkOperationBase {
this.isOrdered = isOrdered;

const topology = getTopology(collection);
if (!topology) throw new MongoClientClosedError();
options = options == null ? {} : options;
// TODO Bring from driver information in isMaster
// Get the namespace for the write operations
Expand Down
34 changes: 16 additions & 18 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError, MongoClientClosedError } from './error';
import { MongoError, AnyError, isResumableError } from './error';
import { Cursor, CursorOptions, CursorStream, CursorStreamOptions } from './cursor/cursor';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import { loadCollection, loadDb, loadMongoClient } from './dynamic_loaders';
import {
relayEvents,
maxWireVersion,
calculateDurationInMs,
now,
maybePromise,
MongoDBNamespace,
Callback
Callback,
getTopology
} from './utils';
import type { ReadPreference } from './read_preference';
import type { Timestamp, Document } from './bson';
import { getTopology, Topology } from './sdam/topology';
import type { Topology } from './sdam/topology';
import type { OperationParent } from './operations/command';
import type { CollationOptions } from './cmap/wire_protocol/write_command';
import { MongoClient } from './mongo_client';
import { Db } from './db';
import { Collection } from './collection';

const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');

Expand Down Expand Up @@ -171,10 +175,9 @@ export class ChangeStreamStream extends CursorStream {
export class ChangeStream extends EventEmitter {
pipeline: Document[];
options: ChangeStreamOptions;
parent: OperationParent;
parent: MongoClient | Db | Collection;
namespace: MongoDBNamespace;
type: symbol;
topology: Topology;
cursor?: ChangeStreamCursor;
closed: boolean;
streamOptions?: CursorStreamOptions;
Expand Down Expand Up @@ -211,16 +214,13 @@ export class ChangeStream extends EventEmitter {
) {
super();

const Collection = loadCollection();
const Db = loadDb();
const MongoClient = loadMongoClient();
// const Collection = loadCollection();
// const Db = loadDb();
// const MongoClient = loadMongoClient();

this.pipeline = pipeline;
this.options = options;

this.parent = parent;
this.namespace = parent.s.namespace;

if (parent instanceof Collection) {
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
} else if (parent instanceof Db) {
Expand All @@ -233,10 +233,8 @@ export class ChangeStream extends EventEmitter {
);
}

const topology = getTopology(parent);
if (!topology) throw new MongoClientClosedError();
this.topology = topology;

this.parent = parent;
this.namespace = parent.s.namespace;
if (!this.options.readPreference && parent.readPreference) {
this.options.readPreference = parent.readPreference;
}
Expand Down Expand Up @@ -480,7 +478,7 @@ function createChangeStreamCursor(
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const changeStreamCursor = new ChangeStreamCursor(
self.topology,
getTopology(self.parent),
new AggregateOperation(self.parent, pipeline, options),
cursorOptions
);
Expand Down Expand Up @@ -590,7 +588,7 @@ function processNewChange(
}

function processError(changeStream: ChangeStream, error: AnyError, callback?: Callback) {
const topology = changeStream.topology;
const topology = getTopology(changeStream.parent);
const cursor = changeStream.cursor;

// If the change stream has been closed explicitly, do not process error.
Expand Down
22 changes: 10 additions & 12 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import {
checkCollectionName,
deprecateOptions,
MongoDBNamespace,
Callback
Callback,
getTopology
} from './utils';
import { ObjectId, Document, BSONSerializeOptions, resolveBSONOptions } from './bson';
import { MongoError, MongoClientClosedError } from './error';
import { MongoError } from './error';
import { UnorderedBulkOperation } from './bulk/unordered';
import { OrderedBulkOperation } from './bulk/ordered';
import { ChangeStream, ChangeStreamOptions } from './change_stream';
Expand Down Expand Up @@ -86,7 +87,6 @@ import type { PkFactory } from './mongo_client';
import type { Logger, LoggerOptions } from './logger';
import type { OperationParent } from './operations/command';
import type { Sort } from './sort';
import { getTopology } from './sdam/topology';

/** @public */
export interface Collection {
Expand Down Expand Up @@ -646,10 +646,8 @@ export class Collection implements OperationParent {
throw new TypeError('`options` parameter must not be function');
}

const topology = getTopology(this);
if (!topology) throw new MongoClientClosedError('Collection.prototype.find');
return new Cursor(
topology,
getTopology(this),
new FindOperation(this, this.s.namespace, filter, options),
options
);
Expand Down Expand Up @@ -858,9 +856,11 @@ export class Collection implements OperationParent {
* @param options - Optional settings for the command
*/
listIndexes(options?: ListIndexesOptions): CommandCursor {
const topology = getTopology(this);
if (!topology) throw new MongoClientClosedError('Collection.prototype.listIndexes');
const cursor = new CommandCursor(topology, new ListIndexesOperation(this, options), options);
const cursor = new CommandCursor(
getTopology(this),
new ListIndexesOperation(this, options),
options
);

return cursor;
}
Expand Down Expand Up @@ -1204,10 +1204,8 @@ export class Collection implements OperationParent {

options = options || {};

const topology = getTopology(this);
if (!topology) throw new MongoClientClosedError('Collection.prototype.aggregate');
return new AggregationCursor(
topology,
getTopology(this),
new AggregateOperation(this, pipeline, options),
options
);
Expand Down
31 changes: 9 additions & 22 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { loadAdmin } from './dynamic_loaders';
import { AggregationCursor, CommandCursor } from './cursor';
import { ObjectId, Code, Document, BSONSerializeOptions, resolveBSONOptions } from './bson';
import { ReadPreference, ReadPreferenceLike } from './read_preference';
import { MongoClientClosedError, MongoError } from './error';
import { MongoError } from './error';
import { Collection, CollectionOptions } from './collection';
import { ChangeStream, ChangeStreamOptions } from './change_stream';
import * as CONSTANTS from './constants';
Expand All @@ -15,7 +15,8 @@ import {
filterOptions,
mergeOptionsAndWriteConcern,
deprecateOptions,
MongoDBNamespace
MongoDBNamespace,
getTopology
} from './utils';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import { AddUserOperation, AddUserOptions } from './operations/add_user';
Expand Down Expand Up @@ -53,7 +54,6 @@ import { executeOperation } from './operations/execute_operation';
import { EvalOperation, EvalOptions } from './operations/eval';
import type { IndexInformationOptions } from './operations/common_functions';
import type { MongoClient, PkFactory } from './mongo_client';
import { getTopology, Topology } from './sdam/topology';
import type { OperationParent } from './operations/command';
import type { Admin } from './admin';

Expand Down Expand Up @@ -148,11 +148,11 @@ export class Db implements OperationParent {
/**
* Creates a new Db instance
*
* @param databaseName - The name of the database this instance represents.
* @param client - The MongoClient for the database.
* @param databaseName - The name of the database this instance represents.
* @param options - Optional settings for Db construction
*/
constructor(databaseName: string, client: MongoClient, options?: DbOptions) {
constructor(client: MongoClient, databaseName: string, options?: DbOptions) {
options = options || {};
emitDeprecatedOptionWarning(options, ['promiseLibrary']);

Expand Down Expand Up @@ -193,11 +193,6 @@ export class Db implements OperationParent {
return this.s.namespace.db;
}

// Topology
get topology(): Topology | undefined {
return this.s.client.topology;
}

// Options
get options(): DbOptions | undefined {
return this.s.options;
Expand Down Expand Up @@ -314,10 +309,8 @@ export class Db implements OperationParent {

options = options || {};

const topology = getTopology(this);
if (!topology) throw new MongoClientClosedError('Db.prototype.aggregate');
const cursor = new AggregationCursor(
topology,
getTopology(this),
new AggregateOperation(this, pipeline, options),
options
);
Expand Down Expand Up @@ -381,8 +374,7 @@ export class Db implements OperationParent {
}

// Did the user destroy the topology
const topology = getTopology(this);
if (topology && topology.isDestroyed()) {
if (getTopology(this).isDestroyed()) {
return callback(new MongoError('topology was destroyed'));
}

Expand Down Expand Up @@ -437,10 +429,8 @@ export class Db implements OperationParent {
filter = filter || {};
options = options || {};

const topology = getTopology(this);
if (!topology) throw new MongoClientClosedError('Db.prototype.listCollections');
return new CommandCursor(
topology,
getTopology(this),
new ListCollectionsOperation(this, filter, options),
options
);
Expand Down Expand Up @@ -785,10 +775,7 @@ export class Db implements OperationParent {

/** Unref all sockets */
unref(): void {
const topology = getTopology(this);
if (topology) {
topology.unref();
}
getTopology(this).unref();
}

/**
Expand Down
13 changes: 0 additions & 13 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,6 @@ export function isNetworkErrorBeforeHandshake(err: MongoNetworkError): boolean {
return err[kBeforeHandshake] === true;
}

/**
* An error thrown when an operation requiring a connected client was called on a closed client.
* @public
* @category Error
*/
export class MongoClientClosedError extends MongoError {
constructor(message?: string) {
message = `MongoClient must be connected to perform this operation ${message}`;
super(message);
this.name = 'MongoClientClosedError';
}
}

/**
* An error indicating an issue with the network, including TCP errors and timeouts.
* @public
Expand Down
3 changes: 1 addition & 2 deletions src/gridfs-stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
GridFSBucketReadStreamOptionsWithRevision
} from './download';
import { GridFSBucketWriteStream, GridFSBucketWriteStreamOptions, TFileId } from './upload';
import { executeLegacyOperation, Callback } from '../utils';
import { executeLegacyOperation, Callback, getTopology } from '../utils';
import { WriteConcernOptions, WriteConcern } from '../write_concern';
import type { Document } from '../bson';
import type { Db } from '../db';
Expand All @@ -16,7 +16,6 @@ import type { Cursor } from './../cursor/cursor';
import type { FindOptions } from './../operations/find';
import type { Sort } from '../sort';
import type { Logger } from '../logger';
import { getTopology } from '../sdam/topology';

const DEFAULT_GRIDFS_BUCKET_OPTIONS: {
bucketName: string;
Expand Down
8 changes: 5 additions & 3 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Db, DbOptions } from './db';
import { EventEmitter } from 'events';
import { ChangeStream, ChangeStreamOptions } from './change_stream';
import { ReadPreference, ReadPreferenceMode } from './read_preference';
import { MongoError, AnyError, MongoClientClosedError } from './error';
import { MongoError, AnyError } from './error';
import { WriteConcern, WriteConcernOptions } from './write_concern';
import { maybePromise, MongoDBNamespace, Callback } from './utils';
import { deprecate } from 'util';
Expand Down Expand Up @@ -403,10 +403,12 @@ export class MongoClient extends EventEmitter implements OperationParent {
}

// If no topology throw an error message
if (!this.topology) throw new MongoClientClosedError('MongoClient.prototype.db');
if (!this.topology) {
throw new MongoError('MongoClient must be connected before calling MongoClient.prototype.db');
}

// Return the db object
const db = new Db(dbName, this, finalOptions);
const db = new Db(this, dbName, finalOptions);

// Add the db to the cache
this.s.dbCache.set(dbName, db);
Expand Down
9 changes: 3 additions & 6 deletions src/operations/add_user.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import * as crypto from 'crypto';
import { Aspect, defineAspects } from './operation';
import { CommandOperation, CommandOperationOptions } from './command';
import { MongoClientClosedError, MongoError } from '../error';
import type { Callback } from '../utils';
import { MongoError } from '../error';
import { Callback, getTopology } from '../utils';
import type { Document } from '../bson';
import type { Server } from '../sdam/server';
import type { Db } from '../db';
import { getTopology } from '../sdam/topology';

/** @public */
export interface AddUserOptions extends CommandOperationOptions {
Expand Down Expand Up @@ -75,9 +74,7 @@ export class AddUserOperation extends CommandOperation<AddUserOptions, Document>
roles = ['dbOwner'];
}

const topology = getTopology(db);
if (!topology) return callback(new MongoClientClosedError());
const digestPassword = topology.lastIsMaster().maxWireVersion >= 7;
const digestPassword = getTopology(db).lastIsMaster().maxWireVersion >= 7;

let userPassword = password;

Expand Down
Loading

0 comments on commit 80d7565

Please sign in to comment.