Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Monitoring] Support shipping directly to the monitoring cluster #57022

2 changes: 1 addition & 1 deletion x-pack/legacy/plugins/monitoring/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export const KIBANA_MONITORING_LOGGING_TAG = 'kibana-monitoring';
* The Monitoring API version is the expected API format that we export and expect to import.
* @type {string}
*/
export const MONITORING_SYSTEM_API_VERSION = '6';
export const MONITORING_SYSTEM_API_VERSION = '7';
/**
* The type name used within the Monitoring index to publish Kibana ops stats.
* @type {string}
Expand Down
41 changes: 41 additions & 0 deletions x-pack/legacy/plugins/monitoring/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,47 @@ export const config = Joi => {
interval: Joi.number().default(10000), // op status metrics get buffered at `ops.interval` and flushed to the bulk endpoint at this interval
}).default(),
}).default(),
elasticsearch: Joi.object({
customHeaders: Joi.object().default({}),
logQueries: Joi.boolean().default(false),
requestHeadersWhitelist: Joi.array()
.items()
.single()
.default(DEFAULT_REQUEST_HEADERS),
sniffOnStart: Joi.boolean().default(false),
sniffInterval: Joi.number()
.allow(false)
.default(false),
sniffOnConnectionFault: Joi.boolean().default(false),
hosts: Joi.array()
.items(Joi.string().uri({ scheme: ['http', 'https'] }))
.single(), // if empty, use Kibana's connection config
username: Joi.string(),
password: Joi.string(),
requestTimeout: Joi.number().default(30000),
pingTimeout: Joi.number().default(30000),
ssl: Joi.object({
verificationMode: Joi.string()
.valid('none', 'certificate', 'full')
.default('full'),
certificateAuthorities: Joi.array()
.single()
.items(Joi.string()),
certificate: Joi.string(),
key: Joi.string(),
keyPassphrase: Joi.string(),
keystore: Joi.object({
path: Joi.string(),
password: Joi.string(),
}).default(),
truststore: Joi.object({
path: Joi.string(),
password: Joi.string(),
}).default(),
alwaysPresentCertificate: Joi.boolean().default(false),
}).default(),
apiVersion: Joi.string().default('master'),
}).default(),
cluster_alerts: Joi.object({
enabled: Joi.boolean().default(true),
email_notifications: Joi.object({
Expand Down
7 changes: 5 additions & 2 deletions x-pack/legacy/plugins/monitoring/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const validConfigOptions: string[] = [
'monitoring.ui.container.logstash.enabled',
'monitoring.tests.cloud_detector.enabled',
'monitoring.kibana.collection.interval',
'monitoring.elasticsearch.hosts',
'monitoring.elasticsearch',
'monitoring.ui.elasticsearch.hosts',
'monitoring.ui.elasticsearch',
'monitoring.xpack_api_polling_frequency_millis',
Expand Down Expand Up @@ -77,7 +79,7 @@ export const monitoring = (kibana: LegacyPluginApi): LegacyPluginSpec => {
uiExports: getUiExports(),
deprecations,

init(server: Server) {
async init(server: Server) {
const serverConfig = server.config();
const { getOSInfo, plugins, injectUiAppVars } = server as typeof server & { getOSInfo?: any };
const log = (...args: Parameters<typeof server.log>) => server.log(...args);
Expand Down Expand Up @@ -127,7 +129,8 @@ export const monitoring = (kibana: LegacyPluginApi): LegacyPluginSpec => {
},
};

new Plugin().setup(coreSetup, pluginsSetup, __LEGACY);
const plugin = new Plugin();
await plugin.setup(coreSetup, pluginsSetup, __LEGACY);
},

postInit(server: Server) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ const KEY = 'monitoring.ui.elasticsearch';
* TODO: this code can be removed when this plugin is migrated to the Kibana Platform,
* at that point the ElasticsearchClient and ElasticsearchConfig should be used instead
*/
export const parseElasticsearchConfig = (config: any) => {
const es = config.get(KEY);
export const parseElasticsearchConfig = (config: any, configKey: string = KEY) => {
const es = config.get(configKey);
if (!es) {
return {};
}

const errorPrefix = `[config validation of [${KEY}].ssl]`;
const errorPrefix = `[config validation of [${configKey}].ssl]`;
if (es.ssl?.key && es.ssl?.keystore?.path) {
throw new Error(`${errorPrefix}: cannot use [key] when [keystore.path] is specified`);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

import { noop } from 'lodash';
import sinon from 'sinon';
import moment from 'moment';
import expect from '@kbn/expect';
import { BulkUploader } from '../bulk_uploader';
import { MONITORING_SYSTEM_API_VERSION } from '../../../common/constants';

const FETCH_INTERVAL = 300;
const CHECK_DELAY = 500;
Expand Down Expand Up @@ -52,6 +54,9 @@ describe('BulkUploader', () => {

server = {
log: sinon.spy(),
config: {
get: sinon.spy(),
},
elasticsearchPlugin: {
createCluster: () => cluster,
getCluster: () => cluster,
Expand Down Expand Up @@ -307,5 +312,92 @@ describe('BulkUploader', () => {
done();
}, CHECK_DELAY);
});

it('uses a direct connection to the monitoring cluster, when configured', done => {
const dateInIndex = '2020.02.10';
const oldNow = moment.now;
moment.now = () => 1581310800000;
const prodClusterUuid = '1sdfd5';
const prodCluster = {
callWithInternalUser: sinon
.stub()
.withArgs('monitoring.bulk')
.callsFake(arg => {
let resolution = null;
if (arg === 'info') {
resolution = { cluster_uuid: prodClusterUuid };
}
return new Promise(resolve => resolve(resolution));
}),
};
const monitoringCluster = {
callWithInternalUser: sinon
.stub()
.withArgs('bulk')
.callsFake(() => {
return new Promise(resolve => setTimeout(resolve, CHECK_DELAY + 1));
}),
};

const collectorFetch = sinon.stub().returns({
type: 'kibana_stats',
result: { type: 'kibana_stats', payload: { testData: 12345 } },
});

const collectors = new MockCollectorSet(server, [
{
fetch: collectorFetch,
isReady: () => true,
formatForBulkUpload: result => result,
isUsageCollector: false,
},
]);
const customServer = {
...server,
elasticsearchPlugin: {
createCluster: () => monitoringCluster,
getCluster: name => {
if (name === 'admin' || name === 'data') {
return prodCluster;
}
return monitoringCluster;
},
},
config: {
get: key => {
if (key === 'monitoring.elasticsearch') {
return {
hosts: ['http://localhost:9200'],
username: 'tester',
password: 'testing',
ssl: {},
};
}
return null;
},
},
};
const kbnServerStatus = { toJSON: () => ({ overall: { state: 'green' } }) };
const kbnServerVersion = 'master';
const uploader = new BulkUploader({
...customServer,
interval: FETCH_INTERVAL,
kbnServerStatus,
kbnServerVersion,
});
uploader.start(collectors);
setTimeout(() => {
uploader.stop();
const firstCallArgs = monitoringCluster.callWithInternalUser.firstCall.args;
expect(firstCallArgs[0]).to.be('bulk');
expect(firstCallArgs[1].body[0].index._index).to.be(
`.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateInIndex}`
);
expect(firstCallArgs[1].body[1].type).to.be('kibana_stats');
expect(firstCallArgs[1].body[1].cluster_uuid).to.be(prodClusterUuid);
moment.now = oldNow;
done();
}, CHECK_DELAY);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { defaultsDeep, uniq, compact } from 'lodash';
import { defaultsDeep, uniq, compact, get } from 'lodash';
import { callClusterFactory } from '../../../xpack_main';

import {
Expand All @@ -14,6 +14,8 @@ import {
} from '../../common/constants';

import { sendBulkPayload, monitoringBulk, getKibanaInfoForStats } from './lib';
import { parseElasticsearchConfig } from '../es_client/parse_elasticsearch_config';
import { hasMonitoringCluster } from '../es_client/instantiate_client';

const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];

Expand All @@ -39,6 +41,8 @@ export class BulkUploader {
throw new Error('interval number of milliseconds is required');
}

this._hasDirectConnectionToMonitoringCluster = false;
this._productionClusterUuid = null;
this._timer = null;
// Hold sending and fetching usage until monitoring.bulk is successful. This means that we
// send usage data on the second tick. But would save a lot of bandwidth fetching usage on
Expand All @@ -60,6 +64,19 @@ export class BulkUploader {
plugins: [monitoringBulk],
});

const directConfig = parseElasticsearchConfig(config, 'monitoring.elasticsearch');
if (hasMonitoringCluster(directConfig)) {
this._log.info(`Detected direct connection to monitoring cluster`);
this._hasDirectConnectionToMonitoringCluster = true;
this._cluster = elasticsearchPlugin.createCluster('monitoring-direct', directConfig);
elasticsearchPlugin
.getCluster('admin')
.callWithInternalUser('info')
.then(data => {
this._productionClusterUuid = get(data, 'cluster_uuid');
});
}

this._callClusterWithInternalUser = callClusterFactory({
plugins: { elasticsearch: elasticsearchPlugin },
}).getCallClusterInternal();
Expand Down Expand Up @@ -151,7 +168,6 @@ export class BulkUploader {

const data = await usageCollection.bulkFetch(this._callClusterWithInternalUser);
const payload = this.toBulkUploadFormat(compact(data), usageCollection);

if (payload) {
try {
this._log.debug(`Uploading bulk stats payload to the local cluster`);
Expand Down Expand Up @@ -182,7 +198,14 @@ export class BulkUploader {
}

async _onPayload(payload) {
return await sendBulkPayload(this._cluster, this._interval, payload);
return await sendBulkPayload(
this._cluster,
this._interval,
payload,
this._log,
this._hasDirectConnectionToMonitoringCluster,
this._productionClusterUuid
);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,64 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
import { chunk, get } from 'lodash';
import {
MONITORING_SYSTEM_API_VERSION,
KIBANA_SYSTEM_ID,
KIBANA_STATS_TYPE_MONITORING,
KIBANA_SETTINGS_TYPE,
} from '../../../common/constants';

import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants';
const SUPPORTED_TYPES = [KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE];
export function formatForNormalBulkEndpoint(payload, productionClusterUuid) {
const dateSuffix = moment.utc().format('YYYY.MM.DD');
return chunk(payload, 2).reduce((accum, chunk) => {
const type = get(chunk[0], 'index._type');
if (!type || !SUPPORTED_TYPES.includes(type)) {
return accum;
}

const { timestamp } = chunk[1];

accum.push({
index: {
_index: `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateSuffix}`,
},
});
accum.push({
[type]: chunk[1],
type,
timestamp,
cluster_uuid: productionClusterUuid,
});
return accum;
}, []);
}

/*
* Send the Kibana usage data to the ES Monitoring Bulk endpoint
*/
export function sendBulkPayload(cluster, interval, payload) {
export async function sendBulkPayload(
cluster,
interval,
payload,
log,
hasDirectConnectionToMonitoringCluster = false,
productionClusterUuid = null
) {
if (hasDirectConnectionToMonitoringCluster) {
if (productionClusterUuid === null) {
log.warn(
`Unable to determine production cluster uuid to use for shipping monitoring data. Kibana monitoring data will appear in a standalone cluster in the Stack Monitoring UI.`
);
}
const formattedPayload = formatForNormalBulkEndpoint(payload, productionClusterUuid);
return await cluster.callWithInternalUser('bulk', {
body: formattedPayload,
});
}

return cluster.callWithInternalUser('monitoring.bulk', {
system_id: KIBANA_SYSTEM_ID,
system_api_version: MONITORING_SYSTEM_API_VERSION,
Expand Down
16 changes: 9 additions & 7 deletions x-pack/legacy/plugins/monitoring/server/plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { getLicenseExpiration } from './alerts/license_expiration';
import { parseElasticsearchConfig } from './es_client/parse_elasticsearch_config';

export class Plugin {
setup(_coreSetup, pluginsSetup, __LEGACY) {
async setup(_coreSetup, pluginsSetup, __LEGACY) {
const {
plugins,
_kbnServer: kbnServer,
Expand Down Expand Up @@ -59,6 +59,14 @@ export class Plugin {
*/
const elasticsearchConfig = parseElasticsearchConfig(config);

// Create the dedicated client
await instantiateClient({
log,
events,
elasticsearchConfig,
elasticsearchPlugin: plugins.elasticsearch,
});

xpackMainPlugin.status.once('green', async () => {
// first time xpack_main turns green
/*
Expand All @@ -67,12 +75,6 @@ export class Plugin {
const uiEnabled = config.get('monitoring.ui.enabled');

if (uiEnabled) {
await instantiateClient({
log,
events,
elasticsearchConfig,
elasticsearchPlugin: plugins.elasticsearch,
}); // Instantiate the dedicated ES client
await initMonitoringXpackInfo({
config,
log,
Expand Down