From 71f3312763147a1ccca4b87cbd863f27a65ec28b Mon Sep 17 00:00:00 2001 From: chrisronline Date: Thu, 6 Feb 2020 14:25:39 -0500 Subject: [PATCH 1/4] Support shipping directly to the monitoring cluster --- .../plugins/monitoring/common/constants.ts | 2 +- x-pack/legacy/plugins/monitoring/config.js | 41 +++++++++ x-pack/legacy/plugins/monitoring/index.js | 6 +- .../es_client/parse_elasticsearch_config.ts | 9 +- .../__tests__/bulk_uploader.js | 89 +++++++++++++++++++ .../server/kibana_monitoring/bulk_uploader.js | 27 +++++- .../lib/send_bulk_payload.js | 46 +++++++++- .../plugins/monitoring/server/plugin.js | 17 ++-- 8 files changed, 219 insertions(+), 18 deletions(-) diff --git a/x-pack/legacy/plugins/monitoring/common/constants.ts b/x-pack/legacy/plugins/monitoring/common/constants.ts index 53764f592dc15..db1456ce67155 100644 --- a/x-pack/legacy/plugins/monitoring/common/constants.ts +++ b/x-pack/legacy/plugins/monitoring/common/constants.ts @@ -17,7 +17,7 @@ export const KIBANA_MONITORING_LOGGING_TAG = 'kibana-monitoring'; * The Monitoring API version is the expected API format that we export and expect to import. * @type {string} */ -export const MONITORING_SYSTEM_API_VERSION = '6'; +export const MONITORING_SYSTEM_API_VERSION = '7'; /** * The type name used within the Monitoring index to publish Kibana ops stats. * @type {string} diff --git a/x-pack/legacy/plugins/monitoring/config.js b/x-pack/legacy/plugins/monitoring/config.js index 778b656c056f2..2a885168f38fe 100644 --- a/x-pack/legacy/plugins/monitoring/config.js +++ b/x-pack/legacy/plugins/monitoring/config.js @@ -81,6 +81,47 @@ export const config = Joi => { interval: Joi.number().default(10000), // op status metrics get buffered at `ops.interval` and flushed to the bulk endpoint at this interval }).default(), }).default(), + elasticsearch: Joi.object({ + customHeaders: Joi.object().default({}), + logQueries: Joi.boolean().default(false), + requestHeadersWhitelist: Joi.array() + .items() + .single() + .default(DEFAULT_REQUEST_HEADERS), + sniffOnStart: Joi.boolean().default(false), + sniffInterval: Joi.number() + .allow(false) + .default(false), + sniffOnConnectionFault: Joi.boolean().default(false), + hosts: Joi.array() + .items(Joi.string().uri({ scheme: ['http', 'https'] })) + .single(), // if empty, use Kibana's connection config + username: Joi.string(), + password: Joi.string(), + requestTimeout: Joi.number().default(30000), + pingTimeout: Joi.number().default(30000), + ssl: Joi.object({ + verificationMode: Joi.string() + .valid('none', 'certificate', 'full') + .default('full'), + certificateAuthorities: Joi.array() + .single() + .items(Joi.string()), + certificate: Joi.string(), + key: Joi.string(), + keyPassphrase: Joi.string(), + keystore: Joi.object({ + path: Joi.string(), + password: Joi.string(), + }).default(), + truststore: Joi.object({ + path: Joi.string(), + password: Joi.string(), + }).default(), + alwaysPresentCertificate: Joi.boolean().default(false), + }).default(), + apiVersion: Joi.string().default('master'), + }).default(), cluster_alerts: Joi.object({ enabled: Joi.boolean().default(true), email_notifications: Joi.object({ diff --git a/x-pack/legacy/plugins/monitoring/index.js b/x-pack/legacy/plugins/monitoring/index.js index ade172f527dab..7008b8d72f7e6 100644 --- a/x-pack/legacy/plugins/monitoring/index.js +++ b/x-pack/legacy/plugins/monitoring/index.js @@ -27,7 +27,7 @@ export const monitoring = kibana => id: 'monitoring', configPrefix: 'monitoring', publicDir: resolve(__dirname, 'public'), - init(server) { + async init(server) { const configs = [ 'monitoring.ui.enabled', 'monitoring.kibana.collection.enabled', @@ -41,6 +41,8 @@ export const monitoring = kibana => 'monitoring.kibana.collection.interval', 'monitoring.ui.elasticsearch.hosts', 'monitoring.ui.elasticsearch', + 'monitoring.elasticsearch.hosts', + 'monitoring.elasticsearch', 'monitoring.xpack_api_polling_frequency_millis', 'server.uuid', 'server.name', @@ -85,7 +87,7 @@ export const monitoring = kibana => }; const plugin = new Plugin(); - plugin.setup(serverFacade, plugins); + await plugin.setup(serverFacade, plugins); }, config, deprecations, diff --git a/x-pack/legacy/plugins/monitoring/server/es_client/parse_elasticsearch_config.ts b/x-pack/legacy/plugins/monitoring/server/es_client/parse_elasticsearch_config.ts index 728b3433bf06c..87b225e48c158 100644 --- a/x-pack/legacy/plugins/monitoring/server/es_client/parse_elasticsearch_config.ts +++ b/x-pack/legacy/plugins/monitoring/server/es_client/parse_elasticsearch_config.ts @@ -16,10 +16,13 @@ const KEY = 'monitoring.ui.elasticsearch'; * TODO: this code can be removed when this plugin is migrated to the Kibana Platform, * at that point the ElasticsearchClient and ElasticsearchConfig should be used instead */ -export const parseElasticsearchConfig = (config: any) => { - const es = config.get(KEY); +export const parseElasticsearchConfig = (config: any, configKey: string = KEY) => { + const es = config.get(configKey); + if (!es) { + return {}; + } - const errorPrefix = `[config validation of [${KEY}].ssl]`; + const errorPrefix = `[config validation of [${configKey}].ssl]`; if (es.ssl?.key && es.ssl?.keystore?.path) { throw new Error(`${errorPrefix}: cannot use [key] when [keystore.path] is specified`); } diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js index ef7d3f1224fab..826b4756f92d8 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js @@ -6,8 +6,10 @@ import { noop } from 'lodash'; import sinon from 'sinon'; +import moment from 'moment'; import expect from '@kbn/expect'; import { BulkUploader } from '../bulk_uploader'; +import { MONITORING_SYSTEM_API_VERSION } from '../../../common/constants'; const FETCH_INTERVAL = 300; const CHECK_DELAY = 500; @@ -52,6 +54,9 @@ describe('BulkUploader', () => { server = { log: sinon.spy(), + config: { + get: sinon.spy(), + }, elasticsearchPlugin: { createCluster: () => cluster, getCluster: () => cluster, @@ -307,5 +312,89 @@ describe('BulkUploader', () => { done(); }, CHECK_DELAY); }); + + it('uses a direct connection to the monitoring cluster, when configured', done => { + const prodClusterUuid = '1sdfd5'; + const prodCluster = { + callWithInternalUser: sinon + .stub() + .withArgs('monitoring.bulk') + .callsFake(arg => { + let resolution = null; + if (arg === 'info') { + resolution = { cluster_uuid: prodClusterUuid }; + } + return new Promise(resolve => resolve(resolution)); + }), + }; + const monitoringCluster = { + callWithInternalUser: sinon + .stub() + .withArgs('bulk') + .callsFake(() => { + return new Promise(resolve => setTimeout(resolve, CHECK_DELAY + 1)); + }), + }; + + const collectorFetch = sinon.stub().returns({ + type: 'kibana_stats', + result: { type: 'kibana_stats', payload: { testData: 12345 } }, + }); + + const collectors = new MockCollectorSet(server, [ + { + fetch: collectorFetch, + isReady: () => true, + formatForBulkUpload: result => result, + isUsageCollector: false, + }, + ]); + const customServer = { + ...server, + elasticsearchPlugin: { + createCluster: () => monitoringCluster, + getCluster: name => { + if (name === 'admin' || name === 'data') { + return prodCluster; + } + return monitoringCluster; + }, + }, + config: { + get: key => { + if (key === 'monitoring.elasticsearch') { + return { + hosts: ['http://localhost:9200'], + username: 'tester', + password: 'testing', + ssl: {}, + }; + } + return null; + }, + }, + }; + const kbnServerStatus = { toJSON: () => ({ overall: { state: 'green' } }) }; + const kbnServerVersion = 'master'; + const uploader = new BulkUploader({ + ...customServer, + interval: FETCH_INTERVAL, + kbnServerStatus, + kbnServerVersion, + }); + uploader.start(collectors); + setTimeout(() => { + uploader.stop(); + const firstCallArgs = monitoringCluster.callWithInternalUser.firstCall.args; + + expect(firstCallArgs[0]).to.be('bulk'); + expect(firstCallArgs[1].body[0].index._index).to.be( + `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${moment().format('YYYY.MM.DD')}` + ); + expect(firstCallArgs[1].body[1].type).to.be('kibana_stats'); + expect(firstCallArgs[1].body[1].cluster_uuid).to.be(prodClusterUuid); + done(); + }, CHECK_DELAY); + }); }); }); diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js index cf68ec073bebc..23586ebe07138 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { defaultsDeep, uniq, compact } from 'lodash'; +import { defaultsDeep, uniq, compact, get } from 'lodash'; import { callClusterFactory } from '../../../xpack_main'; import { @@ -14,6 +14,8 @@ import { } from '../../common/constants'; import { sendBulkPayload, monitoringBulk, getKibanaInfoForStats } from './lib'; +import { parseElasticsearchConfig } from '../es_client/parse_elasticsearch_config'; +import { hasMonitoringCluster } from '../es_client/instantiate_client'; const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG]; @@ -39,6 +41,8 @@ export class BulkUploader { throw new Error('interval number of milliseconds is required'); } + this._hasDirectConnectionToMonitoringCluster = false; + this._productionClusterUuid = null; this._timer = null; // Hold sending and fetching usage until monitoring.bulk is successful. This means that we // send usage data on the second tick. But would save a lot of bandwidth fetching usage on @@ -60,6 +64,18 @@ export class BulkUploader { plugins: [monitoringBulk], }); + const directConfig = parseElasticsearchConfig(config, 'monitoring.elasticsearch'); + if (hasMonitoringCluster(directConfig)) { + this._hasDirectConnectionToMonitoringCluster = true; + this._cluster = elasticsearchPlugin.createCluster('monitoring-direct', directConfig); + elasticsearchPlugin + .getCluster('admin') + .callWithInternalUser('info') + .then(data => { + this._productionClusterUuid = get(data, 'cluster_uuid'); + }); + } + this._callClusterWithInternalUser = callClusterFactory({ plugins: { elasticsearch: elasticsearchPlugin }, }).getCallClusterInternal(); @@ -151,7 +167,6 @@ export class BulkUploader { const data = await usageCollection.bulkFetch(this._callClusterWithInternalUser); const payload = this.toBulkUploadFormat(compact(data), usageCollection); - if (payload) { try { this._log.debug(`Uploading bulk stats payload to the local cluster`); @@ -182,7 +197,13 @@ export class BulkUploader { } async _onPayload(payload) { - return await sendBulkPayload(this._cluster, this._interval, payload); + return await sendBulkPayload( + this._cluster, + this._interval, + payload, + this._hasDirectConnectionToMonitoringCluster, + this._productionClusterUuid + ); } /* diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js index 3e5c64905da0d..a1c0ca2f0b3a1 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js @@ -3,13 +3,55 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ +import moment from 'moment'; +import { chunk, get } from 'lodash'; +import { + MONITORING_SYSTEM_API_VERSION, + KIBANA_SYSTEM_ID, + KIBANA_STATS_TYPE_MONITORING, + KIBANA_SETTINGS_TYPE, +} from '../../../common/constants'; -import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants'; +const SUPPORTED_TYPES = [KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE]; +export function formatForNormalBulkEndpoint(payload, productionClusterUuid) { + const dateSuffix = moment().format('YYYY.MM.DD'); + return chunk(payload, 2).reduce((accum, chunk) => { + const type = get(chunk[0], 'index._type'); + if (!type || !SUPPORTED_TYPES.includes(type)) { + return accum; + } + + accum.push({ + index: { + _index: `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateSuffix}`, + }, + }); + accum.push({ + [type]: chunk[1], + type, + cluster_uuid: productionClusterUuid, + }); + return accum; + }, []); +} /* * Send the Kibana usage data to the ES Monitoring Bulk endpoint */ -export function sendBulkPayload(cluster, interval, payload) { +export async function sendBulkPayload( + cluster, + interval, + payload, + hasDirectConnectionToMonitoringCluster = false, + productionClusterUuid = null +) { + if (hasDirectConnectionToMonitoringCluster) { + const formattedPayload = formatForNormalBulkEndpoint(payload, productionClusterUuid); + return await cluster.callWithInternalUser('bulk', { + body: formattedPayload, + }); + } + return cluster.callWithInternalUser('monitoring.bulk', { system_id: KIBANA_SYSTEM_ID, system_api_version: MONITORING_SYSTEM_API_VERSION, diff --git a/x-pack/legacy/plugins/monitoring/server/plugin.js b/x-pack/legacy/plugins/monitoring/server/plugin.js index 50e5319a0f526..57b3f76a9576c 100644 --- a/x-pack/legacy/plugins/monitoring/server/plugin.js +++ b/x-pack/legacy/plugins/monitoring/server/plugin.js @@ -19,7 +19,7 @@ import { getLicenseExpiration } from './alerts/license_expiration'; import { parseElasticsearchConfig } from './es_client/parse_elasticsearch_config'; export class Plugin { - setup(core, plugins) { + async setup(core, plugins) { const kbnServer = core._kbnServer; const config = core.config(); const usageCollection = plugins.usageCollection; @@ -48,6 +48,14 @@ export class Plugin { */ const elasticsearchConfig = parseElasticsearchConfig(config); + // Create the dedicated client + await instantiateClient({ + log: core.log, + events: core.events, + elasticsearchConfig, + elasticsearchPlugin: plugins.elasticsearch, + }); + xpackMainPlugin.status.once('green', async () => { // first time xpack_main turns green /* @@ -56,12 +64,6 @@ export class Plugin { const uiEnabled = config.get('monitoring.ui.enabled'); if (uiEnabled) { - await instantiateClient({ - log: core.log, - events: core.events, - elasticsearchConfig, - elasticsearchPlugin: plugins.elasticsearch, - }); // Instantiate the dedicated ES client await initMonitoringXpackInfo({ config, log: core.log, @@ -96,6 +98,7 @@ export class Plugin { }, }); + console.log(kbnServer.status); const bulkUploader = initBulkUploader({ elasticsearchPlugin: plugins.elasticsearch, config, From 16b03e92b9610ac95bee017ff9b2c9937e023b28 Mon Sep 17 00:00:00 2001 From: chrisronline Date: Tue, 11 Feb 2020 12:11:18 -0500 Subject: [PATCH 2/4] Add timestamp --- .../server/kibana_monitoring/lib/send_bulk_payload.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js index a1c0ca2f0b3a1..a9c4125b55a79 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js @@ -21,6 +21,8 @@ export function formatForNormalBulkEndpoint(payload, productionClusterUuid) { return accum; } + const { timestamp } = chunk[1]; + accum.push({ index: { _index: `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateSuffix}`, @@ -29,6 +31,7 @@ export function formatForNormalBulkEndpoint(payload, productionClusterUuid) { accum.push({ [type]: chunk[1], type, + timestamp, cluster_uuid: productionClusterUuid, }); return accum; From 319acef5683a4d9306fc721cd833159b472eef61 Mon Sep 17 00:00:00 2001 From: chrisronline Date: Thu, 13 Feb 2020 15:38:28 -0500 Subject: [PATCH 3/4] PR feedback --- .../server/kibana_monitoring/__tests__/bulk_uploader.js | 7 +++++-- .../monitoring/server/kibana_monitoring/bulk_uploader.js | 2 ++ .../server/kibana_monitoring/lib/send_bulk_payload.js | 6 ++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js index 826b4756f92d8..2fefdbd4f0943 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js @@ -314,6 +314,9 @@ describe('BulkUploader', () => { }); it('uses a direct connection to the monitoring cluster, when configured', done => { + const dateInIndex = '2020.02.10'; + const oldNow = moment.now; + moment.now = () => 1581310800000; const prodClusterUuid = '1sdfd5'; const prodCluster = { callWithInternalUser: sinon @@ -386,13 +389,13 @@ describe('BulkUploader', () => { setTimeout(() => { uploader.stop(); const firstCallArgs = monitoringCluster.callWithInternalUser.firstCall.args; - expect(firstCallArgs[0]).to.be('bulk'); expect(firstCallArgs[1].body[0].index._index).to.be( - `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${moment().format('YYYY.MM.DD')}` + `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateInIndex}` ); expect(firstCallArgs[1].body[1].type).to.be('kibana_stats'); expect(firstCallArgs[1].body[1].cluster_uuid).to.be(prodClusterUuid); + moment.now = oldNow; done(); }, CHECK_DELAY); }); diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js index 23586ebe07138..7417e6ca804d9 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js @@ -66,6 +66,7 @@ export class BulkUploader { const directConfig = parseElasticsearchConfig(config, 'monitoring.elasticsearch'); if (hasMonitoringCluster(directConfig)) { + this._log.info(`Detected direct connection to monitoring cluster`); this._hasDirectConnectionToMonitoringCluster = true; this._cluster = elasticsearchPlugin.createCluster('monitoring-direct', directConfig); elasticsearchPlugin @@ -201,6 +202,7 @@ export class BulkUploader { this._cluster, this._interval, payload, + this._log, this._hasDirectConnectionToMonitoringCluster, this._productionClusterUuid ); diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js index a9c4125b55a79..da698d71fdaa9 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js @@ -45,10 +45,16 @@ export async function sendBulkPayload( cluster, interval, payload, + log, hasDirectConnectionToMonitoringCluster = false, productionClusterUuid = null ) { if (hasDirectConnectionToMonitoringCluster) { + if (productionClusterUuid === null) { + log.warn( + `Unable to determine production cluster uuid to use for shipping monitoring data. Kibana monitoring data will appear in a standalone cluster in the Stack Monitoring UI.` + ); + } const formattedPayload = formatForNormalBulkEndpoint(payload, productionClusterUuid); return await cluster.callWithInternalUser('bulk', { body: formattedPayload, From 02801385acc6abc5ecd7e0d5fede9494824386bf Mon Sep 17 00:00:00 2001 From: chrisronline Date: Thu, 13 Feb 2020 16:53:47 -0500 Subject: [PATCH 4/4] Use utc --- .../server/kibana_monitoring/lib/send_bulk_payload.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js index da698d71fdaa9..c378c0ad0fa08 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js @@ -14,7 +14,7 @@ import { const SUPPORTED_TYPES = [KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE]; export function formatForNormalBulkEndpoint(payload, productionClusterUuid) { - const dateSuffix = moment().format('YYYY.MM.DD'); + const dateSuffix = moment.utc().format('YYYY.MM.DD'); return chunk(payload, 2).reduce((accum, chunk) => { const type = get(chunk[0], 'index._type'); if (!type || !SUPPORTED_TYPES.includes(type)) {