Skip to content

Commit

Permalink
[ResponseOps][Alerting] fix alert conflict resolution to support create
Browse files Browse the repository at this point in the history
resolves: #190376

In PR #160572, we changed from
using just the bulk op `index` to using `create` when new alerts are
being created.

Unfortunately, the code to handle the bulk responses didn't take into
account that the bulk responses for `create`s need different handling
than `index`s.  Specifically, conflicts for `create` were being treated
as errors.

This PR changes the processing to consider additional ops besides just
`index`.
  • Loading branch information
pmuellr committed Dec 10, 2024
1 parent 2818a7c commit e7b6311
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 54 deletions.
50 changes: 25 additions & 25 deletions x-pack/plugins/alerting/server/alerts_client/alerts_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,33 +175,9 @@ export class AlertsClient<
return;
}

// Searches for the alert documents whose ALERT_UUID is one of `uuids`,
// restricted to the rule identified by `this.options.rule.id`.
// Resolves to the `hits` of the search result.
const queryByUuid = async (uuids: string[]) => {
const result = await this.search({
// ask for no more hits than the number of UUIDs requested
size: uuids.length,
// request sequence number / primary term info with each hit
seq_no_primary_term: true,
query: {
bool: {
// both clauses are filters: every hit must satisfy both of them
filter: [
{
// only alerts of this rule
term: {
[ALERT_RULE_UUID]: this.options.rule.id,
},
},
{
// only alerts with one of the requested UUIDs
terms: {
[ALERT_UUID]: uuids,
},
},
],
},
},
});
return result.hits;
};

try {
const results = await Promise.all(
chunk(uuidsToFetch, CHUNK_SIZE).map((uuidChunk: string[]) => queryByUuid(uuidChunk))
chunk(uuidsToFetch, CHUNK_SIZE).map((uuidChunk: string[]) => this.queryByUuid(uuidChunk))
);

for (const hit of results.flat()) {
Expand All @@ -225,6 +201,30 @@ export class AlertsClient<
}
}

/**
 * Searches for the alert documents whose ALERT_UUID is one of `uuids`,
 * restricted to the rule identified by `this.options.rule.id`.
 *
 * The search asks for at most `uuids.length` hits and requests sequence
 * number / primary term info with each hit.
 *
 * @param uuids - alert UUIDs to look up
 * @returns the `hits` of the search result
 */
public async queryByUuid(uuids: string[]) {
  // both clauses are used as filters: every hit must satisfy both of them
  const onlyThisRule = { term: { [ALERT_RULE_UUID]: this.options.rule.id } };
  const onlyTheseUuids = { terms: { [ALERT_UUID]: uuids } };

  const { hits } = await this.search({
    size: uuids.length,
    seq_no_primary_term: true,
    query: { bool: { filter: [onlyThisRule, onlyTheseUuids] } },
  });

  return hits;
}

public async search<Aggregation = unknown>(
queryBody: SearchRequest['body']
): Promise<SearchResult<AlertData, Aggregation>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ describe('alert_conflict_resolver', () => {
});

test('no errors in bulk results', async () => {
const { bulkRequest, bulkResponse } = getReqRes('c is is c is');
const { bulkRequest, bulkResponse } = getReqRes('cs is is cs is');
await resolveAlertConflicts({
logger,
esClient,
Expand Down Expand Up @@ -163,18 +163,42 @@ describe('alert_conflict_resolver', () => {
);
});

test('one conflicted doc amonst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is c ic ie');
test('one conflicted index doc amonst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is cs ic ie');
esClient.mget.mockResolvedValueOnce({ docs: [getMGetResDoc(2, alertDoc)] });
esClient.bulk.mockResolvedValueOnce({ errors: false, took: 0, items: [getBulkResItem(2)] });

esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(2, alertDoc)],
await resolveAlertConflicts({
logger,
esClient,
bulkRequest,
bulkResponse,
ruleId,
ruleName,
ruleType,
});

esClient.bulk.mockResolvedValueOnce({
errors: false,
took: 0,
items: [getBulkResItem(2)],
});
expect(logger.error).toHaveBeenNthCalledWith(
1,
`Error writing alerts ${ruleInfo}: 2 successful, 1 conflicts, 1 errors: hallo`,
logTags
);
expect(logger.info).toHaveBeenNthCalledWith(
1,
`Retrying bulk update of 1 conflicted alerts ${ruleInfo}`,
logTags
);
expect(logger.info).toHaveBeenNthCalledWith(
2,
`Retried bulk update of 1 conflicted alerts succeeded ${ruleInfo}`,
logTags
);
});

