Skip to content

Commit

Permalink
Perform successful Elasticsearch version check before migrations (#51311
Browse files Browse the repository at this point in the history
) (#56600)

* Convert parts of Elasticsearch version check to ts
* Move ES version check to NP
* Improve types
* Wait till for compatible ES nodes before SO migrations
* Don't wait for ES compatibility if skipMigrations=true
* Legacy Elasticsearch plugin integration test
* Make ES compatibility check and migrations logging more visible
* Test for isCompatible=false when ES version check throws
* Start pollEsNodesVersion immediately
* Refactor pollEsNodesVersion
  • Loading branch information
rudolf authored Feb 3, 2020
1 parent a0f6edc commit b095246
Show file tree
Hide file tree
Showing 29 changed files with 769 additions and 655 deletions.
14 changes: 13 additions & 1 deletion src/core/server/elasticsearch/elasticsearch_config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,19 @@ const configSchema = schema.object({
),
apiVersion: schema.string({ defaultValue: DEFAULT_API_VERSION }),
healthCheck: schema.object({ delay: schema.duration({ defaultValue: 2500 }) }),
ignoreVersionMismatch: schema.boolean({ defaultValue: false }),
ignoreVersionMismatch: schema.conditional(
schema.contextRef('dev'),
false,
schema.boolean({
validate: rawValue => {
if (rawValue === true) {
return '"ignoreVersionMismatch" can only be set to true in development mode';
}
},
defaultValue: false,
}),
schema.boolean({ defaultValue: false })
),
});

