Skip to content

Commit

Permalink
Automatically cleanup SO indices when SO documents are found in data.…
Browse files Browse the repository at this point in the history
…json
  • Loading branch information
gsoldevila committed Jun 13, 2023
1 parent 1d22dcc commit 6becf9d
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@
* Side Public License, v 1.
*/

import type { deleteSavedObjectIndices } from './kibana_index';
import type { cleanSavedObjectIndices, deleteSavedObjectIndices } from './kibana_index';

export const mockdeleteSavedObjectIndices = jest.fn() as jest.MockedFunction<
export const mockCleanSavedObjectIndices = jest.fn() as jest.MockedFunction<
typeof cleanSavedObjectIndices
>;

export const mockDeleteSavedObjectIndices = jest.fn() as jest.MockedFunction<
typeof deleteSavedObjectIndices
>;

jest.mock('./kibana_index', () => ({
deleteSavedObjectIndices: mockdeleteSavedObjectIndices,
cleanSavedObjectIndices: mockCleanSavedObjectIndices,
deleteSavedObjectIndices: mockDeleteSavedObjectIndices,
}));
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
* Side Public License, v 1.
*/

import { mockdeleteSavedObjectIndices } from './create_index_stream.test.mock';
import {
mockCleanSavedObjectIndices,
mockDeleteSavedObjectIndices,
} from './create_index_stream.test.mock';

import sinon from 'sinon';
import Chance from 'chance';
Expand All @@ -28,7 +31,8 @@ const chance = new Chance();
const log = createStubLogger();

beforeEach(() => {
mockdeleteSavedObjectIndices.mockClear();
mockCleanSavedObjectIndices.mockClear();
mockDeleteSavedObjectIndices.mockClear();
});

