Skip to content

Commit

Permalink
Switch back to observables.
Browse files Browse the repository at this point in the history
  • Loading branch information
azasypkin committed Jan 24, 2019
1 parent bf37868 commit 536f7d4
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 81 deletions.
38 changes: 10 additions & 28 deletions src/core/server/elasticsearch/elasticsearch_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import { ConnectableObservable, Observable, Subscription } from 'rxjs';
import { filter, first, publishReplay, switchMap } from 'rxjs/operators';
import { filter, map, publishReplay, switchMap } from 'rxjs/operators';
import { CoreContext, CoreService } from '../../types';
import { Logger } from '../logging';
import { ClusterClient } from './cluster_client';
Expand All @@ -35,19 +35,12 @@ interface CoreClusterClients {
export interface ElasticsearchServiceStartContract {
// Required for the BWC only.
readonly bwc: {
readonly config: ElasticsearchConfig;
readonly config$: Observable<ElasticsearchConfig>;
};

readonly apiVersion: ElasticsearchConfig['apiVersion'];
readonly requestTimeout: ElasticsearchConfig['requestTimeout'];
readonly shardTimeout: ElasticsearchConfig['shardTimeout'];

readonly createClient: (
type: string,
config?: Partial<ElasticsearchClientConfig>
) => ClusterClient;
readonly adminClient: ClusterClient;
readonly dataClient: ClusterClient;
readonly createClient: (type: string, config: ElasticsearchClientConfig) => ClusterClient;
readonly adminClient$: Observable<ClusterClient>;
readonly dataClient$: Observable<ClusterClient>;
}

/** @internal */
Expand Down Expand Up @@ -99,25 +92,14 @@ export class ElasticsearchService implements CoreService<ElasticsearchServiceSta

this.subscription = clients$.connect();

const { config: defaultConfig, adminClient, dataClient } = await clients$
.pipe(first())
.toPromise();

return {
bwc: { config: defaultConfig },

apiVersion: defaultConfig.apiVersion,
requestTimeout: defaultConfig.requestTimeout,
shardTimeout: defaultConfig.shardTimeout,
bwc: { config$: clients$.pipe(map(clients => clients.config)) },

adminClient,
dataClient,
adminClient$: clients$.pipe(map(clients => clients.adminClient)),
dataClient$: clients$.pipe(map(clients => clients.dataClient)),

createClient: (type: string, clientConfig: Partial<ElasticsearchClientConfig> = {}) => {
return this.createClusterClient(type, {
...(defaultConfig as ElasticsearchClientConfig),
...clientConfig,
});
createClient: (type: string, clientConfig: ElasticsearchClientConfig) => {
return this.createClusterClient(type, clientConfig);
},
};
}
Expand Down
25 changes: 12 additions & 13 deletions src/legacy/core_plugins/console/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

import Boom from 'boom';
import { first } from 'rxjs/operators';
import { resolve, join, sep } from 'path';
import url from 'url';
import { has, isEmpty, head, pick, isPlainObject } from 'lodash';
Expand Down Expand Up @@ -64,6 +65,7 @@ export default function (kibana) {
const modules = resolve(__dirname, 'public/webpackShims/');
const src = resolve(__dirname, 'public/src/');

let defaultVars;
const apps = [];
return new kibana.Plugin({
id: 'console',
Expand Down Expand Up @@ -107,17 +109,23 @@ export default function (kibana) {
];
},

init: function (server, options) {
async init(server, options) {
server.expose('addExtensionSpecFilePath', addExtensionSpecFilePath);
if (options.ssl && options.ssl.verify) {
throw new Error('sense.ssl.verify is no longer supported.');
}

const config = server.config();
const bwcEsConfig = server.core.es.bwc.config;
const bwcEsConfig = await server.core.es.bwc.config$.pipe(first()).toPromise();
const proxyConfigCollection = new ProxyConfigCollection(options.proxyConfig);
const proxyPathFilters = options.proxyFilter.map(str => new RegExp(str));

defaultVars = {
elasticsearchUrl: url.format(
Object.assign(url.parse(head(bwcEsConfig.hosts)), { auth: false })
)
};

server.route(createProxyRoute({
baseUrl: head(bwcEsConfig.hosts),
pathFilters: proxyPathFilters,
Expand All @@ -133,7 +141,7 @@ export default function (kibana) {
}

return {
...getElasticsearchProxyConfig(server),
...getElasticsearchProxyConfig(bwcEsConfig),
headers,
};
}
Expand All @@ -159,16 +167,7 @@ export default function (kibana) {
devTools: ['plugins/console/console'],
styleSheetPaths: resolve(__dirname, 'public/index.scss'),

injectDefaultVars(server) {
return {
elasticsearchUrl: url.format(
Object.assign(
url.parse(head(server.core.es.bwc.config.hosts)),
{ auth: false }
)
)
};
},
injectDefaultVars: () => defaultVars,

noParse: [
join(modules, 'ace' + sep),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import url from 'url';

const readFile = (file) => readFileSync(file, 'utf8');

const createAgent = (server) => {
const bwcConfig = server.core.es.bwc.config;
const createAgent = (bwcConfig) => {
const target = url.parse(_.head(bwcConfig.hosts));
if (!/^https/.test(target.protocol)) return new http.Agent();

Expand Down Expand Up @@ -68,9 +67,9 @@ const createAgent = (server) => {
return new https.Agent(agentOptions);
};

export const getElasticsearchProxyConfig = (server) => {
export const getElasticsearchProxyConfig = (bwcConfig) => {
return {
timeout: server.core.es.requestTimeout.asMilliseconds(),
agent: createAgent(server)
timeout: bwcConfig.requestTimeout.asMilliseconds(),
agent: createAgent(bwcConfig)
};
};
9 changes: 0 additions & 9 deletions src/legacy/core_plugins/elasticsearch/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,6 @@ interface RequestHeaders {
[name: string]: string;
}

interface ElasticsearchClientLogging {
error(err: Error): void;
warning(message: string): void;
trace(method: string, options: { path: string }, query?: string, statusCode?: number): void;
info(): void;
debug(): void;
close(): void;
}

interface AssistantAPIClientParams extends GenericParams {
path: '/_xpack/migration/assistance';
method: 'GET';
Expand Down
56 changes: 39 additions & 17 deletions src/legacy/core_plugins/elasticsearch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,44 @@
* under the License.
*/

import { combineLatest } from 'rxjs';
import { first, map } from 'rxjs/operators';
import healthCheck from './lib/health_check';
import { Cluster } from './lib/cluster';
import { createProxy } from './lib/create_proxy';

export default function (kibana) {
let defaultVars;

return new kibana.Plugin({
require: ['kibana'],

uiExports: {
injectDefaultVars(server) {
return {
esRequestTimeout: server.core.es.requestTimeout.asMilliseconds(),
esShardTimeout: server.core.es.shardTimeout.asMilliseconds(),
esApiVersion: server.core.es.apiVersion,
};
}
},

init(server) {
const clusters = new Map();
uiExports: { injectDefaultVars: () => defaultVars },

async init(server) {
// All methods that ES plugin exposes are synchronous so we should get the first
// value from all observables here to be able to synchronously return and create
// cluster clients afterwards.
const [bwcEsConfig, adminCluster, dataCluster] = await combineLatest(
server.core.es.bwc.config$,
server.core.es.adminClient$,
server.core.es.dataClient$
).pipe(
first(),
map(([config, adminClusterClient, dataClusterClient]) => [
config,
new Cluster(adminClusterClient),
new Cluster(dataClusterClient)
])
).toPromise();

const adminCluster = new Cluster(server.core.es.adminClient);
const dataCluster = new Cluster(server.core.es.dataClient);
defaultVars = {
esRequestTimeout: bwcEsConfig.requestTimeout.asMilliseconds(),
esShardTimeout: bwcEsConfig.shardTimeout.asMilliseconds(),
esApiVersion: bwcEsConfig.apiVersion,
};

const clusters = new Map();
server.expose('getCluster', (name) => {
if (name === 'admin') {
return adminCluster;
Expand All @@ -52,12 +67,19 @@ export default function (kibana) {
return clusters.get(name);
});

server.expose('createCluster', (name, config) => {
server.expose('createCluster', (name, clientConfig = {}) => {
if (clusters.has(name)) {
throw new Error(`cluster '${name}' already exists`);
}

const cluster = new Cluster(server.core.es.createClient(name, config));
// We fill all the missing properties in the `clientConfig` using the default
// Elasticsearch config so that we don't depend on default values set and
// controlled by underlying Elasticsearch JS client.
const cluster = new Cluster(server.core.es.createClient(name, {
...bwcEsConfig,
...clientConfig,
}));

clusters.set(name, cluster);

return cluster;
Expand All @@ -73,7 +95,7 @@ export default function (kibana) {
createProxy(server);

// Set up the health check service and start it.
const { start, waitUntilReady } = healthCheck(this, server);
const { start, waitUntilReady } = healthCheck(this, server, bwcEsConfig.healthCheckDelay.asMilliseconds());
server.expose('waitUntilReady', waitUntilReady);
start();
}
Expand Down
10 changes: 4 additions & 6 deletions src/legacy/core_plugins/elasticsearch/lib/health_check.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@ import Promise from 'bluebird';
import kibanaVersion from './kibana_version';
import { ensureEsVersion } from './ensure_es_version';

export default function (plugin, server) {
export default function (plugin, server, requestDelay) {
const adminCluster = server.plugins.elasticsearch.getCluster('admin');
const NoConnections = adminCluster.errors.NoConnections;
const callAdminAsKibanaUser = adminCluster.callWithInternalUser;

const REQUEST_DELAY = server.core.es.bwc.config.healthCheckDelay.asMilliseconds();

plugin.status.yellow('Waiting for Elasticsearch');
function waitForPong(callWithInternalUser) {
return callWithInternalUser('ping').catch(function (err) {
if (!(err instanceof NoConnections)) throw err;
plugin.status.red(`Unable to connect to Elasticsearch.`);
return Promise.delay(REQUEST_DELAY).then(waitForPong.bind(null, callWithInternalUser));
return Promise.delay(requestDelay).then(waitForPong.bind(null, callWithInternalUser));
});
}

Expand All @@ -46,7 +44,7 @@ export default function (plugin, server) {
function waitForEsVersion() {
return ensureEsVersion(server, kibanaVersion.get()).catch(err => {
plugin.status.red(err);
return Promise.delay(REQUEST_DELAY).then(waitForEsVersion);
return Promise.delay(requestDelay).then(waitForEsVersion);
});
}

Expand Down Expand Up @@ -80,7 +78,7 @@ export default function (plugin, server) {
}

function startorRestartChecking() {
scheduleCheck(stopChecking() ? REQUEST_DELAY : 1);
scheduleCheck(stopChecking() ? requestDelay : 1);
}

function stopChecking() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/

import { first, map } from 'rxjs/operators';
import { i18n } from '@kbn/i18n';
import _ from 'lodash';
import Datasource from '../../lib/classes/datasource';
Expand Down Expand Up @@ -126,7 +127,12 @@ export default new Datasource('es', {
});
}

const body = buildRequest(config, tlConfig, scriptedFields);
const esShardTimeout = await tlConfig.server.core.es.bwc.config$.pipe(
first(),
map(config => config.shardTimeout.asMilliseconds())
).toPromise();

const body = buildRequest(config, tlConfig, scriptedFields, esShardTimeout);

const { callWithRequest } = tlConfig.server.plugins.elasticsearch.getCluster('data');
const resp = await callWithRequest(tlConfig.request, 'search', body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import _ from 'lodash';
import { buildAggBody } from './agg_body';
import createDateAgg from './create_date_agg';

export default function buildRequest(config, tlConfig, scriptedFields) {
export default function buildRequest(config, tlConfig, scriptedFields, timeout) {

const bool = { must: [] };

Expand Down Expand Up @@ -78,7 +78,6 @@ export default function buildRequest(config, tlConfig, scriptedFields) {
}
};

const timeout = tlConfig.server.core.es.shardTimeout.asMilliseconds();
if (timeout) {
request.timeout = `${timeout}ms`;
}
Expand Down

0 comments on commit 536f7d4

Please sign in to comment.