Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
refactor(cursor): merge and simplify nextFunction
Browse files Browse the repository at this point in the history
At first glance this just looks like its merging two functions into
a single, unweildy mega-function... which is sort of what it does.
But! This does a few things for us:

  - reduces an implicit required recursive call for every cursor
    by combining the initialization code with a call to the first
    find command

  - places the preparation of the find command (from WireProtocol)
    with the actual execution (`pool.write`), which allows us to
    better imagine moving those steps into the wireprotocol in
    lock-step. This is good because it takes us one step closer
    to actually having a consistent wire protocol api, and ceasing
    rogue command construction in the codebase

NODE-1785
  • Loading branch information
mbroadst committed Dec 10, 2018
1 parent 882064f commit f93309a
Showing 1 changed file with 149 additions and 181 deletions.
330 changes: 149 additions & 181 deletions lib/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,124 +209,6 @@ var handleCallback = function(callback, err, result) {
};

// Internal methods
Cursor.prototype._find = function(callback) {
var self = this;

if (self.logger.isDebug()) {
self.logger.debug(
f(
'issue initial query [%s] with flags [%s]',
JSON.stringify(self.cmd),
JSON.stringify(self.query)
)
);
}

var queryCallback = function(err, r) {
if (err) return callback(err);

// Get the raw message
var result = r.message;

// Query failure bit set
if (result.queryFailure) {
return callback(new MongoError(result.documents[0]), null);
}

// Check if we have a command cursor
if (
Array.isArray(result.documents) &&
result.documents.length === 1 &&
(!self.cmd.find || (self.cmd.find && self.cmd.virtual === false)) &&
(typeof result.documents[0].cursor !== 'string' ||
result.documents[0]['$err'] ||
result.documents[0]['errmsg'] ||
Array.isArray(result.documents[0].result))
) {
// We have a an error document return the error
if (result.documents[0]['$err'] || result.documents[0]['errmsg']) {
return callback(new MongoError(result.documents[0]), null);
}

// We have a cursor document
if (result.documents[0].cursor != null && typeof result.documents[0].cursor !== 'string') {
var id = result.documents[0].cursor.id;
// If we have a namespace change set the new namespace for getmores
if (result.documents[0].cursor.ns) {
self.ns = result.documents[0].cursor.ns;
}
// Promote id to long if needed
self.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
self.cursorState.lastCursorId = self.cursorState.cursorId;
self.cursorState.operationTime = result.documents[0].operationTime;
// If we have a firstBatch set it
if (Array.isArray(result.documents[0].cursor.firstBatch)) {
self.cursorState.documents = result.documents[0].cursor.firstBatch; //.reverse();
}

// Return after processing command cursor
return callback(null, result);
}

if (Array.isArray(result.documents[0].result)) {
self.cursorState.documents = result.documents[0].result;
self.cursorState.cursorId = Long.ZERO;
return callback(null, result);
}
}

// Otherwise fall back to regular find path
self.cursorState.cursorId = result.cursorId;
self.cursorState.documents = result.documents;
self.cursorState.lastCursorId = result.cursorId;

// Transform the results with passed in transformation method if provided
if (self.cursorState.transforms && typeof self.cursorState.transforms.query === 'function') {
self.cursorState.documents = self.cursorState.transforms.query(result);
}

// Return callback
callback(null, result);
};

// Options passed to the pool
var queryOptions = {};

// If we have a raw query decorate the function
if (self.options.raw || self.cmd.raw) {
// queryCallback.raw = self.options.raw || self.cmd.raw;
queryOptions.raw = self.options.raw || self.cmd.raw;
}

// Do we have documentsReturnedIn set on the query
if (typeof self.query.documentsReturnedIn === 'string') {
// queryCallback.documentsReturnedIn = self.query.documentsReturnedIn;
queryOptions.documentsReturnedIn = self.query.documentsReturnedIn;
}

// Add promote Long value if defined
if (typeof self.cursorState.promoteLongs === 'boolean') {
queryOptions.promoteLongs = self.cursorState.promoteLongs;
}

// Add promote values if defined
if (typeof self.cursorState.promoteValues === 'boolean') {
queryOptions.promoteValues = self.cursorState.promoteValues;
}

// Add promote values if defined
if (typeof self.cursorState.promoteBuffers === 'boolean') {
queryOptions.promoteBuffers = self.cursorState.promoteBuffers;
}

if (typeof self.cursorState.session === 'object') {
queryOptions.session = self.cursorState.session;
}

// Write the initial command out
self.server.s.pool.write(self.query, queryOptions, queryCallback);
};

