diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts index cc6b43b40da7b..13059ebd4fe80 100644 --- a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts @@ -110,7 +110,7 @@ describe('alert_conflict_resolver', () => { }); test('no errors in bulk results', async () => { - const { bulkRequest, bulkResponse } = getReqRes('c is is c is'); + const { bulkRequest, bulkResponse } = getReqRes('cs is is cs is'); await resolveAlertConflicts({ logger, esClient, @@ -163,18 +163,42 @@ describe('alert_conflict_resolver', () => { ); }); - test('one conflicted doc amonst other successes and errors', async () => { - const { bulkRequest, bulkResponse } = getReqRes('is c ic ie'); + test('one conflicted index doc amonst other successes and errors', async () => { + const { bulkRequest, bulkResponse } = getReqRes('is cs ic ie'); + esClient.mget.mockResolvedValueOnce({ docs: [getMGetResDoc(2, alertDoc)] }); + esClient.bulk.mockResolvedValueOnce({ errors: false, took: 0, items: [getBulkResItem(2)] }); - esClient.mget.mockResolvedValueOnce({ - docs: [getMGetResDoc(2, alertDoc)], + await resolveAlertConflicts({ + logger, + esClient, + bulkRequest, + bulkResponse, + ruleId, + ruleName, + ruleType, }); - esClient.bulk.mockResolvedValueOnce({ - errors: false, - took: 0, - items: [getBulkResItem(2)], - }); + expect(logger.error).toHaveBeenNthCalledWith( + 1, + `Error writing alerts ${ruleInfo}: 2 successful, 1 conflicts, 1 errors: hallo`, + logTags + ); + expect(logger.info).toHaveBeenNthCalledWith( + 1, + `Retrying bulk update of 1 conflicted alerts ${ruleInfo}`, + logTags + ); + expect(logger.info).toHaveBeenNthCalledWith( + 2, + `Retried bulk update of 1 conflicted alerts succeeded ${ruleInfo}`, + logTags + ); + }); + + test('one conflicted create doc amonst other successes and errors', async () => { + const { bulkRequest, bulkResponse } = getReqRes('is cs cc ie'); + esClient.mget.mockResolvedValueOnce({ docs: [getMGetResDoc(2, alertDoc)] }); + esClient.bulk.mockResolvedValueOnce({ errors: false, took: 0, items: [getBulkResItem(2)] }); await resolveAlertConflicts({ logger, @@ -204,7 +228,7 @@ describe('alert_conflict_resolver', () => { }); test('multiple conflicted doc amonst other successes and errors', async () => { - const { bulkRequest, bulkResponse } = getReqRes('is c ic ic ie ic'); + const { bulkRequest, bulkResponse } = getReqRes('is cs ic cc ie ic'); esClient.mget.mockResolvedValueOnce({ docs: [getMGetResDoc(2, alertDoc), getMGetResDoc(3, alertDoc), getMGetResDoc(5, alertDoc)], @@ -276,7 +300,9 @@ interface GetReqResResult { /** * takes as input a string of c, is, ic, ie tokens and builds appropriate * bulk request and response objects to use in the tests: - * - c: create, ignored by the resolve logic + * - cs: create with success + * - cc: create with conflict + * - ce: create with error but not conflict * - is: index with success * - ic: index with conflict * - ie: index with error but not conflict @@ -293,18 +319,30 @@ function getReqRes(bulkOps: string): GetReqResResult { if (ops[0] === '') return { bulkRequest, bulkResponse }; - const createOp = { create: {} }; - let id = 0; for (const op of ops) { id++; switch (op) { - // create, ignored by the resolve logic - case 'c': - bulkRequest.operations.push(createOp, alertDoc); + // create with success + case 'cs': + bulkRequest.operations.push(getCreateOp(id), alertDoc); bulkResponse.items.push(getResponseItem('create', id, false, 200)); break; + // index with conflict + case 'cc': + bulkResponse.errors = true; + bulkRequest.operations.push(getCreateOp(id), alertDoc); + bulkResponse.items.push(getResponseItem('create', id, true, 409)); + break; + + // index with error but not conflict + case 'ce': + bulkResponse.errors = true; + bulkRequest.operations.push(getCreateOp(id), alertDoc); + bulkResponse.items.push(getResponseItem('create', id, true, 418)); // I'm a teapot + break; + // index with success case 'is': bulkRequest.operations.push(getIndexOp(id), alertDoc); @@ -355,6 +393,16 @@ function getIndexOp(id: number) { }; } +function getCreateOp(id: number) { + return { + create: { + _id: `id-${id}`, + _index: `index-${id}`, + require_alias: false, + }, + }; +} + function getBulkResponse(): BulkResponse { return { errors: false, diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts index 55a3a885f1c71..1f815ac925c07 100644 --- a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts @@ -10,6 +10,12 @@ import { BulkResponse, BulkOperationContainer, MgetResponseItem, + BulkCreateOperation, + BulkIndexOperation, + BulkUpdateOperation, + BulkDeleteOperation, + BulkOperationType, + BulkResponseItem, } from '@elastic/elasticsearch/lib/api/types'; import { Logger, ElasticsearchClient } from '@kbn/core/server'; @@ -40,6 +46,13 @@ export interface ResolveAlertConflictsParams { ruleType: string; } +type BulkOperation = + | BulkCreateOperation + | BulkIndexOperation + | BulkUpdateOperation + | BulkDeleteOperation; + +type BulkItem = Partial>; interface NormalizedBulkRequest { op: BulkOperationContainer; doc: unknown; @@ -134,6 +147,28 @@ interface MakeBulkRequestResponse { error?: Error; } +function getBulkOperation(opContainer?: BulkOperationContainer): BulkOperation | undefined { + if (!opContainer) return undefined; + + const operation = + opContainer.create || opContainer.index || opContainer.update || opContainer.delete; + + if (!operation) { + throw new Error(`Missing bulk op in op container: ${JSON.stringify(opContainer)}`); + } + return operation; +} + +function getItemInfoFromBulk(item?: BulkItem): BulkResponseItem | undefined { + if (!item) return undefined; + + const info = item.create || item.index || item.update || item.delete; + if (!info) { + throw new Error(`Missing bulk op in bulk request: ${JSON.stringify(item)}`); + } + return info; +} + // make the bulk request to fix conflicts async function makeBulkRequest( esClient: ElasticsearchClient, @@ -146,7 +181,7 @@ async function makeBulkRequest( const bulkResponse = await esClient.bulk(updatedBulkRequest); - const errors = bulkResponse.items.filter((item) => item.index?.error).length; + const errors = bulkResponse.items.filter((item) => getItemInfoFromBulk(item)?.error).length; return { bulkRequest, bulkResponse, errors }; } @@ -156,7 +191,7 @@ async function refreshFieldsInDocs( freshResponses: MgetResponseItem[] ) { for (const [conflictRequest, freshResponse] of zip(conflictRequests, freshResponses)) { - if (!conflictRequest?.op.index || !freshResponse) continue; + if (!conflictRequest || !getBulkOperation(conflictRequest?.op) || !freshResponse) continue; // @ts-expect-error @elastic/elasticsearch _source is not in the type! const freshDoc = freshResponse._source; @@ -190,7 +225,10 @@ async function refreshFieldsInDocs( /** Update the OCC info in the conflict request with the fresh info. */ async function updateOCC(conflictRequests: NormalizedBulkRequest[], freshDocs: MgetResponseItem[]) { for (const [req, freshDoc] of zip(conflictRequests, freshDocs)) { - if (!req?.op.index || !freshDoc) continue; + if (!req) continue; + + const bulkOperation = getBulkOperation(req?.op); + if (!bulkOperation || !freshDoc) continue; // @ts-expect-error @elastic/elasticsearch _seq_no is not in the type! const seqNo: number | undefined = freshDoc._seq_no; @@ -200,8 +238,8 @@ async function updateOCC(conflictRequests: NormalizedBulkRequest[], freshDocs: M if (seqNo === undefined) throw new Error('Unexpected undefined seqNo'); if (primaryTerm === undefined) throw new Error('Unexpected undefined primaryTerm'); - req.op.index.if_seq_no = seqNo; - req.op.index.if_primary_term = primaryTerm; + bulkOperation.if_seq_no = seqNo; + bulkOperation.if_primary_term = primaryTerm; } } @@ -213,7 +251,8 @@ async function getFreshDocs( const docs: Array<{ _id: string; _index: string }> = []; conflictRequests.forEach((req) => { - const [id, index] = [req.op.index?._id, req.op.index?._index]; + const bulkOperation = getBulkOperation(req?.op); + const [id, index] = [bulkOperation?._id, bulkOperation?._index]; if (!id || !index) return; docs.push({ _id: id, _index: index }); @@ -245,9 +284,9 @@ function getConflictRequest( if (request.length === 0) return []; - // we only want op: index where the status was 409 / conflict + // pick out just the conflicts (409) const conflictRequest = zip(request, bulkResponse.items) - .filter(([_, res]) => res?.index?.status === 409) + .filter(([_, res]) => getItemInfoFromBulk(res)?.status === 409) .map(([req, _]) => req!); return conflictRequest; @@ -292,9 +331,9 @@ function getResponseStats(bulkResponse: BulkResponse): ResponseStatsResult { const sanitizedResponse = sanitizeBulkErrorResponse(bulkResponse) as BulkResponse; const stats: ResponseStatsResult = { success: 0, conflicts: 0, errors: 0, messages: [] }; for (const item of sanitizedResponse.items) { - const op = item.create || item.index || item.update || item.delete; + const op = getItemInfoFromBulk(item); if (op?.error) { - if (op?.status === 409 && op === item.index) { + if (op?.status === 409) { stats.conflicts++; } else { stats.errors++; diff --git a/x-pack/plugins/alerting/server/integration_tests/alert_conflicts.test.ts b/x-pack/plugins/alerting/server/integration_tests/alert_conflicts.test.ts new file mode 100644 index 0000000000000..16c5e09966db7 --- /dev/null +++ b/x-pack/plugins/alerting/server/integration_tests/alert_conflicts.test.ts @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import supertest from 'supertest'; + +import { DEFAULT_APP_CATEGORIES } from '@kbn/core/server'; +import { STACK_ALERTS_FEATURE_ID } from '@kbn/rule-data-utils'; +import { + type TestElasticsearchUtils, + type TestKibanaUtils, +} from '@kbn/core-test-helpers-kbn-server'; + +import { ExecutorType, IRuleTypeAlerts } from '..'; +import { setupTestServers } from './lib'; +import { RuleType, RawAlertInstance } from '../types'; +import type { RuleTypeRegistry } from '../rule_type_registry'; + +jest.mock('../rule_type_registry', () => { + const actual = jest.requireActual('../rule_type_registry'); + return { + ...actual, + RuleTypeRegistry: jest.fn().mockImplementation((opts) => { + return new actual.RuleTypeRegistry(opts); + }), + }; +}); + +jest.mock('../lib/determine_alerts_to_return', () => { + const actual = jest.requireActual('../lib/determine_alerts_to_return'); + return { + ...actual, + determineAlertsToReturn: jest.fn().mockImplementation((...args) => { + const result = new actual.determineAlertsToReturn(...args); + return determineAlertsToReturnAdapter(result); + }), + }; +}); + +describe('Handle conflicts when writing alert docs', () => { + let esServer: TestElasticsearchUtils; + let kibanaServer: TestKibanaUtils; + let ruleTypeRegistry: RuleTypeRegistry; + + beforeAll(async () => { + const setupResult = await setupTestServers(); + esServer = setupResult.esServer; + kibanaServer = setupResult.kibanaServer; + + const mockedRuleTypeRegistry = jest.requireMock('../rule_type_registry'); + expect(mockedRuleTypeRegistry.RuleTypeRegistry).toHaveBeenCalledTimes(1); + ruleTypeRegistry = mockedRuleTypeRegistry.RuleTypeRegistry.mock.results[0].value; + + ruleTypeRegistry.register(getConflictingAlertsRuleType()); + }); + + afterAll(async () => { + if (kibanaServer) { + await kibanaServer.stop(); + } + if (esServer) { + await esServer.stop(); + } + }); + + test('handle conflict when creating alert', async () => {}); +}); + +function determineAlertsToReturnAdapter({ + alertsToReturn, + recoveredAlertsToReturn, +}: { + alertsToReturn: Record; + recoveredAlertsToReturn: Record; +}): { + alertsToReturn: Record; + recoveredAlertsToReturn: Record; +} { + // creates copies of alerts, same uuid, but the id is different + for (const id of Object.keys(alertsToReturn)) { + alertsToReturn[`${id}-conflicted`] = alertsToReturn[id]; + } + return { alertsToReturn, recoveredAlertsToReturn }; +} + +function getExecutor(): ExecutorType<{}, {}, {}, {}, string, {}> { + return async function () { + return { state: {} }; + }; +} + +function getConflictingAlertsRuleType(): RuleType<{}, {}, {}, {}, {}, string, string, {}> { + return { + id: RULE_TYPE_ID, + name: RULE_TYPE_ID, + actionGroups: [{ id: ACTION_GROUP_ID, name: ACTION_GROUP_ID }], + defaultActionGroupId: ACTION_GROUP_ID, + validate: { params: { validate: () => ({}) } }, + actionVariables: {}, + minimumLicenseRequired: 'basic', + isExportable: true, + executor: getExecutor(), + category: DEFAULT_APP_CATEGORIES.management.id, + producer: STACK_ALERTS_FEATURE_ID, + doesSetRecoveryContext: true, + alerts: STACK_ALERTS_AAD_CONFIG, + }; +} + +async function httpGet(kibanaServer: TestKibanaUtils, path: string) { + await supertest(kibanaServer.coreSetup.http.server.listener).get(path).send(); +} + +async function httpPost(kibanaServer: TestKibanaUtils, path: string, body?: never) { + await supertest(kibanaServer.coreSetup.http.server.listener).post(path).send(body); +} + +async function wait(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export const STACK_ALERTS_AAD_CONFIG: IRuleTypeAlerts<{}> = { + context: 'jest', + mappings: { + fieldMap: {}, + }, + shouldWrite: true, + useEcs: true, +}; + +export const RULE_TYPE_ID = '...conflicting-alert-docs'; +export const ACTION_GROUP_ID = 'default'; diff --git a/x-pack/plugins/alerting/server/integration_tests/lib/setup_test_servers.ts b/x-pack/plugins/alerting/server/integration_tests/lib/setup_test_servers.ts index 4b722d5460213..d07519b90a6d2 100644 --- a/x-pack/plugins/alerting/server/integration_tests/lib/setup_test_servers.ts +++ b/x-pack/plugins/alerting/server/integration_tests/lib/setup_test_servers.ts @@ -5,6 +5,7 @@ * 2.0. */ +import supertest from 'supertest'; import { createTestServers, createRootWithCorePlugins } from '@kbn/core-test-helpers-kbn-server'; export async function setupTestServers(settings = {}) { @@ -32,6 +33,7 @@ export async function setupTestServers(settings = {}) { coreSetup, coreStart, stop: async () => await root.shutdown(), + getSupertest: () => supertest(coreSetup.http.server.listener), }, }; }