diff --git a/x-pack/plugins/ml/common/constants/alerts.ts b/x-pack/plugins/ml/common/constants/alerts.ts index a801872c5444d..2192b2b504b59 100644 --- a/x-pack/plugins/ml/common/constants/alerts.ts +++ b/x-pack/plugins/ml/common/constants/alerts.ts @@ -21,26 +21,57 @@ export const TOP_N_BUCKETS_COUNT = 1; export const ALL_JOBS_SELECTION = '*'; -export const HEALTH_CHECK_NAMES: Record = { - datafeed: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.datafeedCheckName', { - defaultMessage: 'Datafeed is not started', - }), - mml: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.mmlCheckName', { - defaultMessage: 'Model memory limit reached', - }), - errorMessages: i18n.translate( - 'xpack.ml.alertTypes.jobsHealthAlertingRule.errorMessagesCheckName', - { +export const HEALTH_CHECK_NAMES: Record = { + datafeed: { + name: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.datafeedCheckName', { + defaultMessage: 'Datafeed is not started', + }), + description: i18n.translate( + 'xpack.ml.alertTypes.jobsHealthAlertingRule.datafeedCheckDescription', + { + defaultMessage: 'Get alerted if the corresponding datafeed of the job is not started', + } + ), + }, + mml: { + name: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.mmlCheckName', { + defaultMessage: 'Model memory limit reached', + }), + description: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.mmlCheckDescription', { + defaultMessage: 'Get alerted when job reaches soft or hard model memory limit.', + }), + }, + delayedData: { + name: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.delayedDataCheckName', { + defaultMessage: 'Data delay has occurred', + }), + description: i18n.translate( + 'xpack.ml.alertTypes.jobsHealthAlertingRule.delayedDataCheckDescription', + { + defaultMessage: 'Get alerted if a job missed data due to data delay.', + } + ), + }, + errorMessages: { + name: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.errorMessagesCheckName', { defaultMessage: 'There are errors in the job messages', - } - ), - behindRealtime: i18n.translate( - 'xpack.ml.alertTypes.jobsHealthAlertingRule.behindRealtimeCheckName', - { + }), + description: i18n.translate( + 'xpack.ml.alertTypes.jobsHealthAlertingRule.errorMessagesCheckDescription', + { + defaultMessage: 'There are errors in the job messages', + } + ), + }, + behindRealtime: { + name: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.behindRealtimeCheckName', { defaultMessage: 'Job is running behind real-time', - } - ), - delayedData: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.delayedDataCheckName', { - defaultMessage: 'Data delay has occurred', - }), + }), + description: i18n.translate( + 'xpack.ml.alertTypes.jobsHealthAlertingRule.behindRealtimeCheckDescription', + { + defaultMessage: 'Job is running behind real-time', + } + ), + }, }; diff --git a/x-pack/plugins/ml/common/types/alerts.ts b/x-pack/plugins/ml/common/types/alerts.ts index 877bb2d293365..408c424f327d5 100644 --- a/x-pack/plugins/ml/common/types/alerts.ts +++ b/x-pack/plugins/ml/common/types/alerts.ts @@ -109,7 +109,7 @@ export interface JobAlertingRuleStats { alerting_rules?: MlAnomalyDetectionAlertRule[]; } -interface CommonHealthCheckConfig { +export interface CommonHealthCheckConfig { enabled: boolean; } diff --git a/x-pack/plugins/ml/common/types/annotations.ts b/x-pack/plugins/ml/common/types/annotations.ts index 0417e769e9ed5..6234444322a5b 100644 --- a/x-pack/plugins/ml/common/types/annotations.ts +++ b/x-pack/plugins/ml/common/types/annotations.ts @@ -86,7 +86,12 @@ export interface Annotation { annotation: string; job_id: string; type: ANNOTATION_TYPE.ANNOTATION | ANNOTATION_TYPE.COMMENT; - event?: string; + event?: + | 'user' + | 'delayed_data' + | 'model_snapshot_stored' + | 'model_change' + | 'categorization_status_change'; detector_index?: number; partition_field_name?: string; partition_field_value?: string; diff --git a/x-pack/plugins/ml/common/util/alerts.test.ts b/x-pack/plugins/ml/common/util/alerts.test.ts index dda9fd449467b..84205e6806133 100644 --- a/x-pack/plugins/ml/common/util/alerts.test.ts +++ b/x-pack/plugins/ml/common/util/alerts.test.ts @@ -90,6 +90,11 @@ describe('getResultJobsHealthRuleConfig', () => { mml: { enabled: true, }, + delayedData: { + docsCount: 1, + enabled: true, + timeInterval: null, + }, }); }); test('returns config with overridden values based on provided configuration', () => { @@ -97,6 +102,10 @@ describe('getResultJobsHealthRuleConfig', () => { getResultJobsHealthRuleConfig({ mml: { enabled: false }, errorMessages: { enabled: true }, + delayedData: { + enabled: true, + docsCount: 1, + }, }) ).toEqual({ datafeed: { @@ -105,6 +114,11 @@ describe('getResultJobsHealthRuleConfig', () => { mml: { enabled: false, }, + delayedData: { + docsCount: 1, + enabled: true, + timeInterval: null, + }, }); }); }); diff --git a/x-pack/plugins/ml/common/util/alerts.ts b/x-pack/plugins/ml/common/util/alerts.ts index 86afd70ad7474..7328c2a4dcc71 100644 --- a/x-pack/plugins/ml/common/util/alerts.ts +++ b/x-pack/plugins/ml/common/util/alerts.ts @@ -54,7 +54,7 @@ export function getTopNBuckets(job: Job): number { return Math.ceil(narrowBucketLength / bucketSpan.asSeconds()); } -const implementedTests = ['datafeed', 'mml'] as JobsHealthTests[]; +const implementedTests = ['datafeed', 'mml', 'delayedData'] as JobsHealthTests[]; /** * Returns tests configuration combined with default values. @@ -70,6 +70,8 @@ export function getResultJobsHealthRuleConfig(config: JobsHealthRuleTestsConfig) }, delayedData: { enabled: config?.delayedData?.enabled ?? true, + docsCount: config?.delayedData?.docsCount ?? 1, + timeInterval: config?.delayedData?.timeInterval ?? null, }, behindRealtime: { enabled: config?.behindRealtime?.enabled ?? true, diff --git a/x-pack/plugins/ml/public/alerting/jobs_health_rule/register_jobs_health_alerting_rule.ts b/x-pack/plugins/ml/public/alerting/jobs_health_rule/register_jobs_health_alerting_rule.ts index dc4b10102e4f1..f6446b454a877 100644 --- a/x-pack/plugins/ml/public/alerting/jobs_health_rule/register_jobs_health_alerting_rule.ts +++ b/x-pack/plugins/ml/public/alerting/jobs_health_rule/register_jobs_health_alerting_rule.ts @@ -12,6 +12,7 @@ import { PluginSetupContract as AlertingSetup } from '../../../../alerting/publi import { ML_ALERT_TYPES } from '../../../common/constants/alerts'; import { MlAnomalyDetectionJobsHealthRuleParams } from '../../../common/types/alerts'; import { getResultJobsHealthRuleConfig } from '../../../common/util/alerts'; +import { validateLookbackInterval } from '../validators'; export function registerJobsHealthAlertingRule( triggersActionsUi: TriggersAndActionsUIPublicPluginSetup, @@ -32,6 +33,7 @@ export function registerJobsHealthAlertingRule( errors: { includeJobs: new Array(), testsConfig: new Array(), + delayedData: new Array(), } as Record, }; @@ -53,6 +55,31 @@ export function registerJobsHealthAlertingRule( ); } + if ( + !!resultTestConfig.delayedData.timeInterval && + validateLookbackInterval(resultTestConfig.delayedData.timeInterval) + ) { + validationResult.errors.delayedData.push( + i18n.translate( + 'xpack.ml.alertTypes.jobsHealthAlertingRule.testsConfig.delayedData.timeIntervalErrorMessage', + { + defaultMessage: 'Invalid time interval', + } + ) + ); + } + + if (resultTestConfig.delayedData.docsCount === 0) { + validationResult.errors.delayedData.push( + i18n.translate( + 'xpack.ml.alertTypes.jobsHealthAlertingRule.testsConfig.delayedData.docsCountErrorMessage', + { + defaultMessage: 'Invalid number of documents', + } + ) + ); + } + return validationResult; }, requiresAppContext: false, @@ -68,6 +95,9 @@ export function registerJobsHealthAlertingRule( \\{\\{#memory_status\\}\\}Memory status: \\{\\{memory_status\\}\\} \\{\\{/memory_status\\}\\} \\{\\{#log_time\\}\\}Memory logging time: \\{\\{log_time\\}\\} \\{\\{/log_time\\}\\} \\{\\{#failed_category_count\\}\\}Failed category count: \\{\\{failed_category_count\\}\\} \\{\\{/failed_category_count\\}\\} + \\{\\{#annotation\\}\\}Annotation: \\{\\{annotation\\}\\} \\{\\{/annotation\\}\\} + \\{\\{#missed_docs_count\\}\\}Number of missed documents: \\{\\{missed_docs_count\\}\\} \\{\\{/missed_docs_count\\}\\} + \\{\\{#end_timestamp\\}\\}Latest finalized bucket with missing docs: \\{\\{end_timestamp\\}\\} \\{\\{/end_timestamp\\}\\} \\{\\{/context.results\\}\\} `, } diff --git a/x-pack/plugins/ml/public/alerting/jobs_health_rule/tests_selection_control.tsx b/x-pack/plugins/ml/public/alerting/jobs_health_rule/tests_selection_control.tsx index 07fbc63346cd4..b78c963670da2 100644 --- a/x-pack/plugins/ml/public/alerting/jobs_health_rule/tests_selection_control.tsx +++ b/x-pack/plugins/ml/public/alerting/jobs_health_rule/tests_selection_control.tsx @@ -5,12 +5,22 @@ * 2.0. */ -import React, { FC, Fragment, useCallback } from 'react'; -import { i18n } from '@kbn/i18n'; -import { EuiFormFieldset, EuiFormRow, EuiSpacer, EuiSwitch } from '@elastic/eui'; +import React, { FC, useCallback } from 'react'; +import { + EuiDescribedFormGroup, + EuiFieldNumber, + EuiForm, + EuiFormRow, + EuiIcon, + EuiSpacer, + EuiSwitch, + EuiToolTip, +} from '@elastic/eui'; +import { FormattedMessage } from '@kbn/i18n/react'; import { JobsHealthRuleTestsConfig, JobsHealthTests } from '../../../common/types/alerts'; import { getResultJobsHealthRuleConfig } from '../../../common/util/alerts'; import { HEALTH_CHECK_NAMES } from '../../../common/constants/alerts'; +import { TimeIntervalControl } from '../time_interval_control'; interface TestsSelectionControlProps { config: JobsHealthRuleTestsConfig; @@ -18,53 +28,127 @@ interface TestsSelectionControlProps { errors?: string[]; } -export const TestsSelectionControl: FC = ({ - config, - onChange, - errors, -}) => { - const uiConfig = getResultJobsHealthRuleConfig(config); +export const TestsSelectionControl: FC = React.memo( + ({ config, onChange, errors }) => { + const uiConfig = getResultJobsHealthRuleConfig(config); - const updateCallback = useCallback( - (update: Partial>) => { - onChange({ - ...(config ?? {}), - ...update, - }); - }, - [onChange, config] - ); + const updateCallback = useCallback( + (update: Partial>) => { + onChange({ + ...(config ?? {}), + ...update, + }); + }, + [onChange, config] + ); - return ( - - {Object.entries(uiConfig).map(([name, conf], i) => { - return ( - - + {(Object.entries(uiConfig) as Array< + [JobsHealthTests, typeof uiConfig[JobsHealthTests]] + >).map(([name, conf], i) => { + return ( + {HEALTH_CHECK_NAMES[name]?.name}} + description={HEALTH_CHECK_NAMES[name]?.description} > - - - - - ); - })} - - ); -}; + + + } + onChange={updateCallback.bind(null, { + [name]: { + ...uiConfig[name], + enabled: !uiConfig[name].enabled, + }, + })} + checked={uiConfig[name].enabled} + /> + + + + {name === 'delayedData' ? ( + <> + + + + } + > + + + + } + > + { + updateCallback({ + [name]: { + ...uiConfig[name], + docsCount: Number(e.target.value), + }, + }); + }} + min={1} + /> + + + + + + + + } + > + + + + } + value={uiConfig.delayedData.timeInterval} + onChange={(e) => { + updateCallback({ + [name]: { + ...uiConfig[name], + timeInterval: e, + }, + }); + }} + /> + + + + ) : null} + + ); + })} + + ); + } +); diff --git a/x-pack/plugins/ml/public/alerting/time_interval_control.tsx b/x-pack/plugins/ml/public/alerting/time_interval_control.tsx index 8030d340a3774..4ab73500958ae 100644 --- a/x-pack/plugins/ml/public/alerting/time_interval_control.tsx +++ b/x-pack/plugins/ml/public/alerting/time_interval_control.tsx @@ -27,7 +27,7 @@ export const TimeIntervalControl: FC = ({ const validationErrors = useMemo(() => validators(value), [value]); - const isInvalid = value !== undefined && !!validationErrors; + const isInvalid = !!value && !!validationErrors; return ( { const mlClient = ({ @@ -20,12 +23,15 @@ describe('JobsHealthService', () => { jobs = [ ({ job_id: 'test_job_01', + analysis_config: { bucket_span: '1h' }, } as unknown) as MlJob, ({ job_id: 'test_job_02', + analysis_config: { bucket_span: '15m' }, } as unknown) as MlJob, ({ job_id: 'test_job_03', + analysis_config: { bucket_span: '8m' }, } as unknown) as MlJob, ]; } @@ -34,6 +40,7 @@ describe('JobsHealthService', () => { jobs = [ ({ job_id: jobIds[0], + analysis_config: { bucket_span: '1h' }, } as unknown) as MlJob, ]; } @@ -84,13 +91,32 @@ describe('JobsHealthService', () => { return Promise.resolve( jobIds.map((j) => { return { + job_id: j, datafeed_id: j.replace('job', 'datafeed'), + query_delay: '3m', }; }) ); }), } as unknown) as jest.Mocked; + const annotationService = ({ + getDelayedDataAnnotations: jest.fn().mockImplementation(({ jobIds }: { jobIds: string[] }) => { + return Promise.resolve( + jobIds.map((jobId) => { + return { + job_id: jobId, + annotation: `Datafeed has missed ${ + jobId === 'test_job_01' ? 11 : 8 + } documents due to ingest latency, latest bucket with missing data is [2021-07-30T13:50:00.000Z]. Consider increasing query_delay`, + modified_time: 1627660295141, + end_timestamp: 1627653300000, + }; + }) + ); + }), + } as unknown) as jest.Mocked; + const logger = ({ warn: jest.fn(), info: jest.fn(), @@ -100,13 +126,19 @@ describe('JobsHealthService', () => { const jobHealthService: JobsHealthService = jobsHealthServiceProvider( mlClient, datafeedsService, + annotationService, logger ); - beforeEach(() => {}); + let dateNowSpy: jest.SpyInstance; + + beforeEach(() => { + dateNowSpy = jest.spyOn(Date, 'now').mockImplementation(() => MOCK_DATE_NOW); + }); afterEach(() => { jest.clearAllMocks(); + dateNowSpy.mockRestore(); }); test('returns empty results when no jobs provided', async () => { @@ -131,7 +163,11 @@ describe('JobsHealthService', () => { enabled: false, }, behindRealtime: null, - delayedData: null, + delayedData: { + enabled: false, + docsCount: null, + timeInterval: null, + }, errorMessages: null, mml: { enabled: false, @@ -149,6 +185,54 @@ describe('JobsHealthService', () => { expect(executionResult).toEqual([]); }); + test('takes into account delayed data params', async () => { + const executionResult = await jobHealthService.getTestsResults('testRule_04', { + testsConfig: { + delayedData: { + enabled: true, + docsCount: 10, + timeInterval: '4h', + }, + behindRealtime: { enabled: false, timeInterval: null }, + mml: { enabled: false }, + datafeed: { enabled: false }, + errorMessages: { enabled: false }, + }, + includeJobs: { + jobIds: [], + groupIds: ['test_group'], + }, + excludeJobs: { + jobIds: ['test_job_03'], + groupIds: [], + }, + }); + + expect(annotationService.getDelayedDataAnnotations).toHaveBeenCalledWith({ + jobIds: ['test_job_01', 'test_job_02'], + // 1487076708000 - 4h + earliestMs: 1487062308000, + }); + + expect(executionResult).toEqual([ + { + name: 'Data delay has occurred', + context: { + results: [ + { + job_id: 'test_job_01', + annotation: + 'Datafeed has missed 11 documents due to ingest latency, latest bucket with missing data is [2021-07-30T13:50:00.000Z]. Consider increasing query_delay', + end_timestamp: 1627653300000, + missed_docs_count: 11, + }, + ], + message: '1 job is suffering from delayed data.', + }, + }, + ]); + }); + test('returns results based on provided selection', async () => { const executionResult = await jobHealthService.getTestsResults('testRule_03', { testsConfig: null, @@ -169,11 +253,17 @@ describe('JobsHealthService', () => { 'test_job_01', 'test_job_02', ]); + expect(datafeedsService.getDatafeedByJobId).toHaveBeenCalledTimes(1); expect(mlClient.getJobStats).toHaveBeenCalledWith({ job_id: 'test_job_01,test_job_02' }); expect(mlClient.getDatafeedStats).toHaveBeenCalledWith({ datafeed_id: 'test_datafeed_01,test_datafeed_02', }); expect(mlClient.getJobStats).toHaveBeenCalledTimes(1); + expect(annotationService.getDelayedDataAnnotations).toHaveBeenCalledWith({ + jobIds: ['test_job_01', 'test_job_02'], + earliestMs: 1487069268000, + }); + expect(executionResult).toEqual([ { name: 'Datafeed is not started', @@ -203,6 +293,28 @@ describe('JobsHealthService', () => { '1 job reached the hard model memory limit. Assign the job more memory and restore from a snapshot from prior to reaching the hard limit.', }, }, + { + name: 'Data delay has occurred', + context: { + results: [ + { + job_id: 'test_job_01', + annotation: + 'Datafeed has missed 11 documents due to ingest latency, latest bucket with missing data is [2021-07-30T13:50:00.000Z]. Consider increasing query_delay', + end_timestamp: 1627653300000, + missed_docs_count: 11, + }, + { + job_id: 'test_job_02', + annotation: + 'Datafeed has missed 8 documents due to ingest latency, latest bucket with missing data is [2021-07-30T13:50:00.000Z]. Consider increasing query_delay', + end_timestamp: 1627653300000, + missed_docs_count: 8, + }, + ], + message: '2 jobs are suffering from delayed data.', + }, + }, ]); }); }); diff --git a/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.ts b/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.ts index 5c34a84abd971..52e17fed7a414 100644 --- a/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.ts +++ b/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.ts @@ -5,10 +5,11 @@ * 2.0. */ +import { memoize, keyBy } from 'lodash'; import { KibanaRequest, SavedObjectsClientContract } from 'kibana/server'; import { i18n } from '@kbn/i18n'; import { Logger } from 'kibana/server'; -import { MlJobStats } from '@elastic/elasticsearch/api/types'; +import { MlJob } from '@elastic/elasticsearch/api/types'; import { MlClient } from '../ml_client'; import { AnomalyDetectionJobsHealthRuleParams, @@ -20,10 +21,18 @@ import { DatafeedStats } from '../../../common/types/anomaly_detection_jobs'; import { GetGuards } from '../../shared_services/shared_services'; import { AnomalyDetectionJobsHealthAlertContext, + DelayedDataResponse, MmlTestResponse, NotStartedDatafeedResponse, } from './register_jobs_monitoring_rule_type'; -import { getResultJobsHealthRuleConfig } from '../../../common/util/alerts'; +import { + getResultJobsHealthRuleConfig, + resolveLookbackInterval, +} from '../../../common/util/alerts'; +import { AnnotationService } from '../../models/annotation_service/annotation'; +import { annotationServiceProvider } from '../../models/annotation_service'; +import { parseInterval } from '../../../common/util/parse_interval'; +import { isDefined } from '../../../common/types/guards'; interface TestResult { name: string; @@ -35,14 +44,18 @@ type TestsResults = TestResult[]; export function jobsHealthServiceProvider( mlClient: MlClient, datafeedsService: DatafeedsService, + annotationService: AnnotationService, logger: Logger ) { /** - * Extracts result list of job ids based on included and excluded selection of jobs and groups. + * Extracts result list of jobs based on included and excluded selection of jobs and groups. * @param includeJobs * @param excludeJobs */ - const getResultJobIds = async (includeJobs: JobSelection, excludeJobs?: JobSelection | null) => { + const getResultJobs = async ( + includeJobs: JobSelection, + excludeJobs?: JobSelection | null + ): Promise => { const jobAndGroupIds = [...(includeJobs.jobIds ?? []), ...(includeJobs.groupIds ?? [])]; const includeAllJobs = jobAndGroupIds.some((id) => id === ALL_JOBS_SELECTION); @@ -54,7 +67,7 @@ export function jobsHealthServiceProvider( }) ).body.jobs; - let resultJobIds = jobsResponse.map((v) => v.job_id); + let resultJobs = jobsResponse; if (excludeJobs && (!!excludeJobs.jobIds.length || !!excludeJobs?.groupIds.length)) { const excludedJobAndGroupIds = [ @@ -69,33 +82,41 @@ export function jobsHealthServiceProvider( const excludedJobsIds: Set = new Set(excludedJobsResponse.map((v) => v.job_id)); - resultJobIds = resultJobIds.filter((v) => !excludedJobsIds.has(v)); + resultJobs = resultJobs.filter((v) => !excludedJobsIds.has(v.job_id)); } - return resultJobIds; + return resultJobs; }; - const getJobStats: (jobIds: string[]) => Promise = (() => { - const cachedStats = new Map(); + /** + * Resolves the timestamp for delayed data check. + * + * @param timeInterval - Custom time interval provided by the user. + * @param defaultLookbackInterval - Interval derived from the jobs and datefeeds configs. + */ + const getDelayedDataLookbackTimestamp = ( + timeInterval: string | null, + defaultLookbackInterval: string + ): number => { + const currentTimestamp = Date.now(); - return async (jobIds: string[]) => { - if (jobIds.every((j) => cachedStats.has(j))) { - logger.debug(`Return jobs stats from cache`); - return Array.from(cachedStats.values()); - } + const defaultLookbackTimestamp = + currentTimestamp - parseInterval(defaultLookbackInterval)!.asMilliseconds(); - const { - body: { jobs: jobsStats }, - } = await mlClient.getJobStats({ job_id: jobIds.join(',') }); + const customIntervalOffsetTimestamp = timeInterval + ? currentTimestamp - parseInterval(timeInterval)!.asMilliseconds() + : null; - // update cache - jobsStats.forEach((v) => { - cachedStats.set(v.job_id, v); - }); + return Math.min(...[defaultLookbackTimestamp, customIntervalOffsetTimestamp].filter(isDefined)); + }; - return jobsStats; - }; - })(); + const getJobIds = memoize((jobs: MlJob[]) => jobs.map((j) => j.job_id)); + + const getDatafeeds = memoize(datafeedsService.getDatafeedByJobId); + + const getJobStats = memoize( + async (jobIds: string[]) => (await mlClient.getJobStats({ job_id: jobIds.join(',') })).body.jobs + ); return { /** @@ -103,7 +124,7 @@ export function jobsHealthServiceProvider( * @param jobIds */ async getNotStartedDatafeeds(jobIds: string[]): Promise { - const datafeeds = await datafeedsService.getDatafeedByJobId(jobIds); + const datafeeds = await getDatafeeds(jobIds); if (datafeeds) { const jobsStats = await getJobStats(jobIds); @@ -154,6 +175,67 @@ export function jobsHealthServiceProvider( }; }); }, + /** + * Returns annotations about delayed data. + * + * @param jobs + * @param timeInterval - Custom time interval provided by the user. + * @param docsCount - The threshold for a number of missing documents to alert upon. + */ + async getDelayedDataReport( + jobs: MlJob[], + timeInterval: string | null, + docsCount: number | null + ): Promise { + const jobIds = getJobIds(jobs); + const datafeeds = await getDatafeeds(jobIds); + + const datafeedsMap = keyBy(datafeeds, 'job_id'); + + // We shouldn't check jobs that don't have associated datafeeds + const resultJobs = jobs.filter((j) => datafeedsMap[j.job_id] !== undefined); + const resultJobIds = getJobIds(resultJobs); + const jobsMap = keyBy(resultJobs, 'job_id'); + + const defaultLookbackInterval = resolveLookbackInterval(resultJobs, datafeeds!); + const earliestMs = getDelayedDataLookbackTimestamp(timeInterval, defaultLookbackInterval); + + const annotations: DelayedDataResponse[] = ( + await annotationService.getDelayedDataAnnotations({ + jobIds: resultJobIds, + earliestMs, + }) + ) + .map((v) => { + const match = v.annotation.match(/Datafeed has missed (\d+)\s/); + const missedDocsCount = match ? parseInt(match[1], 10) : 0; + return { + annotation: v.annotation, + // end_timestamp is always defined for delayed_data annotation + end_timestamp: v.end_timestamp!, + missed_docs_count: missedDocsCount, + job_id: v.job_id, + }; + }) + .filter((v) => { + // As we retrieved annotations based on the longest bucket span and query delay, + // we need to check end_timestamp against appropriate job configuration. + + const job = jobsMap[v.job_id]; + const datafeed = datafeedsMap[v.job_id]; + + const isDocCountExceededThreshold = docsCount ? v.missed_docs_count >= docsCount : true; + + const jobLookbackInterval = resolveLookbackInterval([job], [datafeed]); + + const isEndTimestampWithinRange = + v.end_timestamp > getDelayedDataLookbackTimestamp(timeInterval, jobLookbackInterval); + + return isDocCountExceededThreshold && isEndTimestampWithinRange; + }); + + return annotations; + }, /** * Retrieves report grouped by test. */ @@ -165,7 +247,8 @@ export function jobsHealthServiceProvider( const results: TestsResults = []; - const jobIds = await getResultJobIds(includeJobs, excludeJobs); + const jobs = await getResultJobs(includeJobs, excludeJobs); + const jobIds = getJobIds(jobs); if (jobIds.length === 0) { logger.warn(`Rule "${ruleInstanceName}" does not have associated jobs.`); @@ -178,7 +261,7 @@ export function jobsHealthServiceProvider( const response = await this.getNotStartedDatafeeds(jobIds); if (response && response.length > 0) { results.push({ - name: HEALTH_CHECK_NAMES.datafeed, + name: HEALTH_CHECK_NAMES.datafeed.name, context: { results: response, message: i18n.translate( @@ -200,7 +283,7 @@ export function jobsHealthServiceProvider( }, 0); results.push({ - name: HEALTH_CHECK_NAMES.mml, + name: HEALTH_CHECK_NAMES.mml.name, context: { results: response, message: @@ -226,6 +309,31 @@ export function jobsHealthServiceProvider( } } + if (config.delayedData.enabled) { + const response = await this.getDelayedDataReport( + jobs, + config.delayedData.timeInterval, + config.delayedData.docsCount + ); + + if (response.length > 0) { + results.push({ + name: HEALTH_CHECK_NAMES.delayedData.name, + context: { + results: response, + message: i18n.translate( + 'xpack.ml.alertTypes.jobsHealthAlertingRule.delayedDataMessage', + { + defaultMessage: + '{jobsCount, plural, one {# job is} other {# jobs are}} suffering from delayed data.', + values: { jobsCount: response.length }, + } + ), + }, + }); + } + } + return results; }, }; @@ -251,6 +359,7 @@ export function getJobsHealthServiceProvider(getGuards: GetGuards) { jobsHealthServiceProvider( mlClient, datafeedsProvider(scopedClient, mlClient), + annotationServiceProvider(scopedClient), logger ).getTestsResults(...args) ); diff --git a/x-pack/plugins/ml/server/lib/alerts/register_jobs_monitoring_rule_type.ts b/x-pack/plugins/ml/server/lib/alerts/register_jobs_monitoring_rule_type.ts index 0e4270fb94e3b..063d8ad5a8980 100644 --- a/x-pack/plugins/ml/server/lib/alerts/register_jobs_monitoring_rule_type.ts +++ b/x-pack/plugins/ml/server/lib/alerts/register_jobs_monitoring_rule_type.ts @@ -42,7 +42,20 @@ export interface NotStartedDatafeedResponse { job_state: MlJobState; } -export type AnomalyDetectionJobHealthResult = MmlTestResponse | NotStartedDatafeedResponse; +export interface DelayedDataResponse { + job_id: string; + /** Annotation string */ + annotation: string; + /** Number of missed documents */ + missed_docs_count: number; + /** Timestamp of the latest finalized bucket with missing docs */ + end_timestamp: number; +} + +export type AnomalyDetectionJobHealthResult = + | MmlTestResponse + | NotStartedDatafeedResponse + | DelayedDataResponse; export type AnomalyDetectionJobsHealthAlertContext = { results: AnomalyDetectionJobHealthResult[]; @@ -107,7 +120,7 @@ export function registerJobsMonitoringRuleType({ producer: PLUGIN_ID, minimumLicenseRequired: MINIMUM_FULL_LICENSE, isExportable: true, - async executor({ services, params, alertId, state, previousStartedAt, startedAt, name }) { + async executor({ services, params, alertId, state, previousStartedAt, startedAt, name, rule }) { const fakeRequest = {} as KibanaRequest; const { getTestsResults } = mlServicesProviders.jobsHealthServiceProvider( services.savedObjectsClient, diff --git a/x-pack/plugins/ml/server/models/annotation_service/annotation.ts b/x-pack/plugins/ml/server/models/annotation_service/annotation.ts index c5f3c152ddc7a..9552735a57d35 100644 --- a/x-pack/plugins/ml/server/models/annotation_service/annotation.ts +++ b/x-pack/plugins/ml/server/models/annotation_service/annotation.ts @@ -9,6 +9,7 @@ import Boom from '@hapi/boom'; import { each, get } from 'lodash'; import { IScopedClusterClient } from 'kibana/server'; +import { estypes } from '@elastic/elasticsearch'; import { ANNOTATION_EVENT_USER, ANNOTATION_TYPE } from '../../../common/constants/annotations'; import { PARTITION_FIELDS } from '../../../common/constants/anomalies'; import { @@ -25,6 +26,7 @@ import { getAnnotationFieldValue, EsAggregationResult, } from '../../../common/types/annotations'; +import { JobId } from '../../../common/types/anomaly_detection_jobs'; // TODO All of the following interface/type definitions should // eventually be replaced by the proper upstream definitions @@ -46,6 +48,7 @@ export interface IndexAnnotationArgs { fields?: FieldToBucket[]; detectorIndex?: number; entities?: any[]; + event?: Annotation['event']; } export interface AggTerm { @@ -60,7 +63,7 @@ export interface GetParams { export interface GetResponse { success: true; - annotations: Record; + annotations: Record; aggregations: EsAggregationResult; } @@ -116,7 +119,8 @@ export function annotationProvider({ asInternalUser }: IScopedClusterClient) { fields, detectorIndex, entities, - }: IndexAnnotationArgs) { + event, + }: IndexAnnotationArgs): Promise { const obj: GetResponse = { success: true, annotations: {}, @@ -190,6 +194,12 @@ export function annotationProvider({ asInternalUser }: IScopedClusterClient) { exists: { field: 'annotation' }, }); + if (event) { + boolCriteria.push({ + term: { event }, + }); + } + if (jobIds && jobIds.length > 0 && !(jobIds.length === 1 && jobIds[0] === '*')) { let jobIdFilterStr = ''; each(jobIds, (jobId, i: number) => { @@ -332,6 +342,68 @@ export function annotationProvider({ asInternalUser }: IScopedClusterClient) { } } + /** + * Fetches the latest delayed data annotation per job. + * @param jobIds + * @param earliestMs - Timestamp for the end_timestamp range query. + */ + async function getDelayedDataAnnotations({ + jobIds, + earliestMs, + }: { + jobIds: string[]; + earliestMs?: number; + }): Promise { + const params: estypes.SearchRequest = { + index: ML_ANNOTATIONS_INDEX_ALIAS_READ, + size: 0, + body: { + query: { + bool: { + filter: [ + ...(earliestMs ? [{ range: { end_timestamp: { gte: earliestMs } } }] : []), + { + term: { event: { value: 'delayed_data' } }, + }, + { terms: { job_id: jobIds } }, + ], + }, + }, + aggs: { + by_job: { + terms: { field: 'job_id', size: jobIds.length }, + aggs: { + latest_delayed: { + top_hits: { + size: 1, + sort: [ + { + end_timestamp: { + order: 'desc', + }, + }, + ], + }, + }, + }, + }, + }, + }, + }; + + const { body } = await asInternalUser.search(params); + + const annotations = (body.aggregations!.by_job as estypes.AggregationsTermsAggregate<{ + key: string; + doc_count: number; + latest_delayed: Pick, 'hits'>; + }>).buckets.map((bucket) => { + return bucket.latest_delayed.hits.hits[0]._source!; + }); + + return annotations; + } + async function deleteAnnotation(id: string) { const params: DeleteParams = { index: ML_ANNOTATIONS_INDEX_ALIAS_WRITE, @@ -347,5 +419,8 @@ export function annotationProvider({ asInternalUser }: IScopedClusterClient) { getAnnotations, indexAnnotation, deleteAnnotation, + getDelayedDataAnnotations, }; } + +export type AnnotationService = ReturnType; diff --git a/x-pack/plugins/ml/server/routes/schemas/alerting_schema.ts b/x-pack/plugins/ml/server/routes/schemas/alerting_schema.ts index 4e0f9a9aa7c92..ed58d8d5c9313 100644 --- a/x-pack/plugins/ml/server/routes/schemas/alerting_schema.ts +++ b/x-pack/plugins/ml/server/routes/schemas/alerting_schema.ts @@ -79,7 +79,7 @@ export const anomalyDetectionJobsHealthRuleParams = schema.object({ delayedData: schema.nullable( schema.object({ enabled: schema.boolean({ defaultValue: true }), - docsCount: schema.nullable(schema.number()), + docsCount: schema.nullable(schema.number({ min: 1 })), timeInterval: schema.nullable(schema.string()), }) ), diff --git a/x-pack/plugins/ml/server/routes/schemas/annotations_schema.ts b/x-pack/plugins/ml/server/routes/schemas/annotations_schema.ts index 96edbeb0fce0e..73e876f0a9122 100644 --- a/x-pack/plugins/ml/server/routes/schemas/annotations_schema.ts +++ b/x-pack/plugins/ml/server/routes/schemas/annotations_schema.ts @@ -21,7 +21,15 @@ export const indexAnnotationSchema = schema.object({ create_username: schema.maybe(schema.string()), modified_time: schema.maybe(schema.number()), modified_username: schema.maybe(schema.string()), - event: schema.maybe(schema.string()), + event: schema.maybe( + schema.oneOf([ + schema.literal('user'), + schema.literal('delayed_data'), + schema.literal('model_snapshot_stored'), + schema.literal('model_change'), + schema.literal('categorization_status_change'), + ]) + ), detector_index: schema.maybe(schema.number()), partition_field_name: schema.maybe(schema.string()), partition_field_value: schema.maybe(schema.string()), diff --git a/x-pack/test/functional/apps/ml/anomaly_detection/annotations.ts b/x-pack/test/functional/apps/ml/anomaly_detection/annotations.ts index a6ea27be21cc8..d2f9acf35d632 100644 --- a/x-pack/test/functional/apps/ml/anomaly_detection/annotations.ts +++ b/x-pack/test/functional/apps/ml/anomaly_detection/annotations.ts @@ -116,7 +116,7 @@ export default function ({ getService }: FtrProviderContext) { }; before(async () => { - await ml.api.indexAnnotation(annotation, annotationId); + await ml.api.indexAnnotation(annotation as Partial, annotationId); }); it('displays the original annotation correctly', async () => { @@ -189,7 +189,7 @@ export default function ({ getService }: FtrProviderContext) { const annotationId = `delete-annotation-id-${Date.now()}`; before(async () => { - await ml.api.indexAnnotation(annotation, annotationId); + await ml.api.indexAnnotation(annotation as Partial, annotationId); }); it('displays the original annotation', async () => {