Cursor.prototype._getmore = function(callback) {
if (this.logger.isDebug())
this.logger.debug(f('schedule getMore call for query [%s]', JSON.stringify(this.query)));
Expand Down Expand Up @@ -591,42 +473,7 @@ var nextFunction = function(self, callback) {
return initializeCursor(self, callback);
}

// If we don't have a cursorId execute the first query
if (self.cursorState.cursorId == null) {
// Check if pool is dead and return if not possible to
// execute the query against the db
if (isConnectionDead(self, callback)) return;

// Check if topology is destroyed
if (self.topology.isDestroyed())
return callback(
new MongoNetworkError('connection destroyed, not possible to instantiate cursor')
);

// query, cmd, options, cursorState, callback
self._find(function(err) {
if (err) return handleCallback(callback, err, null);

if (self.cursorState.cursorId && self.cursorState.cursorId.isZero() && self._endSession) {
self._endSession();
}

if (
self.cursorState.documents.length === 0 &&
self.cursorState.cursorId &&
self.cursorState.cursorId.isZero() &&
!self.cmd.tailable &&
!self.cmd.awaitData
) {
return setCursorNotified(self, callback);
}

nextFunction(self, callback);
});
} else if (
self.cursorState.limit > 0 &&
self.cursorState.currentLimit >= self.cursorState.limit
) {
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
// Ensure we kill the cursor on the server
self.kill();
// Set cursor in dead and notified state
Expand Down Expand Up @@ -749,6 +596,41 @@ var nextFunction = function(self, callback) {
}
};

function buildFindCommandQueryOptions(cursor) {
const queryOptions = {};

// If we have a raw query decorate the function
if (cursor.options.raw || cursor.cmd.raw) {
queryOptions.raw = cursor.options.raw || cursor.cmd.raw;
}

// Do we have documentsReturnedIn set on the query
if (typeof cursor.query.documentsReturnedIn === 'string') {
queryOptions.documentsReturnedIn = cursor.query.documentsReturnedIn;
}

// Add promote Long value if defined
if (typeof cursor.cursorState.promoteLongs === 'boolean') {
queryOptions.promoteLongs = cursor.cursorState.promoteLongs;
}

// Add promote values if defined
if (typeof cursor.cursorState.promoteValues === 'boolean') {
queryOptions.promoteValues = cursor.cursorState.promoteValues;
}

// Add promote values if defined
if (typeof cursor.cursorState.promoteBuffers === 'boolean') {
queryOptions.promoteBuffers = cursor.cursorState.promoteBuffers;
}

if (typeof cursor.cursorState.session === 'object') {
queryOptions.session = cursor.cursorState.session;
}

return queryOptions;
}

function initializeCursor(cursor, callback) {
// Topology is not connected, save the call in the provided store to be
// Executed at some point when the handler deems it's reconnected
Expand Down Expand Up @@ -778,49 +660,135 @@ function initializeCursor(cursor, callback) {

return cursor.topology.selectServer(cursor.options, (err, server) => {
if (err) {
// Handle the error and add object to next method call
if (cursor.disconnectHandler != null) {
return cursor.disconnectHandler.addObjectAndMethod(
'cursor',
cursor,
'next',
[callback],
callback
);
const disconnectHandler = cursor.disconnectHandler;
if (disconnectHandler != null) {
return disconnectHandler.addObjectAndMethod('cursor', cursor, 'next', [callback], callback);
}

return callback(err);
}

cursor.server = server;

// Set as init
cursor.cursorState.init = true;

// error if collation not supported
if (collationNotSupported(cursor.server, cursor.cmd)) {
return callback(new MongoError(`server ${cursor.server.name} does not support collation`));
}

try {
cursor.query = cursor.server.wireProtocolHandler.command(
cursor.bson,
cursor.ns,
cursor.cmd,
cursor.cursorState,
cursor.topology,
cursor.options
);
cursor.query = cursor.server.wireProtocolHandler.command(
cursor.bson,
cursor.ns,
cursor.cmd,
cursor.cursorState,
cursor.topology,
cursor.options
);

if (cursor.query instanceof MongoError) {
return callback(cursor.query);
}

function done() {
if (
cursor.cursorState.cursorId &&
cursor.cursorState.cursorId.isZero() &&
cursor._endSession
) {
cursor._endSession();
}

if (cursor.query instanceof MongoError) {
return callback(cursor.query);
if (
cursor.cursorState.documents.length === 0 &&
cursor.cursorState.cursorId &&
cursor.cursorState.cursorId.isZero() &&
!cursor.cmd.tailable &&
!cursor.cmd.awaitData
) {
return setCursorNotified(cursor, callback);
}

// call `nextFunction` again now that we are initialized
nextFunction(cursor, callback);
} catch (err) {
return callback(err);
}

// NOTE: this is a special internal method for cloning a cursor, consider removing
if (cursor.cursorState.cursorId != null) {
return done();
}

if (cursor.logger.isDebug()) {
cursor.logger.debug(
`issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify(
cursor.query
)}]`
);
}

const queryOptions = buildFindCommandQueryOptions(cursor);
cursor.server.s.pool.write(cursor.query, queryOptions, (err, r) => {
if (err) return callback(err);

const result = r.message;
if (result.queryFailure) {
return callback(new MongoError(result.documents[0]), null);
}

// Check if we have a command cursor
if (
Array.isArray(result.documents) &&
result.documents.length === 1 &&
(!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) &&
(typeof result.documents[0].cursor !== 'string' ||
result.documents[0]['$err'] ||
result.documents[0]['errmsg'] ||
Array.isArray(result.documents[0].result))
) {
// We have an error document, return the error
if (result.documents[0]['$err'] || result.documents[0]['errmsg']) {
return callback(new MongoError(result.documents[0]), null);
}

// We have a cursor document
if (result.documents[0].cursor != null && typeof result.documents[0].cursor !== 'string') {
var id = result.documents[0].cursor.id;
// If we have a namespace change set the new namespace for getmores
if (result.documents[0].cursor.ns) {
cursor.ns = result.documents[0].cursor.ns;
}
// Promote id to long if needed
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
cursor.cursorState.operationTime = result.documents[0].operationTime;
// If we have a firstBatch set it
if (Array.isArray(result.documents[0].cursor.firstBatch)) {
cursor.cursorState.documents = result.documents[0].cursor.firstBatch; //.reverse();
}

// Return after processing command cursor
return done(result);
}

if (Array.isArray(result.documents[0].result)) {
cursor.cursorState.documents = result.documents[0].result;
cursor.cursorState.cursorId = Long.ZERO;
return done(result);
}
}

// Otherwise fall back to regular find path
cursor.cursorState.cursorId = result.cursorId;
cursor.cursorState.documents = result.documents;
cursor.cursorState.lastCursorId = result.cursorId;

// Transform the results with passed in transformation method if provided
if (
cursor.cursorState.transforms &&
typeof cursor.cursorState.transforms.query === 'function'
) {
cursor.cursorState.documents = cursor.cursorState.transforms.query(result);
}

// Return callback
done(result);
});
});
}

Expand Down

0 comments on commit f93309a

Please sign in to comment.