[ML] Improvements to job saved object sync (elastic#101899)
* [ML] Improvements to job saved object sync

* refactor

* adding tests

* updating test label

Co-authored-by: Kibana Machine <[email protected]>
2 people authored and John Dorlus committed Jun 15, 2021
1 parent dd61f2a commit 0e3f05d
Showing 3 changed files with 99 additions and 17 deletions.
6 changes: 4 additions & 2 deletions x-pack/plugins/ml/server/saved_objects/checks.ts
@@ -24,7 +24,7 @@ interface JobSavedObjectStatus {
};
}

interface JobStatus {
export interface JobStatus {
jobId: string;
datafeedId?: string | null;
checks: {
@@ -68,7 +68,9 @@ export function checksFactory(

if (type === 'anomaly-detector') {
jobExists = adJobs.jobs.some((j) => j.job_id === jobId);
datafeedExists = datafeeds.datafeeds.some((d) => d.job_id === jobId);
datafeedExists = datafeeds.datafeeds.some(
(d) => d.datafeed_id === datafeedId && d.job_id === jobId
);
} else {
jobExists = dfaJobs.data_frame_analytics.some((j) => j.id === jobId);
}
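The core behavioral change in checks.ts: datafeedExists previously counted any datafeed that referenced the job, so a datafeed recreated in Elasticsearch under a new id still looked "present" even though the saved object held a stale id. Matching on both datafeed_id and job_id exposes that mismatch. Below is a minimal standalone sketch of the old versus new check, using invented ids that mirror the test fixtures; it is an illustration, not part of the commit.

// Hypothetical illustration of the old vs. new datafeedExists check.
// DatafeedStat is a simplified stand-in for the ES ML datafeed stats shape.
interface DatafeedStat {
  datafeed_id: string;
  job_id: string;
}

const datafeeds: DatafeedStat[] = [
  // the datafeed was recreated in ES under a different id
  { datafeed_id: 'different_datafeed-fq_single_1', job_id: 'fq_single_1' },
];

const jobId = 'fq_single_1';
const datafeedId = 'datafeed-fq_single_1'; // stale id stored in the saved object

// Old check: any datafeed attached to the job counts, so the stale id still passes.
const oldDatafeedExists = datafeeds.some((d) => d.job_id === jobId); // true

// New check: the saved object's datafeed id must also match the real datafeed.
const newDatafeedExists = datafeeds.some(
  (d) => d.datafeed_id === datafeedId && d.job_id === jobId
); // false, so sync can tell the saved object's datafeed id is stale

console.log({ oldDatafeedExists, newDatafeedExists });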
17 changes: 16 additions & 1 deletion x-pack/plugins/ml/server/saved_objects/sync.ts
@@ -14,6 +14,7 @@ import {
InitializeSavedObjectResponse,
} from '../../common/types/saved_objects';
import { checksFactory } from './checks';
import type { JobStatus } from './checks';
import { getSavedObjectClientError } from './util';

import { Datafeed } from '../../common/types/anomaly_detection_jobs';
@@ -45,6 +46,12 @@ export function syncSavedObjectsFactory(
const tasks: Array<() => Promise<void>> = [];

const status = await checkStatus();

const adJobsById = status.jobs['anomaly-detector'].reduce((acc, j) => {
acc[j.jobId] = j;
return acc;
}, {} as Record<string, JobStatus>);

for (const job of status.jobs['anomaly-detector']) {
if (job.checks.savedObjectExits === false) {
if (simulate === true) {
@@ -141,8 +148,16 @@
}

for (const job of status.savedObjects['anomaly-detector']) {
if (job.checks.datafeedExists === true && job.datafeedId === null) {
if (
(job.checks.datafeedExists === true && job.datafeedId === null) ||
(job.checks.datafeedExists === false &&
job.datafeedId === null &&
job.checks.datafeedExists === false &&
adJobsById[job.jobId] &&
adJobsById[job.jobId].datafeedId !== job.datafeedId)
) {
// add datafeed id for jobs where the datafeed exists but the id is missing from the saved object
// or if the datafeed id in the saved object is not the same as the one attached to the job in es
if (simulate === true) {
results.datafeedsAdded[job.jobId] = { success: true };
} else {
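In sync.ts the new adJobsById lookup and the widened condition decide when a saved object should have its datafeed id (re)attached: either the datafeed exists but the saved object has no datafeedId, or the saved object's datafeedId no longer matches the datafeed actually attached to the job in Elasticsearch. Below is a condensed, self-contained sketch of that decision (the duplicated datafeedExists clause from the diff is folded into one, and the fixture values are invented), not the code shipped in the commit.

// Hypothetical rendering of the sync decision; JobStatus mirrors the
// interface exported from checks.ts in simplified form.
interface JobStatus {
  jobId: string;
  datafeedId?: string | null;
  checks: { datafeedExists?: boolean };
}

// ES-side view of the anomaly detection jobs (status.jobs['anomaly-detector']).
const adJobs: JobStatus[] = [
  { jobId: 'fq_single_1', datafeedId: 'different_datafeed-fq_single_1', checks: {} },
];

// Saved-object-side view (status.savedObjects['anomaly-detector']).
const savedObjects: JobStatus[] = [
  { jobId: 'fq_single_1', datafeedId: null, checks: { datafeedExists: false } },
];

// Index the ES-side statuses by job id for constant-time lookups in the loop.
const adJobsById = adJobs.reduce((acc, j) => {
  acc[j.jobId] = j;
  return acc;
}, {} as Record<string, JobStatus>);

for (const job of savedObjects) {
  const esJob = adJobsById[job.jobId];
  const needsDatafeedId =
    // datafeed exists but its id is missing from the saved object
    (job.checks.datafeedExists === true && job.datafeedId === null) ||
    // or the id in the saved object no longer matches the job's real datafeed
    (job.checks.datafeedExists === false &&
      job.datafeedId === null &&
      esJob !== undefined &&
      esJob.datafeedId !== job.datafeedId);
  console.log(job.jobId, needsDatafeedId); // fq_single_1 true
}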
93 changes: 79 additions & 14 deletions x-pack/test/api_integration/apis/ml/saved_objects/sync.ts
@@ -6,6 +6,7 @@
*/

import expect from '@kbn/expect';
import { cloneDeep } from 'lodash';
import { FtrProviderContext } from '../../../ftr_provider_context';
import { USER } from '../../../../functional/services/ml/security_common';
import { COMMON_REQUEST_HEADERS } from '../../../../functional/services/ml/common_api';
@@ -23,7 +24,7 @@ export default ({ getService }: FtrProviderContext) => {

async function runRequest(user: USER, expectedStatusCode: number) {
const { body } = await supertest
.get(`/s/space1/api/ml/saved_objects/sync`)
.get(`/s/${idSpace1}/api/ml/saved_objects/sync`)
.auth(user, ml.securityCommon.getPasswordForUser(user))
.set(COMMON_REQUEST_HEADERS)
.expect(expectedStatusCode);
@@ -32,9 +33,19 @@ export default ({ getService }: FtrProviderContext) => {
}

describe('GET saved_objects/sync', () => {
before(async () => {
beforeEach(async () => {
await spacesService.create({ id: idSpace1, name: 'space_one', disabledFeatures: [] });
await ml.testResources.setKibanaTimeZoneToUTC();
});

afterEach(async () => {
await spacesService.delete(idSpace1);
await ml.api.cleanMlIndices();
await ml.testResources.cleanMLSavedObjects();
});

it('should sync datafeeds and saved objects', async () => {
// prepare test data
await ml.api.createAnomalyDetectionJob(
ml.commonConfig.getADFqSingleMetricJobConfig(adJobId1),
idSpace1
@@ -51,18 +62,6 @@ export default ({ getService }: FtrProviderContext) => {
ml.commonConfig.getADFqSingleMetricJobConfig(adJobIdES)
);

await ml.testResources.setKibanaTimeZoneToUTC();
});

after(async () => {
await spacesService.delete(idSpace1);
await ml.api.cleanMlIndices();
await ml.testResources.cleanMLSavedObjects();
});

it('should sync datafeeds and saved objects', async () => {
// prepare test data

// datafeed should be added with the request
const datafeedConfig2 = ml.commonConfig.getADFqDatafeedConfig(adJobId2);
await ml.api.createDatafeedES(datafeedConfig2);
@@ -89,7 +88,73 @@
});
});

it('should sync datafeeds after recreation in ES with different name', async () => {
// prepare test data
const jobConfig1 = ml.commonConfig.getADFqSingleMetricJobConfig(adJobId1);
await ml.api.createAnomalyDetectionJob(jobConfig1, idSpace1);

// datafeed should be added with the request
const datafeedConfig1 = ml.commonConfig.getADFqDatafeedConfig(adJobId1);
await ml.api.createDatafeedES(datafeedConfig1);

// run the sync request and verify the response
const body = await runRequest(USER.ML_POWERUSER_ALL_SPACES, 200);

// expect datafeed to be added
expect(body).to.eql({
datafeedsAdded: { [adJobId1]: { success: true } },
datafeedsRemoved: {},
savedObjectsCreated: {},
savedObjectsDeleted: {},
});

// delete the datafeed but do not sync
await ml.api.deleteDatafeedES(datafeedConfig1.datafeed_id);

// create a new datafeed with a different id
const datafeedConfig2 = cloneDeep(datafeedConfig1);
datafeedConfig2.datafeed_id = `different_${datafeedConfig2.datafeed_id}`;
await ml.api.createDatafeedES(datafeedConfig2);

const body2 = await runRequest(USER.ML_POWERUSER_ALL_SPACES, 200);

// previous datafeed should be removed on first sync
expect(body2).to.eql({
datafeedsAdded: {},
datafeedsRemoved: { [adJobId1]: { success: true } },
savedObjectsCreated: {},
savedObjectsDeleted: {},
});

const body3 = await runRequest(USER.ML_POWERUSER_ALL_SPACES, 200);

// new datafeed will be added on second sync
expect(body3).to.eql({
datafeedsAdded: { [adJobId1]: { success: true } },
datafeedsRemoved: {},
savedObjectsCreated: {},
savedObjectsDeleted: {},
});
});

it('should not sync anything if all objects are already synced', async () => {
await ml.api.createAnomalyDetectionJob(
ml.commonConfig.getADFqSingleMetricJobConfig(adJobId1),
idSpace1
);
await ml.api.createAnomalyDetectionJob(
ml.commonConfig.getADFqSingleMetricJobConfig(adJobId2),
idSpace1
);
await ml.api.createAnomalyDetectionJob(
ml.commonConfig.getADFqSingleMetricJobConfig(adJobId3),
idSpace1
);
await ml.api.createAnomalyDetectionJobES(
ml.commonConfig.getADFqSingleMetricJobConfig(adJobIdES)
);

await runRequest(USER.ML_POWERUSER_ALL_SPACES, 200);
const body = await runRequest(USER.ML_POWERUSER_ALL_SPACES, 200);

expect(body).to.eql({
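The recreated-datafeed test above shows that resolving a datafeed recreated under a new id takes two sync passes: the first removes the stale reference, the second adds the new one. A hypothetical convenience wrapper (not part of this commit) built on the same endpoint the tests call could simply repeat the request until a pass reports no changes:

// Hypothetical helper; SyncResult approximates the response shape asserted in
// the tests (datafeedsAdded / datafeedsRemoved / savedObjectsCreated / savedObjectsDeleted).
type SyncResult = Record<string, Record<string, { success: boolean }>>;

const isNoOp = (result: SyncResult) =>
  Object.values(result).every((section) => Object.keys(section).length === 0);

async function syncUntilStable(
  runSync: () => Promise<SyncResult>,
  maxPasses = 3
): Promise<SyncResult[]> {
  const passes: SyncResult[] = [];
  for (let i = 0; i < maxPasses; i++) {
    const result = await runSync();
    passes.push(result);
    if (isNoOp(result)) {
      break;
    }
  }
  return passes;
}

// e.g. with the runRequest helper defined in this test file:
// const passes = await syncUntilStable(() => runRequest(USER.ML_POWERUSER_ALL_SPACES, 200));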