test('one conflicted create doc amonst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is cs cc ie');
esClient.mget.mockResolvedValueOnce({ docs: [getMGetResDoc(2, alertDoc)] });
esClient.bulk.mockResolvedValueOnce({ errors: false, took: 0, items: [getBulkResItem(2)] });

await resolveAlertConflicts({
logger,
Expand Down Expand Up @@ -204,7 +228,7 @@ describe('alert_conflict_resolver', () => {
});

test('multiple conflicted doc amonst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is c ic ic ie ic');
const { bulkRequest, bulkResponse } = getReqRes('is cs ic cc ie ic');

esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(2, alertDoc), getMGetResDoc(3, alertDoc), getMGetResDoc(5, alertDoc)],
Expand Down Expand Up @@ -276,7 +300,9 @@ interface GetReqResResult {
/**
 * takes as input a string of cs, cc, ce, is, ic, ie tokens and builds appropriate
* bulk request and response objects to use in the tests:
* - c: create, ignored by the resolve logic
* - cs: create with success
* - cc: create with conflict
* - ce: create with error but not conflict
* - is: index with success
* - ic: index with conflict
* - ie: index with error but not conflict
Expand All @@ -293,18 +319,30 @@ function getReqRes(bulkOps: string): GetReqResResult {

if (ops[0] === '') return { bulkRequest, bulkResponse };

const createOp = { create: {} };

let id = 0;
for (const op of ops) {
id++;
switch (op) {
// create, ignored by the resolve logic
case 'c':
bulkRequest.operations.push(createOp, alertDoc);
// create with success
case 'cs':
bulkRequest.operations.push(getCreateOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('create', id, false, 200));
break;

// create with conflict
case 'cc':
bulkResponse.errors = true;
bulkRequest.operations.push(getCreateOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('create', id, true, 409));
break;

// create with error but not conflict
case 'ce':
bulkResponse.errors = true;
bulkRequest.operations.push(getCreateOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('create', id, true, 418)); // I'm a teapot
break;

// index with success
case 'is':
bulkRequest.operations.push(getIndexOp(id), alertDoc);
Expand Down Expand Up @@ -355,6 +393,16 @@ function getIndexOp(id: number) {
};
}

/**
 * Builds the `create` bulk operation used by the test fixtures.
 *
 * The document id and index name are both derived from `id`
 * (`id-<id>` and `index-<id>`), and `require_alias` is always `false`.
 */
function getCreateOp(id: number) {
  const create = {
    _id: `id-${id}`,
    _index: `index-${id}`,
    require_alias: false,
  };

  return { create };
}

function getBulkResponse(): BulkResponse {
return {
errors: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import {
BulkResponse,
BulkOperationContainer,
MgetResponseItem,
BulkCreateOperation,
BulkIndexOperation,
BulkUpdateOperation,
BulkDeleteOperation,
BulkOperationType,
BulkResponseItem,
} from '@elastic/elasticsearch/lib/api/types';

import { Logger, ElasticsearchClient } from '@kbn/core/server';
Expand Down Expand Up @@ -40,6 +46,13 @@ export interface ResolveAlertConflictsParams {
ruleType: string;
}

// The metadata object of any one of the four bulk operation kinds
// (create, index, update, delete).
type BulkOperation =
| BulkCreateOperation
| BulkIndexOperation
| BulkUpdateOperation
| BulkDeleteOperation;

// One entry of a bulk response's items: an object that may carry a
// BulkResponseItem under any of the bulk operation type keys.
type BulkItem = Partial<Record<BulkOperationType, BulkResponseItem>>;
interface NormalizedBulkRequest {
op: BulkOperationContainer;
doc: unknown;
Expand All @@ -61,11 +74,11 @@ export async function resolveAlertConflicts(params: ResolveAlertConflictsParams)
}

async function resolveAlertConflicts_(params: ResolveAlertConflictsParams): Promise<void> {
const { logger, esClient, bulkRequest, bulkResponse, ruleId, ruleType, ruleName } = params;
const { logger, esClient, bulkRequest, bulkResponse, ruleId, ruleType } = params;
if (bulkRequest.operations && bulkRequest.operations?.length === 0) return;
if (bulkResponse.items && bulkResponse.items?.length === 0) return;

const ruleInfoMessage = `for ${ruleType}:${ruleId} '${ruleName}'`;
const ruleInfoMessage = `for ${ruleType}:${ruleId}`;
const logTags = { tags: [ruleType, ruleId, 'resolve-alert-conflicts'] };

// get numbers for a summary log message
Expand All @@ -82,6 +95,9 @@ async function resolveAlertConflicts_(params: ResolveAlertConflictsParams): Prom
const conflictRequest = getConflictRequest(bulkRequest, bulkResponse);
if (conflictRequest.length === 0) return;

// conflicts from creates need to be retried as updates
convertCreatesToindexes(conflictRequest);

// get the fresh versions of those docs
const freshDocs = await getFreshDocs(esClient, conflictRequest);

Expand Down Expand Up @@ -134,6 +150,28 @@ interface MakeBulkRequestResponse {
error?: Error;
}

/**
 * Extracts the operation metadata from a bulk operation container,
 * whichever kind of operation it holds.
 *
 * The kinds are checked in the order create, index, update, delete and the
 * first one present wins.
 *
 * @param opContainer - the container to inspect; may be absent
 * @returns the operation metadata, or `undefined` when no container was given
 * @throws Error when a container is given but holds none of the four kinds
 */
function getBulkOperation(opContainer?: BulkOperationContainer): BulkOperation | undefined {
  if (opContainer) {
    const { create, index, update, delete: deleteOp } = opContainer;
    const found = create || index || update || deleteOp;
    if (found) return found;

    throw new Error(`Missing bulk op in op container: ${JSON.stringify(opContainer)}`);
  }

  return undefined;
}

/**
 * Extracts the response details from a bulk response item, whichever kind of
 * operation the item reports on.
 *
 * The kinds are checked in the order create, index, update, delete and the
 * first one present wins.
 *
 * @param item - the bulk response item to inspect; may be absent
 * @returns the response details, or `undefined` when no item was given
 * @throws Error when an item is given but holds none of the four kinds
 */
function getItemInfoFromBulk(item?: BulkItem): BulkResponseItem | undefined {
  if (!item) return undefined;

  const opTypes = ['create', 'index', 'update', 'delete'] as const;
  for (const opType of opTypes) {
    const details = item[opType];
    if (details) return details;
  }

  throw new Error(`Missing bulk op in bulk request: ${JSON.stringify(item)}`);
}

// make the bulk request to fix conflicts
async function makeBulkRequest(
esClient: ElasticsearchClient,
Expand All @@ -146,7 +184,7 @@ async function makeBulkRequest(

const bulkResponse = await esClient.bulk(updatedBulkRequest);

const errors = bulkResponse.items.filter((item) => item.index?.error).length;
const errors = bulkResponse.items.filter((item) => getItemInfoFromBulk(item)?.error).length;
return { bulkRequest, bulkResponse, errors };
}

Expand All @@ -156,7 +194,7 @@ async function refreshFieldsInDocs(
freshResponses: MgetResponseItem[]
) {
for (const [conflictRequest, freshResponse] of zip(conflictRequests, freshResponses)) {
if (!conflictRequest?.op.index || !freshResponse) continue;
if (!conflictRequest || !getBulkOperation(conflictRequest?.op) || !freshResponse) continue;

// @ts-expect-error @elastic/elasticsearch _source is not in the type!
const freshDoc = freshResponse._source;
Expand Down Expand Up @@ -190,7 +228,10 @@ async function refreshFieldsInDocs(
/** Update the OCC info in the conflict request with the fresh info. */
async function updateOCC(conflictRequests: NormalizedBulkRequest[], freshDocs: MgetResponseItem[]) {
for (const [req, freshDoc] of zip(conflictRequests, freshDocs)) {
if (!req?.op.index || !freshDoc) continue;
if (!req) continue;

const bulkOperation = getBulkOperation(req?.op);
if (!bulkOperation || !freshDoc) continue;

// @ts-expect-error @elastic/elasticsearch _seq_no is not in the type!
const seqNo: number | undefined = freshDoc._seq_no;
Expand All @@ -200,8 +241,8 @@ async function updateOCC(conflictRequests: NormalizedBulkRequest[], freshDocs: M
if (seqNo === undefined) throw new Error('Unexpected undefined seqNo');
if (primaryTerm === undefined) throw new Error('Unexpected undefined primaryTerm');

req.op.index.if_seq_no = seqNo;
req.op.index.if_primary_term = primaryTerm;
bulkOperation.if_seq_no = seqNo;
bulkOperation.if_primary_term = primaryTerm;
}
}

Expand All @@ -213,7 +254,8 @@ async function getFreshDocs(
const docs: Array<{ _id: string; _index: string }> = [];

conflictRequests.forEach((req) => {
const [id, index] = [req.op.index?._id, req.op.index?._index];
const bulkOperation = getBulkOperation(req?.op);
const [id, index] = [bulkOperation?._id, bulkOperation?._index];
if (!id || !index) return;

docs.push({ _id: id, _index: index });
Expand Down Expand Up @@ -245,9 +287,9 @@ function getConflictRequest(

if (request.length === 0) return [];

// we only want op: index where the status was 409 / conflict
// pick out just the conflicts (409)
const conflictRequest = zip(request, bulkResponse.items)
.filter(([_, res]) => res?.index?.status === 409)
.filter(([_, res]) => getItemInfoFromBulk(res)?.status === 409)
.map(([req, _]) => req!);

return conflictRequest;
Expand Down Expand Up @@ -280,6 +322,29 @@ function normalizeRequest(bulkRequest: BulkRequest) {
return result;
}

/**
 * Rewrites, in place, every conflicted `create` operation as an `index`
 * operation so that it can be retried as an update of the existing document.
 *
 * A `create` that was rejected with a conflict cannot succeed when retried
 * as-is, because the document already exists; an `index` operation can.
 * The metadata carried by the create op (`_id`, `_index`, and anything else
 * it had) is copied to the new index op and `require_alias` is forced to
 * `false`. Requests that do not hold a `create` op are left untouched.
 *
 * The optimistic-concurrency fields (`if_seq_no` / `if_primary_term`) are
 * deliberately not set here: this function has no access to the stored
 * documents. `updateOCC` in this file writes those fields onto the op once
 * the fresh documents have been fetched.
 *
 * NOTE(review): a create op without `_index` produces an index op without
 * one, and `getFreshDocs` skips ops lacking `_id` or `_index` - confirm that
 * the create ops reaching this function carry a concrete `_index`.
 *
 * @param conflictRequest - the normalized bulk requests whose responses were
 *   conflicts; the `op` of each create request is mutated
 */
function convertCreatesToindexes(conflictRequest: NormalizedBulkRequest[]): void {
  for (const req of conflictRequest) {
    const createOp = req.op.create;
    if (!createOp) continue;

    // carry over everything the create op specified, but never go through an alias
    req.op.index = { ...createOp, require_alias: false };

    delete req.op.create;
  }
}

interface ResponseStatsResult {
success: number;
conflicts: number;
Expand All @@ -292,9 +357,9 @@ function getResponseStats(bulkResponse: BulkResponse): ResponseStatsResult {
const sanitizedResponse = sanitizeBulkErrorResponse(bulkResponse) as BulkResponse;
const stats: ResponseStatsResult = { success: 0, conflicts: 0, errors: 0, messages: [] };
for (const item of sanitizedResponse.items) {
const op = item.create || item.index || item.update || item.delete;
const op = getItemInfoFromBulk(item);
if (op?.error) {
if (op?.status === 409 && op === item.index) {
if (op?.status === 409) {
stats.conflicts++;
} else {
stats.errors++;
Expand Down
Loading

0 comments on commit e7b6311

Please sign in to comment.