diff --git a/x-pack/legacy/plugins/monitoring/common/constants.ts b/x-pack/legacy/plugins/monitoring/common/constants.ts index 1fc322a0de395..1fb6acdb915b8 100644 --- a/x-pack/legacy/plugins/monitoring/common/constants.ts +++ b/x-pack/legacy/plugins/monitoring/common/constants.ts @@ -17,7 +17,7 @@ export const KIBANA_MONITORING_LOGGING_TAG = 'kibana-monitoring'; * The Monitoring API version is the expected API format that we export and expect to import. * @type {string} */ -export const MONITORING_SYSTEM_API_VERSION = '6'; +export const MONITORING_SYSTEM_API_VERSION = '7'; /** * The type name used within the Monitoring index to publish Kibana ops stats. * @type {string} diff --git a/x-pack/legacy/plugins/monitoring/config.js b/x-pack/legacy/plugins/monitoring/config.js index ec4c00ccbea11..bd35d5271132d 100644 --- a/x-pack/legacy/plugins/monitoring/config.js +++ b/x-pack/legacy/plugins/monitoring/config.js @@ -84,6 +84,47 @@ export const config = Joi => { interval: Joi.number().default(10000), // op status metrics get buffered at `ops.interval` and flushed to the bulk endpoint at this interval }).default(), }).default(), + elasticsearch: Joi.object({ + customHeaders: Joi.object().default({}), + logQueries: Joi.boolean().default(false), + requestHeadersWhitelist: Joi.array() + .items() + .single() + .default(DEFAULT_REQUEST_HEADERS), + sniffOnStart: Joi.boolean().default(false), + sniffInterval: Joi.number() + .allow(false) + .default(false), + sniffOnConnectionFault: Joi.boolean().default(false), + hosts: Joi.array() + .items(Joi.string().uri({ scheme: ['http', 'https'] })) + .single(), // if empty, use Kibana's connection config + username: Joi.string(), + password: Joi.string(), + requestTimeout: Joi.number().default(30000), + pingTimeout: Joi.number().default(30000), + ssl: Joi.object({ + verificationMode: Joi.string() + .valid('none', 'certificate', 'full') + .default('full'), + certificateAuthorities: Joi.array() + .single() + .items(Joi.string()), + certificate: Joi.string(), + key: Joi.string(), + keyPassphrase: Joi.string(), + keystore: Joi.object({ + path: Joi.string(), + password: Joi.string(), + }).default(), + truststore: Joi.object({ + path: Joi.string(), + password: Joi.string(), + }).default(), + alwaysPresentCertificate: Joi.boolean().default(false), + }).default(), + apiVersion: Joi.string().default('master'), + }).default(), cluster_alerts: Joi.object({ enabled: Joi.boolean().default(true), email_notifications: Joi.object({ diff --git a/x-pack/legacy/plugins/monitoring/index.ts b/x-pack/legacy/plugins/monitoring/index.ts index c596beb117971..1186fde52dc46 100644 --- a/x-pack/legacy/plugins/monitoring/index.ts +++ b/x-pack/legacy/plugins/monitoring/index.ts @@ -44,6 +44,8 @@ const validConfigOptions: string[] = [ 'monitoring.ui.container.logstash.enabled', 'monitoring.tests.cloud_detector.enabled', 'monitoring.kibana.collection.interval', + 'monitoring.elasticsearch.hosts', + 'monitoring.elasticsearch', 'monitoring.ui.elasticsearch.hosts', 'monitoring.ui.elasticsearch', 'monitoring.xpack_api_polling_frequency_millis', @@ -77,7 +79,7 @@ export const monitoring = (kibana: LegacyPluginApi): LegacyPluginSpec => { uiExports: getUiExports(), deprecations, - init(server: Server) { + async init(server: Server) { const serverConfig = server.config(); const { getOSInfo, plugins, injectUiAppVars } = server as typeof server & { getOSInfo?: any }; const log = (...args: Parameters) => server.log(...args); @@ -127,7 +129,8 @@ export const monitoring = (kibana: LegacyPluginApi): LegacyPluginSpec => { }, }; - new Plugin().setup(coreSetup, pluginsSetup, __LEGACY); + const plugin = new Plugin(); + await plugin.setup(coreSetup, pluginsSetup, __LEGACY); }, postInit(server: Server) { diff --git a/x-pack/legacy/plugins/monitoring/server/es_client/parse_elasticsearch_config.ts b/x-pack/legacy/plugins/monitoring/server/es_client/parse_elasticsearch_config.ts index 728b3433bf06c..87b225e48c158 100644 --- a/x-pack/legacy/plugins/monitoring/server/es_client/parse_elasticsearch_config.ts +++ b/x-pack/legacy/plugins/monitoring/server/es_client/parse_elasticsearch_config.ts @@ -16,10 +16,13 @@ const KEY = 'monitoring.ui.elasticsearch'; * TODO: this code can be removed when this plugin is migrated to the Kibana Platform, * at that point the ElasticsearchClient and ElasticsearchConfig should be used instead */ -export const parseElasticsearchConfig = (config: any) => { - const es = config.get(KEY); +export const parseElasticsearchConfig = (config: any, configKey: string = KEY) => { + const es = config.get(configKey); + if (!es) { + return {}; + } - const errorPrefix = `[config validation of [${KEY}].ssl]`; + const errorPrefix = `[config validation of [${configKey}].ssl]`; if (es.ssl?.key && es.ssl?.keystore?.path) { throw new Error(`${errorPrefix}: cannot use [key] when [keystore.path] is specified`); } diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js index ef7d3f1224fab..2fefdbd4f0943 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js @@ -6,8 +6,10 @@ import { noop } from 'lodash'; import sinon from 'sinon'; +import moment from 'moment'; import expect from '@kbn/expect'; import { BulkUploader } from '../bulk_uploader'; +import { MONITORING_SYSTEM_API_VERSION } from '../../../common/constants'; const FETCH_INTERVAL = 300; const CHECK_DELAY = 500; @@ -52,6 +54,9 @@ describe('BulkUploader', () => { server = { log: sinon.spy(), + config: { + get: sinon.spy(), + }, elasticsearchPlugin: { createCluster: () => cluster, getCluster: () => cluster, @@ -307,5 +312,92 @@ describe('BulkUploader', () => { done(); }, CHECK_DELAY); }); + + it('uses a direct connection to the monitoring cluster, when configured', done => { + const dateInIndex = '2020.02.10'; + const oldNow = moment.now; + moment.now = () => 1581310800000; + const prodClusterUuid = '1sdfd5'; + const prodCluster = { + callWithInternalUser: sinon + .stub() + .withArgs('monitoring.bulk') + .callsFake(arg => { + let resolution = null; + if (arg === 'info') { + resolution = { cluster_uuid: prodClusterUuid }; + } + return new Promise(resolve => resolve(resolution)); + }), + }; + const monitoringCluster = { + callWithInternalUser: sinon + .stub() + .withArgs('bulk') + .callsFake(() => { + return new Promise(resolve => setTimeout(resolve, CHECK_DELAY + 1)); + }), + }; + + const collectorFetch = sinon.stub().returns({ + type: 'kibana_stats', + result: { type: 'kibana_stats', payload: { testData: 12345 } }, + }); + + const collectors = new MockCollectorSet(server, [ + { + fetch: collectorFetch, + isReady: () => true, + formatForBulkUpload: result => result, + isUsageCollector: false, + }, + ]); + const customServer = { + ...server, + elasticsearchPlugin: { + createCluster: () => monitoringCluster, + getCluster: name => { + if (name === 'admin' || name === 'data') { + return prodCluster; + } + return monitoringCluster; + }, + }, + config: { + get: key => { + if (key === 'monitoring.elasticsearch') { + return { + hosts: ['http://localhost:9200'], + username: 'tester', + password: 'testing', + ssl: {}, + }; + } + return null; + }, + }, + }; + const kbnServerStatus = { toJSON: () => ({ overall: { state: 'green' } }) }; + const kbnServerVersion = 'master'; + const uploader = new BulkUploader({ + ...customServer, + interval: FETCH_INTERVAL, + kbnServerStatus, + kbnServerVersion, + }); + uploader.start(collectors); + setTimeout(() => { + uploader.stop(); + const firstCallArgs = monitoringCluster.callWithInternalUser.firstCall.args; + expect(firstCallArgs[0]).to.be('bulk'); + expect(firstCallArgs[1].body[0].index._index).to.be( + `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateInIndex}` + ); + expect(firstCallArgs[1].body[1].type).to.be('kibana_stats'); + expect(firstCallArgs[1].body[1].cluster_uuid).to.be(prodClusterUuid); + moment.now = oldNow; + done(); + }, CHECK_DELAY); + }); }); }); diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js index cf68ec073bebc..7417e6ca804d9 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { defaultsDeep, uniq, compact } from 'lodash'; +import { defaultsDeep, uniq, compact, get } from 'lodash'; import { callClusterFactory } from '../../../xpack_main'; import { @@ -14,6 +14,8 @@ import { } from '../../common/constants'; import { sendBulkPayload, monitoringBulk, getKibanaInfoForStats } from './lib'; +import { parseElasticsearchConfig } from '../es_client/parse_elasticsearch_config'; +import { hasMonitoringCluster } from '../es_client/instantiate_client'; const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG]; @@ -39,6 +41,8 @@ export class BulkUploader { throw new Error('interval number of milliseconds is required'); } + this._hasDirectConnectionToMonitoringCluster = false; + this._productionClusterUuid = null; this._timer = null; // Hold sending and fetching usage until monitoring.bulk is successful. This means that we // send usage data on the second tick. But would save a lot of bandwidth fetching usage on @@ -60,6 +64,19 @@ export class BulkUploader { plugins: [monitoringBulk], }); + const directConfig = parseElasticsearchConfig(config, 'monitoring.elasticsearch'); + if (hasMonitoringCluster(directConfig)) { + this._log.info(`Detected direct connection to monitoring cluster`); + this._hasDirectConnectionToMonitoringCluster = true; + this._cluster = elasticsearchPlugin.createCluster('monitoring-direct', directConfig); + elasticsearchPlugin + .getCluster('admin') + .callWithInternalUser('info') + .then(data => { + this._productionClusterUuid = get(data, 'cluster_uuid'); + }); + } + this._callClusterWithInternalUser = callClusterFactory({ plugins: { elasticsearch: elasticsearchPlugin }, }).getCallClusterInternal(); @@ -151,7 +168,6 @@ export class BulkUploader { const data = await usageCollection.bulkFetch(this._callClusterWithInternalUser); const payload = this.toBulkUploadFormat(compact(data), usageCollection); - if (payload) { try { this._log.debug(`Uploading bulk stats payload to the local cluster`); @@ -182,7 +198,14 @@ export class BulkUploader { } async _onPayload(payload) { - return await sendBulkPayload(this._cluster, this._interval, payload); + return await sendBulkPayload( + this._cluster, + this._interval, + payload, + this._log, + this._hasDirectConnectionToMonitoringCluster, + this._productionClusterUuid + ); } /* diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js index 3e5c64905da0d..c378c0ad0fa08 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js @@ -3,13 +3,64 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ +import moment from 'moment'; +import { chunk, get } from 'lodash'; +import { + MONITORING_SYSTEM_API_VERSION, + KIBANA_SYSTEM_ID, + KIBANA_STATS_TYPE_MONITORING, + KIBANA_SETTINGS_TYPE, +} from '../../../common/constants'; -import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants'; +const SUPPORTED_TYPES = [KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE]; +export function formatForNormalBulkEndpoint(payload, productionClusterUuid) { + const dateSuffix = moment.utc().format('YYYY.MM.DD'); + return chunk(payload, 2).reduce((accum, chunk) => { + const type = get(chunk[0], 'index._type'); + if (!type || !SUPPORTED_TYPES.includes(type)) { + return accum; + } + + const { timestamp } = chunk[1]; + + accum.push({ + index: { + _index: `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateSuffix}`, + }, + }); + accum.push({ + [type]: chunk[1], + type, + timestamp, + cluster_uuid: productionClusterUuid, + }); + return accum; + }, []); +} /* * Send the Kibana usage data to the ES Monitoring Bulk endpoint */ -export function sendBulkPayload(cluster, interval, payload) { +export async function sendBulkPayload( + cluster, + interval, + payload, + log, + hasDirectConnectionToMonitoringCluster = false, + productionClusterUuid = null +) { + if (hasDirectConnectionToMonitoringCluster) { + if (productionClusterUuid === null) { + log.warn( + `Unable to determine production cluster uuid to use for shipping monitoring data. Kibana monitoring data will appear in a standalone cluster in the Stack Monitoring UI.` + ); + } + const formattedPayload = formatForNormalBulkEndpoint(payload, productionClusterUuid); + return await cluster.callWithInternalUser('bulk', { + body: formattedPayload, + }); + } + return cluster.callWithInternalUser('monitoring.bulk', { system_id: KIBANA_SYSTEM_ID, system_api_version: MONITORING_SYSTEM_API_VERSION, diff --git a/x-pack/legacy/plugins/monitoring/server/plugin.js b/x-pack/legacy/plugins/monitoring/server/plugin.js index c2aed7365f3af..304d2c08a1688 100644 --- a/x-pack/legacy/plugins/monitoring/server/plugin.js +++ b/x-pack/legacy/plugins/monitoring/server/plugin.js @@ -19,7 +19,7 @@ import { getLicenseExpiration } from './alerts/license_expiration'; import { parseElasticsearchConfig } from './es_client/parse_elasticsearch_config'; export class Plugin { - setup(_coreSetup, pluginsSetup, __LEGACY) { + async setup(_coreSetup, pluginsSetup, __LEGACY) { const { plugins, _kbnServer: kbnServer, @@ -59,6 +59,14 @@ export class Plugin { */ const elasticsearchConfig = parseElasticsearchConfig(config); + // Create the dedicated client + await instantiateClient({ + log, + events, + elasticsearchConfig, + elasticsearchPlugin: plugins.elasticsearch, + }); + xpackMainPlugin.status.once('green', async () => { // first time xpack_main turns green /* @@ -67,12 +75,6 @@ export class Plugin { const uiEnabled = config.get('monitoring.ui.enabled'); if (uiEnabled) { - await instantiateClient({ - log, - events, - elasticsearchConfig, - elasticsearchPlugin: plugins.elasticsearch, - }); // Instantiate the dedicated ES client await initMonitoringXpackInfo({ config, log,