describe('esArchiver: createCreateIndexStream()', () => {
Expand Down Expand Up @@ -199,26 +203,26 @@ describe('esArchiver: createCreateIndexStream()', () => {
it('does not delete Kibana indices for indexes that do not start with .kibana', async () => {
await doTest('.foo');

expect(mockdeleteSavedObjectIndices).not.toHaveBeenCalled();
expect(mockDeleteSavedObjectIndices).not.toHaveBeenCalled();
});

it('deletes Kibana indices at most once for indices that start with .kibana', async () => {
// If we are loading the main Kibana index, we should delete all Kibana indices for backwards compatibility reasons.
await doTest('.kibana_7.16.0_001', '.kibana_task_manager_7.16.0_001');

expect(mockdeleteSavedObjectIndices).toHaveBeenCalledTimes(1);
expect(mockdeleteSavedObjectIndices).toHaveBeenCalledWith(
expect.not.objectContaining({ onlyTaskManager: true })
expect(mockDeleteSavedObjectIndices).toHaveBeenCalledTimes(1);
expect(mockDeleteSavedObjectIndices).toHaveBeenCalledWith(
expect.not.objectContaining({ index: '.kibana_task_manager_7.16.0_001' })
);
});

it('deletes Kibana task manager index at most once, using onlyTaskManager: true', async () => {
it('deletes Kibana task manager index at most once', async () => {
// If we are loading the Kibana task manager index, we should only delete that index, not any other Kibana indices.
await doTest('.kibana_task_manager_7.16.0_001', '.kibana_task_manager_7.16.0_002');

expect(mockdeleteSavedObjectIndices).toHaveBeenCalledTimes(1);
expect(mockdeleteSavedObjectIndices).toHaveBeenCalledWith(
expect.objectContaining({ onlyTaskManager: true })
expect(mockDeleteSavedObjectIndices).toHaveBeenCalledTimes(1);
expect(mockDeleteSavedObjectIndices).toHaveBeenCalledWith(
expect.objectContaining({ index: '.kibana_task_manager_7.16.0_001' })
);
});

Expand All @@ -227,18 +231,63 @@ describe('esArchiver: createCreateIndexStream()', () => {
// So, we first delete only the Kibana task manager indices, then we wind up deleting all Kibana indices.
await doTest('.kibana_task_manager_7.16.0_001', '.kibana_7.16.0_001');

expect(mockdeleteSavedObjectIndices).toHaveBeenCalledTimes(2);
expect(mockdeleteSavedObjectIndices).toHaveBeenNthCalledWith(
expect(mockDeleteSavedObjectIndices).toHaveBeenCalledTimes(2);
expect(mockDeleteSavedObjectIndices).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ onlyTaskManager: true })
expect.objectContaining({ index: '.kibana_task_manager_7.16.0_001' })
);
expect(mockdeleteSavedObjectIndices).toHaveBeenNthCalledWith(
expect(mockDeleteSavedObjectIndices).toHaveBeenNthCalledWith(
2,
expect.not.objectContaining({ onlyTaskManager: true })
expect.not.objectContaining({ index: expect.any(String) })
);
});
});

describe('saved object cleanup', () => {
describe('when saved object documents are found', () => {
it('cleans the corresponding saved object indices', async () => {
const client = createStubClient();
const stats = createStubStats();
await createPromiseFromStreams([
createListStream([
createStubDocRecord('.kibana_task_manager', 1),
createStubDocRecord('.kibana_alerting_cases', 2),
createStubDocRecord('.kibana', 3),
]),
createCreateIndexStream({ client, stats, log }),
]);

expect(mockCleanSavedObjectIndices).toHaveBeenCalledTimes(2);

expect(mockCleanSavedObjectIndices).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ index: '.kibana_task_manager' })
);
expect(mockCleanSavedObjectIndices).toHaveBeenNthCalledWith(
2,
expect.not.objectContaining({ index: expect.any(String) })
);
});
});

describe('when saved object documents are not found', () => {
it('does not clean any indices', async () => {
const client = createStubClient();
const stats = createStubStats();
await createPromiseFromStreams([
createListStream([
createStubDocRecord('.foo', 1),
createStubDocRecord('.bar', 2),
createStubDocRecord('.baz', 3),
]),
createCreateIndexStream({ client, stats, log }),
]);

expect(mockCleanSavedObjectIndices).not.toHaveBeenCalled();
});
});
});

describe('docsOnly = true', () => {
it('passes through "hit" records without attempting to create indices', async () => {
const client = createStubClient();
Expand Down
43 changes: 36 additions & 7 deletions packages/kbn-es-archiver/src/lib/indices/create_index_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
TASK_MANAGER_SAVED_OBJECT_INDEX,
} from '@kbn/core-saved-objects-server';
import { Stats } from '../stats';
import { deleteSavedObjectIndices } from './kibana_index';
import { cleanSavedObjectIndices, deleteSavedObjectIndices } from './kibana_index';
import { deleteIndex } from './delete_index';
import { deleteDataStream } from './delete_data_stream';
import { ES_CLIENT_HEADERS } from '../../client_headers';
Expand Down Expand Up @@ -50,14 +50,37 @@ export function createCreateIndexStream({
// If we're trying to import Kibana index docs, we need to ensure that
// previous indices are removed so we're starting w/ a clean slate for
// migrations. This only needs to be done once per archive load operation.
let kibanaIndexAlreadyDeleted = false;
let kibanaIndicesAlreadyDeleted = false;
let kibanaTaskManagerIndexAlreadyDeleted = false;

// if we detect saved object documents defined in the data.json, we will cleanup their indices
let kibanaIndicesAlreadyCleaned = false;
let kibanaTaskManagerIndexAlreadyCleaned = false;

async function handleDoc(stream: Readable, record: DocRecord) {
if (skipDocsFromIndices.has(record.value.index)) {
const index = record.value.index;

if (skipDocsFromIndices.has(index)) {
return;
}

if (
index.startsWith(TASK_MANAGER_SAVED_OBJECT_INDEX) &&
!kibanaTaskManagerIndexAlreadyDeleted &&
!kibanaTaskManagerIndexAlreadyCleaned
) {
await cleanSavedObjectIndices({ client, stats, log, index });
kibanaTaskManagerIndexAlreadyCleaned = true;
log.debug(`Cleaned saved object index [${index}]`);
} else if (
index.startsWith(MAIN_SAVED_OBJECT_INDEX) &&
!kibanaIndicesAlreadyDeleted &&
!kibanaIndicesAlreadyCleaned
) {
await cleanSavedObjectIndices({ client, stats, log });
kibanaIndicesAlreadyCleaned = kibanaTaskManagerIndexAlreadyCleaned = true;
log.debug(`Cleaned all saved object indices`);
}
stream.push(record);
}

Expand Down Expand Up @@ -109,12 +132,14 @@ export function createCreateIndexStream({

async function attemptToCreate(attemptNumber = 1) {
try {
if (isKibana && !kibanaIndexAlreadyDeleted) {
if (isKibana && !kibanaIndicesAlreadyDeleted) {
await deleteSavedObjectIndices({ client, stats, log }); // delete all .kibana* indices
kibanaIndexAlreadyDeleted = kibanaTaskManagerIndexAlreadyDeleted = true;
kibanaIndicesAlreadyDeleted = kibanaTaskManagerIndexAlreadyDeleted = true;
log.debug(`Deleted all saved object indices`);
} else if (isKibanaTaskManager && !kibanaTaskManagerIndexAlreadyDeleted) {
await deleteSavedObjectIndices({ client, stats, onlyTaskManager: true, log }); // delete only .kibana_task_manager* indices
await deleteSavedObjectIndices({ client, stats, index, log }); // delete only .kibana_task_manager* index
kibanaTaskManagerIndexAlreadyDeleted = true;
log.debug(`Deleted saved object index [${index}]`);
}

await client.indices.create(
Expand All @@ -137,7 +162,11 @@ export function createCreateIndexStream({
err?.body?.error?.reason?.includes('index exists with the same name as the alias') &&
attemptNumber < 3
) {
kibanaIndexAlreadyDeleted = false;
kibanaTaskManagerIndexAlreadyDeleted = false;
if (isKibana) {
kibanaIndicesAlreadyDeleted = false;
}

const aliasStr = inspect(aliases);
log.info(
`failed to create aliases [${aliasStr}] because ES indicated an index/alias already exists, trying again`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
* Side Public License, v 1.
*/

import { mockCleanSavedObjectIndices } from './create_index_stream.test.mock';

import sinon from 'sinon';

import { createListStream, createPromiseFromStreams } from '@kbn/utils';
Expand All @@ -18,10 +20,15 @@ import {
createStubIndexRecord,
createStubDataStreamRecord,
createStubLogger,
createStubDocRecord,
} from './__mocks__/stubs';

const log = createStubLogger();

beforeEach(() => {
mockCleanSavedObjectIndices.mockClear();
});

describe('esArchiver: createDeleteIndexStream()', () => {
it('deletes the index without checking if it exists', async () => {
const stats = createStubStats();
Expand Down Expand Up @@ -73,4 +80,44 @@ describe('esArchiver: createDeleteIndexStream()', () => {
name: 'foo-template',
});
});

describe('saved object cleanup', () => {
describe('when saved object documents are found', () => {
it('cleans all saved object indices', async () => {
const client = createStubClient();
const stats = createStubStats();
await createPromiseFromStreams([
createListStream([
createStubDocRecord('.kibana_task_manager', 1),
createStubDocRecord('.kibana_alerting_cases', 2),
createStubDocRecord('.kibana', 3),
]),
createDeleteIndexStream(client, stats, log),
]);

expect(mockCleanSavedObjectIndices).toHaveBeenCalledTimes(1);

expect(mockCleanSavedObjectIndices).toHaveBeenCalledWith(
expect.not.objectContaining({ index: expect.any(String) })
);
});
});

describe('when saved object documents are not found', () => {
it('does not clean any indices', async () => {
const client = createStubClient();
const stats = createStubStats();
await createPromiseFromStreams([
createListStream([
createStubDocRecord('.foo', 1),
createStubDocRecord('.bar', 2),
createStubDocRecord('.baz', 3),
]),
createDeleteIndexStream(client, stats, log),
]);

expect(mockCleanSavedObjectIndices).not.toHaveBeenCalled();
});
});
});
});
15 changes: 14 additions & 1 deletion packages/kbn-es-archiver/src/lib/indices/delete_index_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { cleanSavedObjectIndices } from './kibana_index';
import { deleteDataStream } from './delete_data_stream';

export function createDeleteIndexStream(client: Client, stats: Stats, log: ToolingLog) {
let cleanedSavedObjectIndices = false;

return new Transform({
readableObjectMode: true,
writableObjectMode: true,
Expand All @@ -30,7 +32,11 @@ export function createDeleteIndexStream(client: Client, stats: Stats, log: Tooli
const { index } = record.value;

if (index.startsWith(MAIN_SAVED_OBJECT_INDEX)) {
await cleanSavedObjectIndices({ client, stats, log });
if (!cleanedSavedObjectIndices) {
await cleanSavedObjectIndices({ client, stats, log });
cleanedSavedObjectIndices = true;
log.debug(`Cleaned all saved object indices`);
}
} else {
await deleteIndex({ client, stats, log, index });
}
Expand All @@ -42,6 +48,13 @@ export function createDeleteIndexStream(client: Client, stats: Stats, log: Tooli

await deleteDataStream(client, dataStream, name);
stats.deletedDataStream(dataStream, name);
} else if (record.type === 'doc') {
const index = record.value.index;
if (index.startsWith(MAIN_SAVED_OBJECT_INDEX) && !cleanedSavedObjectIndices) {
await cleanSavedObjectIndices({ client, stats, log });
cleanedSavedObjectIndices = true;
log.debug(`Cleaned all saved object indices`);
}
} else {
this.push(record);
}
Expand Down
Loading

0 comments on commit 6becf9d

Please sign in to comment.