Skip to content

Commit

Permalink
Address PR comments, add error handling inside rolloverAliasIfNeeded
Browse files Browse the repository at this point in the history
  • Loading branch information
marshallmain committed Jun 21, 2021
1 parent 33461c1 commit 19b9eaf
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 55 deletions.
63 changes: 41 additions & 22 deletions x-pack/plugins/rule_registry/server/rule_data_client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* 2.0.
*/

import { IndicesRolloverResponse } from '@elastic/elasticsearch/api/types';
import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { get } from 'lodash';
import { IndexPatternsFetcher } from '../../../../../src/plugins/data/server';
Expand Down Expand Up @@ -72,7 +73,6 @@ export class RuleDataClient implements IRuleDataClient {
async getWriter(options: { namespace?: string } = {}): Promise<RuleDataWriter> {
const { namespace } = options;
const alias = getNamespacedAlias({ alias: this.options.alias, namespace });
await this.createOrUpdateWriteTarget({ namespace });
await this.rolloverAliasIfNeeded({ namespace });
return {
bulk: async (request) => {
Expand All @@ -90,7 +90,7 @@ export class RuleDataClient implements IRuleDataClient {
response.body.items.length > 0 &&
response.body.items?.[0]?.index?.error?.type === 'index_not_found_exception'
) {
return this.createOrUpdateWriteTarget({ namespace }).then(() => {
return this.createWriteTargetIfNeeded({ namespace }).then(() => {
return clusterClient.bulk(requestWithDefaultParameters);
});
}
Expand All @@ -106,24 +106,41 @@ export class RuleDataClient implements IRuleDataClient {
async rolloverAliasIfNeeded({ namespace }: { namespace?: string }) {
const clusterClient = await this.getClusterClient();
const alias = getNamespacedAlias({ alias: this.options.alias, namespace });
const { body: simulatedRollover } = await clusterClient.indices.rollover({
alias,
dry_run: true,
});
const writeIndexMapping = await clusterClient.indices.get({
index: simulatedRollover.old_index,
});
let simulatedRollover: IndicesRolloverResponse;
try {
({ body: simulatedRollover } = await clusterClient.indices.rollover({
alias,
dry_run: true,
}));
} catch (err) {
if (err?.meta?.body?.error?.type !== 'index_not_found_exception') {
throw err;
}
return;
}

const [writeIndexMapping, simulatedIndexMapping] = await Promise.all([
clusterClient.indices.get({
index: simulatedRollover.old_index,
}),
clusterClient.indices.simulateIndexTemplate({
name: simulatedRollover.new_index,
}),
]);
const currentVersions = get(writeIndexMapping, [
'body',
simulatedRollover.old_index,
'mappings',
'_meta',
'versions',
]);
const { body: simulatedIndex } = await clusterClient.indices.simulateIndexTemplate({
name: simulatedRollover.new_index,
});
const targetVersions = get(simulatedIndex, ['template', 'mappings', '_meta', 'versions']);
const targetVersions = get(simulatedIndexMapping, [
'body',
'template',
'mappings',
'_meta',
'versions',
]);
const componentTemplateRemoved =
Object.keys(currentVersions).length > Object.keys(targetVersions).length;
const componentTemplateUpdated = Object.entries(targetVersions).reduce(
Expand All @@ -137,18 +154,20 @@ export class RuleDataClient implements IRuleDataClient {
);
const needRollover = componentTemplateRemoved || componentTemplateUpdated;
if (needRollover) {
await clusterClient.indices.rollover({
alias,
body: {
conditions: {
max_age: '5s',
},
},
});
try {
await clusterClient.indices.rollover({
alias,
new_index: simulatedRollover.new_index,
});
} catch (err) {
if (err?.meta?.body?.error?.type !== 'resource_already_exists_exception') {
throw err;
}
}
}
}

async createOrUpdateWriteTarget({ namespace }: { namespace?: string }) {
async createWriteTargetIfNeeded({ namespace }: { namespace?: string }) {
const alias = getNamespacedAlias({ alias: this.options.alias, namespace });

const clusterClient = await this.getClusterClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export interface RuleDataWriter {
export interface IRuleDataClient {
getReader(options?: { namespace?: string }): RuleDataReader;
getWriter(options?: { namespace?: string }): Promise<RuleDataWriter>;
createOrUpdateWriteTarget(options: { namespace?: string }): Promise<void>;
createWriteTargetIfNeeded(options: { namespace?: string }): Promise<void>;
}

export interface RuleDataClientConstructorOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import { ClusterPutComponentTemplate } from '@elastic/elasticsearch/api/requestParams';
import { estypes } from '@elastic/elasticsearch';
import { ElasticsearchClient, Logger } from 'kibana/server';
import { merge } from 'lodash';
import { technicalComponentTemplate } from '../../common/assets/component_templates/technical_component_template';
import {
DEFAULT_ILM_POLICY_ID,
Expand Down Expand Up @@ -109,24 +110,15 @@ export class RuleDataPluginService {

const clusterClient = await this.getClusterClient();
this.options.logger.debug(`Installing component template ${template.name}`);
return clusterClient.cluster.putComponentTemplate({
...template,
body: {
...template.body,
template: {
...template.body.template,
mappings: {
...template.body.template.mappings,
_meta: {
...template.body.template.mappings._meta,
versions: {
[template.name]: templateVersion,
},
},
},
const mergedTemplate = merge(
{
body: {
template: { mappings: { _meta: { versions: { [template.name]: templateVersion } } } },
},
},
});
template
);
return clusterClient.cluster.putComponentTemplate(mergedTemplate);
}

private async _createOrUpdateIndexTemplate({
Expand All @@ -140,24 +132,15 @@ export class RuleDataPluginService {

const clusterClient = await this.getClusterClient();
this.options.logger.debug(`Installing index template ${template.name}`);
return clusterClient.indices.putIndexTemplate({
...template,
body: {
...template.body,
template: {
...template.body?.template,
mappings: {
...template.body?.template?.mappings,
_meta: {
...template.body?.template?.mappings?._meta,
versions: {
[template.name]: templateVersion,
},
},
},
const mergedTemplate = merge(
{
body: {
template: { mappings: { _meta: { versions: { [template.name]: templateVersion } } } },
},
},
});
template
);
return clusterClient.indices.putIndexTemplate(mergedTemplate);
}

private async _createOrUpdateLifecyclePolicy(policy: estypes.IlmPutLifecycleRequest) {
Expand Down

0 comments on commit 19b9eaf

Please sign in to comment.