Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
refactor(server-selection): add a monitoring loop into selection
Browse files Browse the repository at this point in the history
This also introduces a check in server selection for topology
connectedness, allowing us to remove the need for initial calls
to `connect` for topologies.
  • Loading branch information
mbroadst committed Aug 8, 2018
1 parent 30a394d commit 6d64b95
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions lib/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ let globalTopologyCounter = 0;
const TOPOLOGY_DEFAULTS = {
localThresholdMS: 15,
serverSelectionTimeoutMS: 10000,
heartbeatFrequencyMS: 30000
heartbeatFrequencyMS: 30000,
minHeartbeatIntervalMS: 500
};

/**
Expand Down Expand Up @@ -92,6 +93,7 @@ class Topology extends EventEmitter {
),
serverSelectionTimeoutMS: options.serverSelectionTimeoutMS,
heartbeatFrequencyMS: options.heartbeatFrequencyMS,
minHeartbeatIntervalMS: options.minHeartbeatIntervalMS,
// allow users to override the cursor factory
Cursor: options.cursorFactory || Cursor,
// the bson parser
Expand Down Expand Up @@ -158,6 +160,7 @@ class Topology extends EventEmitter {
);

connectServers(this, Array.from(this.s.description.servers.values()));
this.s.connected = true;
}

/**
Expand All @@ -170,6 +173,8 @@ class Topology extends EventEmitter {
// emit an event for close
this.emit('topologyClosed', new monitoring.TopologyClosedEvent(this.s.id));

this.s.connected = false;

if (typeof callback === 'function') {
callback(null, null);
}
Expand Down Expand Up @@ -394,7 +399,8 @@ function randomSelection(array) {
*
* @param {Topology} topology The topology to select servers from
* @param {function} selector The actual predicate used for selecting servers
* @param {object} options Optional settings
* @param {Number} timeout The max time we are willing wait for selection
* @param {Number} start A high precision timestamp for the start of the selection process
* @param {function} callback The callback used to convey errors or the resultant servers
*/
function selectServers(topology, selector, timeout, start, callback) {
Expand All @@ -419,10 +425,41 @@ function selectServers(topology, selector, timeout, start, callback) {
return callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`));
}

// TODO: we need to kick off requests to each monitor to check immediately before looping
const retrySelection = () => {
// ensure all server monitors are attempting monitoring immediately
topology.s.servers.forEach(server => server.monitor());

const iterationTimer = setTimeout(() => {
callback(new MongoTimeoutError('Server selection timed out due to monitoring'));
}, topology.s.minHeartbeatIntervalMS);

topology.once('topologyDescriptionChanged', () => {
// successful iteration, clear the check timer
clearTimeout(iterationTimer);

// topology description has changed due to monitoring, reattempt server selection
selectServers(topology, selector, timeout, start, callback);
});
};

// ensure we are connected
if (!topology.s.connected) {
topology.connect();

// we want to make sure we're still within the requested timeout window
const failToConnectTimer = setTimeout(() => {
callback(new MongoTimeoutError('Server selection timed out waiting to connect'));
}, timeout - duration);

topology.once('connect', () => {
clearTimeout(failToConnectTimer);
retrySelection();
});

return;
}

// loop the server selection process
process.nextTick(() => selectServers(topology, selector, timeout, start, callback));
retrySelection();
}

/**
Expand Down

0 comments on commit 6d64b95

Please sign in to comment.