const deprecations: ConfigDeprecationProvider = () => [
Expand Down
7 changes: 7 additions & 0 deletions src/core/server/elasticsearch/elasticsearch_service.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { IScopedClusterClient } from './scoped_cluster_client';
import { ElasticsearchConfig } from './elasticsearch_config';
import { ElasticsearchService } from './elasticsearch_service';
import { InternalElasticsearchServiceSetup, ElasticsearchServiceSetup } from './types';
import { NodesVersionCompatibility } from './version_check/ensure_es_version';

const createScopedClusterClientMock = (): jest.Mocked<IScopedClusterClient> => ({
callAsInternalUser: jest.fn(),
Expand Down Expand Up @@ -71,6 +72,12 @@ type MockedInternalElasticSearchServiceSetup = jest.Mocked<
const createInternalSetupContractMock = () => {
const setupContract: MockedInternalElasticSearchServiceSetup = {
...createSetupContractMock(),
esNodesCompatibility$: new BehaviorSubject<NodesVersionCompatibility>({
isCompatible: true,
incompatibleNodes: [],
warningNodes: [],
kibanaVersion: '8.0.0',
}),
legacy: {
config$: new BehaviorSubject({} as ElasticsearchConfig),
},
Expand Down
11 changes: 6 additions & 5 deletions src/core/server/elasticsearch/elasticsearch_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { httpServiceMock } from '../http/http_service.mock';
import { ElasticsearchConfig } from './elasticsearch_config';
import { ElasticsearchService } from './elasticsearch_service';
import { elasticsearchServiceMock } from './elasticsearch_service.mock';
import { duration } from 'moment';

let elasticsearchService: ElasticsearchService;
const configService = configServiceMock.create();
Expand All @@ -41,7 +42,7 @@ configService.atPath.mockReturnValue(
new BehaviorSubject({
hosts: ['http://1.2.3.4'],
healthCheck: {
delay: 2000,
delay: duration(2000),
},
ssl: {
verificationMode: 'none',
Expand Down Expand Up @@ -125,7 +126,7 @@ describe('#setup', () => {
const config = MockClusterClient.mock.calls[0][0];
expect(config).toMatchInlineSnapshot(`
Object {
"healthCheckDelay": 2000,
"healthCheckDelay": "PT2S",
"hosts": Array [
"http://8.8.8.8",
],
Expand All @@ -150,7 +151,7 @@ Object {
const config = MockClusterClient.mock.calls[0][0];
expect(config).toMatchInlineSnapshot(`
Object {
"healthCheckDelay": 2000,
"healthCheckDelay": "PT2S",
"hosts": Array [
"http://1.2.3.4",
],
Expand All @@ -174,7 +175,7 @@ Object {
new BehaviorSubject({
hosts: ['http://1.2.3.4', 'http://9.8.7.6'],
healthCheck: {
delay: 2000,
delay: duration(2000),
},
ssl: {
verificationMode: 'none',
Expand All @@ -196,7 +197,7 @@ Object {
const config = MockClusterClient.mock.calls[0][0];
expect(config).toMatchInlineSnapshot(`
Object {
"healthCheckDelay": 2000,
"healthCheckDelay": "PT2S",
"hosts": Array [
"http://8.8.8.8",
],
Expand Down
46 changes: 38 additions & 8 deletions src/core/server/elasticsearch/elasticsearch_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { ElasticsearchConfig, ElasticsearchConfigType } from './elasticsearch_co
import { InternalHttpServiceSetup, GetAuthHeaders } from '../http/';
import { InternalElasticsearchServiceSetup } from './types';
import { CallAPIOptions } from './api_types';
import { pollEsNodesVersion } from './version_check/ensure_es_version';

/** @internal */
interface CoreClusterClients {
Expand All @@ -46,9 +47,17 @@ interface SetupDeps {
export class ElasticsearchService implements CoreService<InternalElasticsearchServiceSetup> {
private readonly log: Logger;
private readonly config$: Observable<ElasticsearchConfig>;
private subscription?: Subscription;
private subscriptions: {
client?: Subscription;
esNodesCompatibility?: Subscription;
} = {
client: undefined,
esNodesCompatibility: undefined,
};
private kibanaVersion: string;

constructor(private readonly coreContext: CoreContext) {
this.kibanaVersion = coreContext.env.packageInfo.version;
this.log = coreContext.logger.get('elasticsearch-service');
this.config$ = coreContext.configService
.atPath<ElasticsearchConfigType>('elasticsearch')
Expand All @@ -60,7 +69,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe

const clients$ = this.config$.pipe(
filter(() => {
if (this.subscription !== undefined) {
if (this.subscriptions.client !== undefined) {
this.log.error('Clients cannot be changed after they are created');
return false;
}
Expand Down Expand Up @@ -91,7 +100,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
publishReplay(1)
) as ConnectableObservable<CoreClusterClients>;

this.subscription = clients$.connect();
this.subscriptions.client = clients$.connect();

const config = await this.config$.pipe(first()).toPromise();

Expand Down Expand Up @@ -149,11 +158,31 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
},
};

const esNodesCompatibility$ = pollEsNodesVersion({
callWithInternalUser: adminClient.callAsInternalUser,
log: this.log,
ignoreVersionMismatch: config.ignoreVersionMismatch,
esVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
kibanaVersion: this.kibanaVersion,
}).pipe(publishReplay(1));

this.subscriptions.esNodesCompatibility = (esNodesCompatibility$ as ConnectableObservable<
unknown
>).connect();

// TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983
esNodesCompatibility$.subscribe(({ isCompatible, message }) => {
if (!isCompatible && message) {
this.log.error(message);
}
});

return {
legacy: { config$: clients$.pipe(map(clients => clients.config)) },

adminClient,
dataClient,
esNodesCompatibility$,

createClient: (type: string, clientConfig: Partial<ElasticsearchClientConfig> = {}) => {
const finalConfig = merge({}, config, clientConfig);
Expand All @@ -166,11 +195,12 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe

public async stop() {
this.log.debug('Stopping elasticsearch service');

if (this.subscription !== undefined) {
this.subscription.unsubscribe();
this.subscription = undefined;
}
// TODO(TS-3.7-ESLINT)
// eslint-disable-next-line no-unused-expressions
this.subscriptions.client?.unsubscribe();
// eslint-disable-next-line no-unused-expressions
this.subscriptions.esNodesCompatibility?.unsubscribe();
this.subscriptions = { client: undefined, esNodesCompatibility: undefined };
}

private createClusterClient(
Expand Down
2 changes: 2 additions & 0 deletions src/core/server/elasticsearch/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Observable } from 'rxjs';
import { ElasticsearchConfig } from './elasticsearch_config';
import { ElasticsearchClientConfig } from './elasticsearch_client_config';
import { IClusterClient, ICustomClusterClient } from './cluster_client';
import { NodesVersionCompatibility } from './version_check/ensure_es_version';

/**
* @public
Expand Down Expand Up @@ -77,4 +78,5 @@ export interface InternalElasticsearchServiceSetup extends ElasticsearchServiceS
readonly legacy: {
readonly config$: Observable<ElasticsearchConfig>;
};
esNodesCompatibility$: Observable<NodesVersionCompatibility>;
}
Loading

0 comments on commit b095246

Please sign in to comment.