From 536f7d449ee32e3564bdda0be765b1b39c939b93 Mon Sep 17 00:00:00 2001 From: Aleh Zasypkin Date: Wed, 23 Jan 2019 18:53:47 +0100 Subject: [PATCH] Switch back to observables. --- .../elasticsearch/elasticsearch_service.ts | 38 ++++--------- src/legacy/core_plugins/console/index.js | 25 ++++----- .../server/elasticsearch_proxy_config.js | 9 ++- .../core_plugins/elasticsearch/index.d.ts | 9 --- .../core_plugins/elasticsearch/index.js | 56 +++++++++++++------ .../elasticsearch/lib/health_check.js | 10 ++-- .../server/series_functions/es/index.js | 8 ++- .../series_functions/es/lib/build_request.js | 3 +- 8 files changed, 77 insertions(+), 81 deletions(-) diff --git a/src/core/server/elasticsearch/elasticsearch_service.ts b/src/core/server/elasticsearch/elasticsearch_service.ts index 80773aa73f0f8..4e2779b0cdf13 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.ts @@ -18,7 +18,7 @@ */ import { ConnectableObservable, Observable, Subscription } from 'rxjs'; -import { filter, first, publishReplay, switchMap } from 'rxjs/operators'; +import { filter, map, publishReplay, switchMap } from 'rxjs/operators'; import { CoreContext, CoreService } from '../../types'; import { Logger } from '../logging'; import { ClusterClient } from './cluster_client'; @@ -35,19 +35,12 @@ interface CoreClusterClients { export interface ElasticsearchServiceStartContract { // Required for the BWC only. readonly bwc: { - readonly config: ElasticsearchConfig; + readonly config$: Observable; }; - readonly apiVersion: ElasticsearchConfig['apiVersion']; - readonly requestTimeout: ElasticsearchConfig['requestTimeout']; - readonly shardTimeout: ElasticsearchConfig['shardTimeout']; - - readonly createClient: ( - type: string, - config?: Partial - ) => ClusterClient; - readonly adminClient: ClusterClient; - readonly dataClient: ClusterClient; + readonly createClient: (type: string, config: ElasticsearchClientConfig) => ClusterClient; + readonly adminClient$: Observable; + readonly dataClient$: Observable; } /** @internal */ @@ -99,25 +92,14 @@ export class ElasticsearchService implements CoreService clients.config)) }, - adminClient, - dataClient, + adminClient$: clients$.pipe(map(clients => clients.adminClient)), + dataClient$: clients$.pipe(map(clients => clients.dataClient)), - createClient: (type: string, clientConfig: Partial = {}) => { - return this.createClusterClient(type, { - ...(defaultConfig as ElasticsearchClientConfig), - ...clientConfig, - }); + createClient: (type: string, clientConfig: ElasticsearchClientConfig) => { + return this.createClusterClient(type, clientConfig); }, }; } diff --git a/src/legacy/core_plugins/console/index.js b/src/legacy/core_plugins/console/index.js index 517deca45b6bf..a8e562a0ccf31 100644 --- a/src/legacy/core_plugins/console/index.js +++ b/src/legacy/core_plugins/console/index.js @@ -18,6 +18,7 @@ */ import Boom from 'boom'; +import { first } from 'rxjs/operators'; import { resolve, join, sep } from 'path'; import url from 'url'; import { has, isEmpty, head, pick, isPlainObject } from 'lodash'; @@ -64,6 +65,7 @@ export default function (kibana) { const modules = resolve(__dirname, 'public/webpackShims/'); const src = resolve(__dirname, 'public/src/'); + let defaultVars; const apps = []; return new kibana.Plugin({ id: 'console', @@ -107,17 +109,23 @@ export default function (kibana) { ]; }, - init: function (server, options) { + async init(server, options) { server.expose('addExtensionSpecFilePath', addExtensionSpecFilePath); if (options.ssl && options.ssl.verify) { throw new Error('sense.ssl.verify is no longer supported.'); } const config = server.config(); - const bwcEsConfig = server.core.es.bwc.config; + const bwcEsConfig = await server.core.es.bwc.config$.pipe(first()).toPromise(); const proxyConfigCollection = new ProxyConfigCollection(options.proxyConfig); const proxyPathFilters = options.proxyFilter.map(str => new RegExp(str)); + defaultVars = { + elasticsearchUrl: url.format( + Object.assign(url.parse(head(bwcEsConfig.hosts)), { auth: false }) + ) + }; + server.route(createProxyRoute({ baseUrl: head(bwcEsConfig.hosts), pathFilters: proxyPathFilters, @@ -133,7 +141,7 @@ export default function (kibana) { } return { - ...getElasticsearchProxyConfig(server), + ...getElasticsearchProxyConfig(bwcEsConfig), headers, }; } @@ -159,16 +167,7 @@ export default function (kibana) { devTools: ['plugins/console/console'], styleSheetPaths: resolve(__dirname, 'public/index.scss'), - injectDefaultVars(server) { - return { - elasticsearchUrl: url.format( - Object.assign( - url.parse(head(server.core.es.bwc.config.hosts)), - { auth: false } - ) - ) - }; - }, + injectDefaultVars: () => defaultVars, noParse: [ join(modules, 'ace' + sep), diff --git a/src/legacy/core_plugins/console/server/elasticsearch_proxy_config.js b/src/legacy/core_plugins/console/server/elasticsearch_proxy_config.js index a23398e62164b..dd15a5d6b7e3f 100644 --- a/src/legacy/core_plugins/console/server/elasticsearch_proxy_config.js +++ b/src/legacy/core_plugins/console/server/elasticsearch_proxy_config.js @@ -25,8 +25,7 @@ import url from 'url'; const readFile = (file) => readFileSync(file, 'utf8'); -const createAgent = (server) => { - const bwcConfig = server.core.es.bwc.config; +const createAgent = (bwcConfig) => { const target = url.parse(_.head(bwcConfig.hosts)); if (!/^https/.test(target.protocol)) return new http.Agent(); @@ -68,9 +67,9 @@ const createAgent = (server) => { return new https.Agent(agentOptions); }; -export const getElasticsearchProxyConfig = (server) => { +export const getElasticsearchProxyConfig = (bwcConfig) => { return { - timeout: server.core.es.requestTimeout.asMilliseconds(), - agent: createAgent(server) + timeout: bwcConfig.requestTimeout.asMilliseconds(), + agent: createAgent(bwcConfig) }; }; diff --git a/src/legacy/core_plugins/elasticsearch/index.d.ts b/src/legacy/core_plugins/elasticsearch/index.d.ts index e08b835fdc0ee..2a909eff9d541 100644 --- a/src/legacy/core_plugins/elasticsearch/index.d.ts +++ b/src/legacy/core_plugins/elasticsearch/index.d.ts @@ -165,15 +165,6 @@ interface RequestHeaders { [name: string]: string; } -interface ElasticsearchClientLogging { - error(err: Error): void; - warning(message: string): void; - trace(method: string, options: { path: string }, query?: string, statusCode?: number): void; - info(): void; - debug(): void; - close(): void; -} - interface AssistantAPIClientParams extends GenericParams { path: '/_xpack/migration/assistance'; method: 'GET'; diff --git a/src/legacy/core_plugins/elasticsearch/index.js b/src/legacy/core_plugins/elasticsearch/index.js index 2de5ce745fa92..5e05fb326fd75 100644 --- a/src/legacy/core_plugins/elasticsearch/index.js +++ b/src/legacy/core_plugins/elasticsearch/index.js @@ -17,29 +17,44 @@ * under the License. */ +import { combineLatest } from 'rxjs'; +import { first, map } from 'rxjs/operators'; import healthCheck from './lib/health_check'; import { Cluster } from './lib/cluster'; import { createProxy } from './lib/create_proxy'; export default function (kibana) { + let defaultVars; + return new kibana.Plugin({ require: ['kibana'], - uiExports: { - injectDefaultVars(server) { - return { - esRequestTimeout: server.core.es.requestTimeout.asMilliseconds(), - esShardTimeout: server.core.es.shardTimeout.asMilliseconds(), - esApiVersion: server.core.es.apiVersion, - }; - } - }, - - init(server) { - const clusters = new Map(); + uiExports: { injectDefaultVars: () => defaultVars }, + + async init(server) { + // All methods that ES plugin exposes are synchronous so we should get the first + // value from all observables here to be able to synchronously return and create + // cluster clients afterwards. + const [bwcEsConfig, adminCluster, dataCluster] = await combineLatest( + server.core.es.bwc.config$, + server.core.es.adminClient$, + server.core.es.dataClient$ + ).pipe( + first(), + map(([config, adminClusterClient, dataClusterClient]) => [ + config, + new Cluster(adminClusterClient), + new Cluster(dataClusterClient) + ]) + ).toPromise(); - const adminCluster = new Cluster(server.core.es.adminClient); - const dataCluster = new Cluster(server.core.es.dataClient); + defaultVars = { + esRequestTimeout: bwcEsConfig.requestTimeout.asMilliseconds(), + esShardTimeout: bwcEsConfig.shardTimeout.asMilliseconds(), + esApiVersion: bwcEsConfig.apiVersion, + }; + + const clusters = new Map(); server.expose('getCluster', (name) => { if (name === 'admin') { return adminCluster; @@ -52,12 +67,19 @@ export default function (kibana) { return clusters.get(name); }); - server.expose('createCluster', (name, config) => { + server.expose('createCluster', (name, clientConfig = {}) => { if (clusters.has(name)) { throw new Error(`cluster '${name}' already exists`); } - const cluster = new Cluster(server.core.es.createClient(name, config)); + // We fill all the missing properties in the `clientConfig` using the default + // Elasticsearch config so that we don't depend on default values set and + // controlled by underlying Elasticsearch JS client. + const cluster = new Cluster(server.core.es.createClient(name, { + ...bwcEsConfig, + ...clientConfig, + })); + clusters.set(name, cluster); return cluster; @@ -73,7 +95,7 @@ export default function (kibana) { createProxy(server); // Set up the health check service and start it. - const { start, waitUntilReady } = healthCheck(this, server); + const { start, waitUntilReady } = healthCheck(this, server, bwcEsConfig.healthCheckDelay.asMilliseconds()); server.expose('waitUntilReady', waitUntilReady); start(); } diff --git a/src/legacy/core_plugins/elasticsearch/lib/health_check.js b/src/legacy/core_plugins/elasticsearch/lib/health_check.js index 3eba617043716..304f3b816214c 100644 --- a/src/legacy/core_plugins/elasticsearch/lib/health_check.js +++ b/src/legacy/core_plugins/elasticsearch/lib/health_check.js @@ -21,19 +21,17 @@ import Promise from 'bluebird'; import kibanaVersion from './kibana_version'; import { ensureEsVersion } from './ensure_es_version'; -export default function (plugin, server) { +export default function (plugin, server, requestDelay) { const adminCluster = server.plugins.elasticsearch.getCluster('admin'); const NoConnections = adminCluster.errors.NoConnections; const callAdminAsKibanaUser = adminCluster.callWithInternalUser; - const REQUEST_DELAY = server.core.es.bwc.config.healthCheckDelay.asMilliseconds(); - plugin.status.yellow('Waiting for Elasticsearch'); function waitForPong(callWithInternalUser) { return callWithInternalUser('ping').catch(function (err) { if (!(err instanceof NoConnections)) throw err; plugin.status.red(`Unable to connect to Elasticsearch.`); - return Promise.delay(REQUEST_DELAY).then(waitForPong.bind(null, callWithInternalUser)); + return Promise.delay(requestDelay).then(waitForPong.bind(null, callWithInternalUser)); }); } @@ -46,7 +44,7 @@ export default function (plugin, server) { function waitForEsVersion() { return ensureEsVersion(server, kibanaVersion.get()).catch(err => { plugin.status.red(err); - return Promise.delay(REQUEST_DELAY).then(waitForEsVersion); + return Promise.delay(requestDelay).then(waitForEsVersion); }); } @@ -80,7 +78,7 @@ export default function (plugin, server) { } function startorRestartChecking() { - scheduleCheck(stopChecking() ? REQUEST_DELAY : 1); + scheduleCheck(stopChecking() ? requestDelay : 1); } function stopChecking() { diff --git a/src/legacy/core_plugins/timelion/server/series_functions/es/index.js b/src/legacy/core_plugins/timelion/server/series_functions/es/index.js index 9de47a57e0ba6..d37dd9d9d0805 100644 --- a/src/legacy/core_plugins/timelion/server/series_functions/es/index.js +++ b/src/legacy/core_plugins/timelion/server/series_functions/es/index.js @@ -17,6 +17,7 @@ * under the License. */ +import { first, map } from 'rxjs/operators'; import { i18n } from '@kbn/i18n'; import _ from 'lodash'; import Datasource from '../../lib/classes/datasource'; @@ -126,7 +127,12 @@ export default new Datasource('es', { }); } - const body = buildRequest(config, tlConfig, scriptedFields); + const esShardTimeout = await tlConfig.server.core.es.bwc.config$.pipe( + first(), + map(config => config.shardTimeout.asMilliseconds()) + ).toPromise(); + + const body = buildRequest(config, tlConfig, scriptedFields, esShardTimeout); const { callWithRequest } = tlConfig.server.plugins.elasticsearch.getCluster('data'); const resp = await callWithRequest(tlConfig.request, 'search', body); diff --git a/src/legacy/core_plugins/timelion/server/series_functions/es/lib/build_request.js b/src/legacy/core_plugins/timelion/server/series_functions/es/lib/build_request.js index 37aab6af45fa7..68b6aa1e5108c 100644 --- a/src/legacy/core_plugins/timelion/server/series_functions/es/lib/build_request.js +++ b/src/legacy/core_plugins/timelion/server/series_functions/es/lib/build_request.js @@ -21,7 +21,7 @@ import _ from 'lodash'; import { buildAggBody } from './agg_body'; import createDateAgg from './create_date_agg'; -export default function buildRequest(config, tlConfig, scriptedFields) { +export default function buildRequest(config, tlConfig, scriptedFields, timeout) { const bool = { must: [] }; @@ -78,7 +78,6 @@ export default function buildRequest(config, tlConfig, scriptedFields) { } }; - const timeout = tlConfig.server.core.es.shardTimeout.asMilliseconds(); if (timeout) { request.timeout = `${timeout}ms`; }