From bb0c52276eaff8c84f81b90241b037a473093d75 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Sat, 21 Apr 2018 19:10:47 -0400 Subject: [PATCH] feat(sdam-monitoring): add basic monitoring for new Topology type This adds an incomplete, but sufficient amount of SDAM monitoring to the new Topology type in order to pass the SDAM monitoring YAML tests. NODE-1259 --- lib/sdam/monitoring.js | 124 +++++++++++++++++++++++++ lib/sdam/topology.js | 142 ++++++++++++++++++++++++++++- lib/sdam/topology_description.js | 22 +++++ package.json | 1 + test/tests/unit/sdam_spec_tests.js | 120 +++++++++++++++++++++--- 5 files changed, 389 insertions(+), 20 deletions(-) create mode 100644 lib/sdam/monitoring.js diff --git a/lib/sdam/monitoring.js b/lib/sdam/monitoring.js new file mode 100644 index 000000000..255077038 --- /dev/null +++ b/lib/sdam/monitoring.js @@ -0,0 +1,124 @@ +'use strict'; + +/** + * Published when server description changes, but does NOT include changes to the RTT. + * + * @property {Object} topologyId A unique identifier for the topology + * @property {ServerAddress} address The address (host/port pair) of the server + * @property {ServerDescription} previousDescription The previous server description + * @property {ServerDescription} newDescription The new server description + */ +class ServerDescriptionChangedEvent { + constructor(topologyId, address, previousDescription, newDescription) { + Object.assign(this, { topologyId, address, previousDescription, newDescription }); + } +} + +/** + * Published when server is initialized. + * + * @property {Object} topologyId A unique identifier for the topology + * @property {ServerAddress} address The address (host/port pair) of the server + */ +class ServerOpeningEvent { + constructor(topologyId, address) { + Object.assign(this, { topologyId, address }); + } +} + +/** + * Published when server is closed. + * + * @property {ServerAddress} address The address (host/port pair) of the server + * @property {Object} topologyId A unique identifier for the topology + */ +class ServerClosedEvent { + constructor(topologyId, address) { + Object.assign(this, { topologyId, address }); + } +} + +/** + * Published when topology description changes. + * + * @property {Object} topologyId + * @property {TopologyDescription} previousDescription The old topology description + * @property {TopologyDescription} newDescription The new topology description + */ +class TopologyDescriptionChangedEvent { + constructor(topologyId, previousDescription, newDescription) { + Object.assign(this, { topologyId, previousDescription, newDescription }); + } +} + +/** + * Published when topology is initialized. + * + * @param {Object} topologyId A unique identifier for the topology + */ +class TopologyOpeningEvent { + constructor(topologyId) { + Object.assign(this, { topologyId }); + } +} + +/** + * Published when topology is closed. + * + * @param {Object} topologyId A unique identifier for the topology + */ +class TopologyClosedEvent { + constructor(topologyId) { + Object.assign(this, { topologyId }); + } +} + +/** + * Fired when the server monitor’s ismaster command is started - immediately before + * the ismaster command is serialized into raw BSON and written to the socket. + * + * @property {Object} connectionId The connection id for the command + */ +class ServerHeartbeatStartedEvent { + constructor(connectionId) { + Object.assign(this, { connectionId }); + } +} + +/** + * Fired when the server monitor’s ismaster succeeds. + * + * @param {Number} duration The execution time of the event + * @param {Object} reply The command reply + * @param {Object} connectionId The connection id for the command + */ +class ServerHeartbeatSucceededEvent { + constructor(duration, reply, connectionId) { + Object.assign(this, { duration, reply, connectionId }); + } +} + +/** + * Fired when the server monitor’s ismaster fails, either with an “ok: 0” or a socket exception. + * + * @param {Number} duration The execution time of the event + * @param {MongoError|Object} failure The command failure + * @param {Object} connectionId The connection id for the command + */ +class ServerHearbeatFailedEvent { + constructor(duration, failure, connectionId) { + Object.assign(this, { duration, failure, connectionId }); + } +} + +module.exports = { + ServerDescriptionChangedEvent, + ServerOpeningEvent, + ServerClosedEvent, + TopologyDescriptionChangedEvent, + TopologyOpeningEvent, + TopologyClosedEvent, + ServerHeartbeatStartedEvent, + ServerHeartbeatSucceededEvent, + ServerHearbeatFailedEvent +}; diff --git a/lib/sdam/topology.js b/lib/sdam/topology.js index 8a21ede58..886343a78 100644 --- a/lib/sdam/topology.js +++ b/lib/sdam/topology.js @@ -3,9 +3,23 @@ const EventEmitter = require('events'); const ServerDescription = require('./server_description').ServerDescription; const TopologyDescription = require('./topology_description').TopologyDescription; const TopologyType = require('./topology_description').TopologyType; +const monitoring = require('./monitoring'); + +// Global state +let globalTopologyCounter = 0; /** * A container of server instances representing a connection to a MongoDB topology. + * + * @fires Topology#serverOpening + * @fires Topology#serverClosed + * @fires Topology#serverDescriptionChanged + * @fires Topology#topologyOpening + * @fires Topology#topologyClosed + * @fires Topology#topologyDescriptionChanged + * @fires Topology#serverHeartbeatStarted + * @fires Topology#serverHeartbeatSucceeded + * @fires Topology#serverHeartbeatFailed */ class Topology extends EventEmitter { /** @@ -26,6 +40,7 @@ class Topology extends EventEmitter { ? TopologyType.ReplicaSetNoPrimary : TopologyType.Unknown; + const topologyId = globalTopologyCounter++; const serverDescriptions = seedlist.reduce((result, seed) => { const address = seed.port ? `${seed.host}:${seed.port}` : `${seed.host}:27017`; result[address] = new ServerDescription(address); @@ -33,6 +48,8 @@ class Topology extends EventEmitter { }, {}); this.s = { + // the id of this topology + id: topologyId, // passed in options options: Object.assign({}, options), // initial seedlist of servers to connect to @@ -58,11 +75,37 @@ class Topology extends EventEmitter { /** * Initiate server connect - * @method - * @param {array} [options.auth=null] Array of auth options to apply on connect + * + * @param {Object} [options] Optional settings + * @param {Array} [options.auth=null] Array of auth options to apply on connect */ connect(/* options */) { - return; + // emit SDAM monitoring events + this.emit('topologyOpening', new monitoring.TopologyOpeningEvent(this.s.id)); + + // emit an event for the topology change + this.emit( + 'topologyDescriptionChanged', + new monitoring.TopologyDescriptionChangedEvent( + this.s.id, + new TopologyDescription(TopologyType.Unknown), // initial is always Unknown + this.s.description + ) + ); + + // emit ServerOpeningEvents for each server in our topology + Object.keys(this.s.description.servers).forEach(serverAddress => { + // publish an open event for each ServerDescription created + this.emit('serverOpening', new monitoring.ServerOpeningEvent(this.s.id, serverAddress)); + }); + } + + /** + * Close this topology + */ + close() { + // emit an event for close + this.emit('topologyClosed', new monitoring.TopologyClosedEvent(this.s.id)); } /** @@ -81,10 +124,99 @@ class Topology extends EventEmitter { * @param {object} serverDescription the server to update */ update(serverDescription) { + // these will be used for monitoring events later + const previousTopologyDescription = this.s.description; + const previousServerDescription = this.s.description.servers[serverDescription.address]; + // first update the TopologyDescription this.s.description = this.s.description.update(serverDescription); + + // emit monitoring events for this change + this.emit( + 'serverDescriptionChanged', + new monitoring.ServerDescriptionChangedEvent( + this.s.id, + serverDescription.address, + previousServerDescription, + this.s.description.servers[serverDescription.address] + ) + ); + + this.emit( + 'topologyDescriptionChanged', + new monitoring.TopologyDescriptionChangedEvent( + this.s.id, + previousTopologyDescription, + this.s.description + ) + ); } } -module.exports.Topology = Topology; -module.exports.ServerDescription = ServerDescription; +/** + * A server opening SDAM monitoring event + * + * @event Topology#serverOpening + * @type {ServerOpeningEvent} + */ + +/** + * A server closed SDAM monitoring event + * + * @event Topology#serverClosed + * @type {ServerClosedEvent} + */ + +/** + * A server description SDAM change monitoring event + * + * @event Topology#serverDescriptionChanged + * @type {ServerDescriptionChangedEvent} + */ + +/** + * A topology open SDAM event + * + * @event Topology#topologyOpening + * @type {TopologyOpeningEvent} + */ + +/** + * A topology closed SDAM event + * + * @event Topology#topologyClosed + * @type {TopologyClosedEvent} + */ + +/** + * A topology structure SDAM change event + * + * @event Topology#topologyDescriptionChanged + * @type {TopologyDescriptionChangedEvent} + */ + +/** + * A topology serverHeartbeatStarted SDAM event + * + * @event Topology#serverHeartbeatStarted + * @type {ServerHeartbeatStartedEvent} + */ + +/** + * A topology serverHeartbeatFailed SDAM event + * + * @event Topology#serverHeartbeatFailed + * @type {ServerHearbeatFailedEvent} + */ + +/** + * A topology serverHeartbeatSucceeded SDAM change event + * + * @event Topology#serverHeartbeatSucceeded + * @type {ServerHeartbeatSucceededEvent} + */ + +module.exports = { + Topology, + ServerDescription +}; diff --git a/lib/sdam/topology_description.js b/lib/sdam/topology_description.js index dfbbebdeb..f7d76d857 100644 --- a/lib/sdam/topology_description.js +++ b/lib/sdam/topology_description.js @@ -2,6 +2,7 @@ const assert = require('assert'); const ServerType = require('./server_description').ServerType; const ServerDescription = require('./server_description').ServerDescription; +const ReadPreference = require('../topologies/read_preference'); // contstants related to compatability checks const MIN_SUPPORTED_SERVER_VERSION = '2.6'; @@ -195,6 +196,27 @@ class TopologyDescription { {} ); } + + /** + * Determines if the topology has a readable server available. See the table in the + * following section for behaviour rules. + * + * @param {ReadPreference} [readPreference] An optional read preference for determining if a readable server is present + * @return {Boolean} Whether there is a readable server in this topology + */ + hasReadableServer(/* readPreference */) { + // To be implemented when server selection is implemented + } + + /** + * Determines if the topology has a writable server available. See the table in the + * following section for behaviour rules. + * + * @return {Boolean} Whether there is a writable server in this topology + */ + hasWritableServer() { + return this.hasReadableServer(ReadPreference.primary); + } } function topologyTypeForServerType(serverType) { diff --git a/package.json b/package.json index 49c2f7b35..1b2ffb7a4 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ }, "devDependencies": { "chai": "^4.1.2", + "chai-subset": "^1.6.0", "co": "^4.6.0", "conventional-changelog-cli": "^1.3.5", "eslint": "^4.6.1", diff --git a/test/tests/unit/sdam_spec_tests.js b/test/tests/unit/sdam_spec_tests.js index 2618542d1..b97a6d8a5 100644 --- a/test/tests/unit/sdam_spec_tests.js +++ b/test/tests/unit/sdam_spec_tests.js @@ -1,10 +1,13 @@ 'use strict'; const fs = require('fs'); const path = require('path'); -const expect = require('chai').expect; +const chai = require('chai'); +const expect = chai.expect; const Topology = require('../../../lib/sdam/topology').Topology; const ServerDescription = require('../../../lib/sdam/server_description').ServerDescription; +const monitoring = require('../../../lib/sdam/monitoring'); const parse = require('../../../lib/uri_parser'); +chai.use(require('chai-subset')); const specDir = path.join(__dirname, '..', 'spec', 'server-discovery-and-monitoring'); function collectTests() { @@ -28,15 +31,23 @@ function collectTests() { return tests; } +const SKIPPED_TESTS = new Set([ + 'Monitoring a replica set with non member' // reenable once `Server` is integrated into new `Topology` +]); + describe('Server Discovery and Monitoring (spec)', function() { const specTests = collectTests(); Object.keys(specTests).forEach(specTestName => { - (specTestName === 'monitoring' ? describe.skip : describe)(specTestName, () => { + describe(specTestName, () => { specTests[specTestName].forEach(testData => { it(testData.description, { metadata: { requires: { topology: 'single' } }, test: function(done) { + if (SKIPPED_TESTS.has(testData.description)) { + return this.skip(); + } + executeSDAMTest(testData, done); } }); @@ -45,23 +56,97 @@ describe('Server Discovery and Monitoring (spec)', function() { }); }); -const OUTCOME_TRANSLATIONS = { - topologyType: 'type' -}; +const OUTCOME_TRANSLATIONS = new Map(); +OUTCOME_TRANSLATIONS.set('topologyType', 'type'); function translateOutcomeKey(key) { - if (OUTCOME_TRANSLATIONS.hasOwnProperty(key)) { - return OUTCOME_TRANSLATIONS[key]; + if (OUTCOME_TRANSLATIONS.has(key)) { + return OUTCOME_TRANSLATIONS.get(key); } return key; } +function convertOutcomeEvents(events) { + return events.map(event => { + const eventType = Object.keys(event)[0]; + const args = []; + Object.keys(event[eventType]).forEach(key => { + let argument = event[eventType][key]; + if (argument.servers) { + argument.servers = argument.servers.reduce((result, server) => { + result[server.address] = normalizeServerDescription(server); + return result; + }, {}); + } + + Object.keys(argument).forEach(key => { + if (OUTCOME_TRANSLATIONS.has(key)) { + argument[OUTCOME_TRANSLATIONS.get(key)] = argument[key]; + delete argument[key]; + } + }); + + args.push(argument); + }); + + // convert snake case to camelCase with capital first letter + let eventClass = eventType.replace(/_\w/g, c => c[1].toUpperCase()); + eventClass = eventClass.charAt(0).toUpperCase() + eventClass.slice(1); + args.unshift(null); + const eventConstructor = monitoring[eventClass]; + const eventInstance = new (Function.prototype.bind.apply(eventConstructor, args))(); + return eventInstance; + }); +} + +function replacePlaceholders(actual, expected) { + Object.keys(expected).forEach(key => { + if (expected[key] === 42 || expected[key] === '42') { + expect(actual).to.have.any.keys(key); + expect(actual[key]).to.exist; + actual[key] = expected[key]; + } + }); + + return actual; +} + +function normalizeServerDescription(serverDescription) { + if (serverDescription.type === 'PossiblePrimary') { + // Some single-threaded drivers care a lot about ordering potential primary + // servers, in order to speed up selection. We don't care, so we'll just mark + // it as `Unknown`. + serverDescription.type = 'Unknown'; + } + + return serverDescription; +} + function executeSDAMTest(testData, done) { parse(testData.uri, (err, parsedUri) => { if (err) return done(err); + // create the topology const topology = new Topology(parsedUri.hosts, parsedUri.options); + + // listen for SDAM monitoring events + const events = []; + [ + 'serverOpening', + 'serverClosed', + 'serverDescriptionChanged', + 'topologyOpening', + 'topologyClosed', + 'topologyDescriptionChanged', + 'serverHeartbeatStarted', + 'serverHeartbeatSucceeded', + 'serverHeartbeatFailed' + ].forEach(eventName => { + topology.on(eventName, event => events.push(event)); + }); + + // connect the topology topology.connect(testData.uri); testData.phases.forEach(phase => { @@ -83,21 +168,26 @@ function executeSDAMTest(testData, done) { Object.keys(expectedServers).forEach(serverName => { expect(actualServers).to.include.keys(serverName); - const expectedServer = expectedServers[serverName]; + const expectedServer = normalizeServerDescription(expectedServers[serverName]); const actualServer = actualServers[serverName]; - if (expectedServer.type === 'PossiblePrimary') { - // Some single-threaded drivers care a lot about ordering potential primary - // servers, in order to speed up selection. We don't care, so we'll just mark - // it as `Unknown`. - expectedServer.type = 'Unknown'; - } - expect(actualServer).to.deep.include(expectedServer); }); return; } + if (key === 'events') { + const expectedEvents = convertOutcomeEvents(outcomeValue); + expect(events).to.have.length(expectedEvents.length); + for (let i = 0; i < events.length; ++i) { + const expectedEvent = expectedEvents[i]; + const actualEvent = replacePlaceholders(events[i], expectedEvent); + expect(actualEvent).to.containSubset(expectedEvent); + } + + return; + } + expect(description).to.include.keys(translatedKey); expect(description[translatedKey]).to.eql(outcomeValue); });