Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[on hold] [ResponseOps] change AAD indices to data streams #156123

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,29 +143,23 @@ export const createConcreteWriteIndex = async ({
indexPatterns,
totalFieldsLimit,
}: CreateConcreteWriteIndexOpts) => {
logger.info(`Creating concrete write index - ${indexPatterns.name}`);
logger.info(`Creating data stream - ${indexPatterns.alias}`);

// check if a concrete write index already exists
let concreteIndices: ConcreteIndexInfo[] = [];
try {
// Specify both the index pattern for the backing indices and their aliases
// The alias prevents the request from finding other namespaces that could match the -* pattern
const response = await retryTransientEsErrors(
() =>
esClient.indices.getAlias({
index: indexPatterns.pattern,
name: indexPatterns.basePattern,
}),
() => esClient.indices.getDataStream({ name: indexPatterns.alias, expand_wildcards: 'all' }),
{ logger }
);

concreteIndices = Object.entries(response).flatMap(([index, { aliases }]) =>
Object.entries(aliases).map(([aliasName, aliasProperties]) => ({
index,
alias: aliasName,
isWriteIndex: aliasProperties.is_write_index ?? false,
}))
);
concreteIndices = response.data_streams.map((dataStream) => ({
index: dataStream.name,
alias: dataStream.name,
isWriteIndex: true,
}));

logger.debug(
`Found ${concreteIndices.length} concrete indices for ${
Expand All @@ -182,11 +176,14 @@ export const createConcreteWriteIndex = async ({
}
}

let concreteWriteIndicesExist = false;
// let concreteWriteIndicesExist = false;
const concreteWriteIndicesExist = concreteIndices.length > 0;

// if a concrete write index already exists, update the underlying mapping
if (concreteIndices.length > 0) {
await updateIndexMappings({ logger, esClient, totalFieldsLimit, concreteIndices });

/*
const concreteIndicesExist = concreteIndices.some(
(index) => index.alias === indexPatterns.alias
);
Expand All @@ -201,30 +198,26 @@ export const createConcreteWriteIndex = async ({
`Indices matching pattern ${indexPatterns.pattern} exist but none are set as the write index for alias ${indexPatterns.alias}`
);
}
*/
}

// check if a concrete write index already exists
if (!concreteWriteIndicesExist) {
try {
await retryTransientEsErrors(
() =>
esClient.indices.create({
index: indexPatterns.name,
body: {
aliases: {
[indexPatterns.alias]: {
is_write_index: true,
},
},
},
esClient.indices.createDataStream({
name: indexPatterns.alias,
}),
{ logger }
);
} catch (error) {
logger.error(`Error creating concrete write index - ${error.message}`);
throw error;
// If the index already exists and it's the write index for the alias,
// something else created it so suppress the error. If it's not the write
// index, that's bad, throw an error.
/*
if (error?.meta?.body?.error?.type === 'resource_already_exists_exception') {
const existingIndices = await retryTransientEsErrors(
() => esClient.indices.get({ index: indexPatterns.name }),
Expand All @@ -238,6 +231,7 @@ export const createConcreteWriteIndex = async ({
} else {
throw error;
}
*/
}
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@ export const getIndexTemplate = ({
return {
name: indexPatterns.template,
body: {
index_patterns: [indexPatterns.pattern],
data_stream: { hidden: true },
index_patterns: [indexPatterns.alias],
composed_of: componentTemplateRefs,
template: {
settings: {
auto_expand_replicas: '0-1',
hidden: true,
'index.lifecycle': {
name: ilmPolicyName,
rollover_alias: indexPatterns.alias,
// rollover_alias: indexPatterns.alias,
},
'index.mapping.total_fields.limit': totalFieldsLimit,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ export const getIndexTemplateAndPattern = ({
const pattern = `${context}.alerts`;
const patternWithNamespace = `${pattern}-${concreteNamespace}`;
return {
template: `.alerts-${patternWithNamespace}-index-template`,
pattern: `.internal.alerts-${patternWithNamespace}-*`,
template: `.alerts-${patternWithNamespace}-index-template`, // still used
pattern: `.internal.alerts-${patternWithNamespace}-*`, // no longer used
basePattern: `.alerts-${pattern}-*`,
name: `.internal.alerts-${patternWithNamespace}-000001`,
alias: `.alerts-${patternWithNamespace}`,
name: `.internal.alerts-${patternWithNamespace}-000001`, // no longer used
alias: `.alerts-${patternWithNamespace}`, // data stream name
...(secondaryAlias ? { secondaryAlias: `${secondaryAlias}-${concreteNamespace}` } : {}),
};
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { errors } from '@elastic/elasticsearch';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type * as estypes from '@elastic/elasticsearch/lib/api/types';
import { Either, isLeft } from 'fp-ts/lib/Either';

import { ElasticsearchClient } from '@kbn/core/server';
Expand Down Expand Up @@ -118,6 +118,7 @@ export class RuleDataClient implements IRuleDataClient {
...request,
index: indexPattern,
ignore_unavailable: true,
seq_no_primary_term: true,
})) as unknown as ESSearchResponse<TAlertDoc, TSearchRequest>;
} catch (err) {
this.options.logger.error(`Error performing search in RuleDataClient - ${err.message}`);
Expand Down Expand Up @@ -235,13 +236,12 @@ export class RuleDataClient implements IRuleDataClient {
bulk: async (request: estypes.BulkRequest) => {
try {
if (this.clusterClient) {
const requestWithDefaultParameters = {
...request,
require_alias: true,
index: alias,
};
addCreateIndexBulkActionDoc(request, alias);
this.options.logger.debug(
`writing bulk data: alias: "${alias}" ${JSON.stringify(request, null, 4)}`
);

const response = await this.clusterClient.bulk(requestWithDefaultParameters, {
const response = await this.clusterClient.bulk(request, {
meta: true,
});

Expand All @@ -261,3 +261,13 @@ export class RuleDataClient implements IRuleDataClient {
};
}
}

function addCreateIndexBulkActionDoc(request: estypes.BulkRequest, alias: string) {
const docs: Array<Record<string, Record<string, unknown>>> = ((request as any).body as any) || [];

for (let index = 0; index < docs.length; index += 2) {
if (new Set(Object.keys(docs[index])).has('create')) {
docs[index].create._index = alias;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,14 @@ export const createLifecycleExecutor =
`[Rule Registry] Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStates.length} previous)`
);

const trackedAlertsDataMap: Record<
string,
{ indexName: string; fields: Partial<ParsedTechnicalFields & ParsedExperimentalFields> }
> = {};
interface TrackedAlertData {
indexName: string;
fields: Partial<ParsedTechnicalFields & ParsedExperimentalFields>;
seqNo: number | undefined;
primaryTerm: number | undefined;
}

const trackedAlertsDataMap: Record<string, TrackedAlertData> = {};

if (trackedAlertStates.length) {
const result = await fetchExistingAlerts(
Expand All @@ -230,10 +234,18 @@ export const createLifecycleExecutor =
result.forEach((hit) => {
const alertInstanceId = hit._source ? hit._source[ALERT_INSTANCE_ID] : void 0;
if (alertInstanceId && hit._source) {
trackedAlertsDataMap[alertInstanceId] = {
indexName: hit._index,
fields: hit._source,
};
if (hit._seq_no == null) {
logger.error(`missing _seq_no on alert instance ${alertInstanceId}`);
} else if (hit._primary_term == null) {
logger.error(`missing _primary_term on alert instance ${alertInstanceId}`);
} else {
trackedAlertsDataMap[alertInstanceId] = {
indexName: hit._index,
fields: hit._source,
seqNo: hit._seq_no,
primaryTerm: hit._primary_term,
};
}
}
});
}
Expand Down Expand Up @@ -308,6 +320,8 @@ export const createLifecycleExecutor =

return {
indexName: alertData?.indexName,
seqNo: alertData?.seqNo,
primaryTerm: alertData?.primaryTerm,
event,
flappingHistory,
flapping,
Expand All @@ -334,11 +348,25 @@ export const createLifecycleExecutor =
if (allEventsToIndex.length > 0 && writeAlerts) {
logger.debug(`[Rule Registry] Preparing to index ${allEventsToIndex.length} alerts.`);

// ? { index: { _id: event[ALERT_UUID]!, _index: indexName, require_alias: false, if_seq_no: 0, if_primary_term: 1 } }
await ruleDataClientWriter.bulk({
body: allEventsToIndex.flatMap(({ event, indexName }) => [
body: allEventsToIndex.flatMap(({ event, indexName, seqNo, primaryTerm }) => [
indexName
? { index: { _id: event[ALERT_UUID]!, _index: indexName, require_alias: false } }
: { index: { _id: event[ALERT_UUID]! } },
? {
index: {
_id: event[ALERT_UUID]!,
_index: indexName,
if_seq_no: seqNo,
if_primary_term: primaryTerm,
require_alias: false,
},
}
: {
create: {
_id: event[ALERT_UUID]!,
require_alias: false,
},
},
event,
]),
refresh: 'wait_for',
Expand Down