Skip to content

Commit

Permalink
[Logs UI] Interpret finished analysis jobs as healthy (elastic#45268)
Browse files Browse the repository at this point in the history
This changes the job health check of the log analysis page to consider finished jobs as healthy. This situation is relevant when the job is non-continuous, i.e. created with a definite end date.

At the same time it ensures that datafeeds that failed to be started due to empty indices lead to the jobs being considered unhealthy.

fixes elastic#45180
  • Loading branch information
weltenwort committed Sep 11, 2019
1 parent 7c6a838 commit 5e6d84b
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,33 @@ export const fetchJobStatusRequestPayloadRT = rt.type({

export type FetchJobStatusRequestPayload = rt.TypeOf<typeof fetchJobStatusRequestPayloadRT>;

// TODO: Get this to align with the payload - something is tripping it up somewhere
// export const fetchJobStatusResponsePayloadRT = rt.array(rt.type({
// datafeedId: rt.string,
// datafeedIndices: rt.array(rt.string),
// datafeedState: rt.string,
// description: rt.string,
// earliestTimestampMs: rt.number,
// groups: rt.array(rt.string),
// hasDatafeed: rt.boolean,
// id: rt.string,
// isSingleMetricViewerJob: rt.boolean,
// jobState: rt.string,
// latestResultsTimestampMs: rt.number,
// latestTimestampMs: rt.number,
// memory_status: rt.string,
// nodeName: rt.union([rt.string, rt.undefined]),
// processed_record_count: rt.number,
// fullJob: rt.any,
// auditMessage: rt.any,
// deleting: rt.union([rt.boolean, rt.undefined]),
// }));

export const fetchJobStatusResponsePayloadRT = rt.any;
const datafeedStateRT = rt.keyof({
started: null,
stopped: null,
});

const jobStateRT = rt.keyof({
closed: null,
closing: null,
failed: null,
opened: null,
opening: null,
});

export const jobSummaryRT = rt.intersection([
rt.type({
id: rt.string,
jobState: jobStateRT,
}),
rt.partial({
datafeedIndices: rt.array(rt.string),
datafeedState: datafeedStateRT,
fullJob: rt.partial({
finished_time: rt.number,
}),
}),
]);

export const fetchJobStatusResponsePayloadRT = rt.array(jobSummaryRT);

export type FetchJobStatusResponsePayload = rt.TypeOf<typeof fetchJobStatusResponsePayloadRT>;
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,23 @@
*/

import createContainer from 'constate-latest';
import { useMemo, useEffect, useState } from 'react';
import { bucketSpan, getJobId } from '../../../../common/log_analysis';
import { useEffect, useMemo, useState } from 'react';

import { bucketSpan, getDatafeedId, getJobId, JobType } from '../../../../common/log_analysis';
import { useTrackedPromise } from '../../../utils/use_tracked_promise';
import { callJobsSummaryAPI, FetchJobStatusResponsePayload } from './api/ml_get_jobs_summary_api';
import { callSetupMlModuleAPI, SetupMlModuleResponsePayload } from './api/ml_setup_module_api';
import { callJobsSummaryAPI } from './api/ml_get_jobs_summary_api';

// combines and abstracts job and datafeed status
type JobStatus =
| 'unknown'
| 'missing'
| 'inconsistent'
| 'created'
| 'initializing'
| 'stopped'
| 'started'
| 'opening'
| 'opened'
| 'finished'
| 'failed';

interface AllJobStatuses {
[key: string]: JobStatus;
}

const getInitialJobStatuses = (): AllJobStatuses => {
return {
logEntryRate: 'unknown',
};
};

export const useLogAnalysisJobs = ({
indexPattern,
sourceId,
Expand All @@ -43,14 +33,19 @@ export const useLogAnalysisJobs = ({
spaceId: string;
timeField: string;
}) => {
const [jobStatus, setJobStatus] = useState<AllJobStatuses>(getInitialJobStatuses());
const [jobStatus, setJobStatus] = useState<Record<JobType, JobStatus>>({
'log-entry-rate': 'unknown',
});
const [hasCompletedSetup, setHasCompletedSetup] = useState<boolean>(false);

const [setupMlModuleRequest, setupMlModule] = useTrackedPromise(
{
cancelPreviousOn: 'resolution',
createPromise: async (start, end) => {
setJobStatus(getInitialJobStatuses());
setJobStatus(currentJobStatus => ({
...currentJobStatus,
'log-entry-rate': 'initializing',
}));
return await callSetupMlModuleAPI(
start,
end,
Expand All @@ -62,26 +57,25 @@ export const useLogAnalysisJobs = ({
);
},
onResolve: ({ datafeeds, jobs }: SetupMlModuleResponsePayload) => {
const hasSuccessfullyCreatedJobs = jobs.every(job => job.success);
const hasSuccessfullyStartedDatafeeds = datafeeds.every(
datafeed => datafeed.success && datafeed.started
);
const hasAnyErrors =
jobs.some(job => !!job.error) || datafeeds.some(datafeed => !!datafeed.error);

setJobStatus(currentJobStatus => ({
...currentJobStatus,
logEntryRate: hasAnyErrors
? 'failed'
: hasSuccessfullyCreatedJobs
? hasSuccessfullyStartedDatafeeds
'log-entry-rate':
hasSuccessfullyCreatedJob(getJobId(spaceId, sourceId, 'log-entry-rate'))(jobs) &&
hasSuccessfullyStartedDatafeed(getDatafeedId(spaceId, sourceId, 'log-entry-rate'))(
datafeeds
)
? 'started'
: 'failed'
: 'failed',
: 'failed',
}));

setHasCompletedSetup(true);
},
onReject: () => {
setJobStatus(currentJobStatus => ({
...currentJobStatus,
'log-entry-rate': 'failed',
}));
},
},
[indexPattern, spaceId, sourceId]
);
Expand All @@ -90,20 +84,19 @@ export const useLogAnalysisJobs = ({
{
cancelPreviousOn: 'resolution',
createPromise: async () => {
return callJobsSummaryAPI(spaceId, sourceId);
return await callJobsSummaryAPI(spaceId, sourceId);
},
onResolve: response => {
if (response && response.length) {
const logEntryRate = response.find(
(job: any) => job.id === getJobId(spaceId, sourceId, 'log-entry-rate')
);
setJobStatus({
logEntryRate: logEntryRate ? logEntryRate.jobState : 'unknown',
});
}
setJobStatus(currentJobStatus => ({
...currentJobStatus,
'log-entry-rate': getJobStatus(getJobId(spaceId, sourceId, 'log-entry-rate'))(response),
}));
},
onReject: error => {
// TODO: Handle errors
onReject: err => {
setJobStatus(currentJobStatus => ({
...currentJobStatus,
'log-entry-rate': 'unknown',
}));
},
},
[indexPattern, spaceId, sourceId]
Expand All @@ -114,11 +107,7 @@ export const useLogAnalysisJobs = ({
}, []);

const isSetupRequired = useMemo(() => {
const jobStates = Object.values(jobStatus);
return (
jobStates.filter(state => ['opened', 'opening', 'created', 'started'].includes(state))
.length < jobStates.length
);
return !Object.values(jobStatus).every(state => ['started', 'finished'].includes(state));
}, [jobStatus]);

const isLoadingSetupStatus = useMemo(() => fetchJobStatusRequest.state === 'pending', [
Expand Down Expand Up @@ -147,3 +136,52 @@ export const useLogAnalysisJobs = ({
};

export const LogAnalysisJobs = createContainer(useLogAnalysisJobs);

const hasSuccessfullyCreatedJob = (jobId: string) => (
jobSetupResponses: SetupMlModuleResponsePayload['jobs']
) =>
jobSetupResponses.filter(
jobSetupResponse =>
jobSetupResponse.id === jobId && jobSetupResponse.success && !jobSetupResponse.error
).length > 0;

const hasSuccessfullyStartedDatafeed = (datafeedId: string) => (
datafeedSetupResponses: SetupMlModuleResponsePayload['datafeeds']
) =>
datafeedSetupResponses.filter(
datafeedSetupResponse =>
datafeedSetupResponse.id === datafeedId &&
datafeedSetupResponse.success &&
datafeedSetupResponse.started &&
!datafeedSetupResponse.error
).length > 0;

const getJobStatus = (jobId: string) => (jobSummaries: FetchJobStatusResponsePayload): JobStatus =>
jobSummaries
.filter(jobSummary => jobSummary.id === jobId)
.map(
(jobSummary): JobStatus => {
if (jobSummary.jobState === 'failed') {
return 'failed';
} else if (
jobSummary.jobState === 'closed' &&
jobSummary.datafeedState === 'stopped' &&
jobSummary.fullJob &&
jobSummary.fullJob.finished_time != null
) {
return 'finished';
} else if (
jobSummary.jobState === 'closed' ||
jobSummary.jobState === 'closing' ||
jobSummary.datafeedState === 'stopped'
) {
return 'stopped';
} else if (jobSummary.jobState === 'opening') {
return 'initializing';
} else if (jobSummary.jobState === 'opened' && jobSummary.datafeedState === 'started') {
return 'started';
}

return 'unknown';
}
)[0] || 'missing';

0 comments on commit 5e6d84b

Please sign in to comment.