diff --git a/rfcs/text/0013_saved_object_migrations.md b/rfcs/text/0013_saved_object_migrations.md index e5ff7616ccf82..0ca183f1fee44 100644 --- a/rfcs/text/0013_saved_object_migrations.md +++ b/rfcs/text/0013_saved_object_migrations.md @@ -257,9 +257,11 @@ Note: 6. Set a write block on the source index. This prevents any further writes from outdated nodes. 7. Create a new temporary index `.kibana_7.10.0_reindex_temp` with `dynamic: false` on the top-level mappings so that any kind of document can be written to the index. This allows us to write untransformed documents to the index which might have fields which have been removed from the latest mappings defined by the plugin. Define minimal mappings for the `migrationVersion` and `type` fields so that we're still able to search for outdated documents that need to be transformed. 1. Ignore errors if the target index already exists. -8. Reindex the source index into the new temporary index. - 1. Use `op_type=create` `conflicts=proceed` and `wait_for_completion=false` so that multiple instances can perform the reindex in parallel but only one write per document will succeed. - 2. Wait for the reindex task to complete. If reindexing doesn’t complete within the 60s timeout, log a warning for visibility and poll again. +8. Reindex the source index into the new temporary index using a 'client-side' reindex, by reading batches of documents from the source, migrating them, and indexing them into the temp index. + 1. Use `op_type=index` so that multiple instances can perform the reindex in parallel (last node running will overwrite the documents, with no effect as the input data is the same) + 2. Ignore `version_conflict_engine_exception` exceptions as they just mean that another node was indexing the same documents + 3. If a `target_index_had_write_block` exception is encountered for all documents of a batch, assume that another node already completed the temporary index reindex, and jump to the next step + 4. 
If a document transform throws an exception, add the document to a failure list and continue trying to transform all other documents (without writing them to the temp index). If any failures occurred, log the complete list of documents that failed to transform, then fail the migration. 9. Clone the temporary index into the target index `.kibana_7.10.0_001`. Since any further writes will only happen against the cloned target index this prevents a lost delete from occuring where one instance finishes the migration and deletes a document and another instance's reindex operation re-creates the deleted document. 1. Set a write block on the temporary index 2. Clone the temporary index into the target index while specifying that the target index should have writes enabled. diff --git a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts index 8ff9591798fd4..57a1f54925d47 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts @@ -6,29 +6,96 @@ * Side Public License, v 1. 
*/ -import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import * as Either from 'fp-ts/Either'; import { errors as EsErrors } from '@elastic/elasticsearch'; -jest.mock('./catch_retryable_es_client_errors'); import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; import { bulkOverwriteTransformedDocuments } from './bulk_overwrite_transformed_documents'; +jest.mock('./catch_retryable_es_client_errors'); + describe('bulkOverwriteTransformedDocuments', () => { beforeEach(() => { jest.clearAllMocks(); }); - // Create a mock client that rejects all methods with a 503 status code - // response. - const retryableError = new EsErrors.ResponseError( - elasticsearchClientMock.createApiResponse({ - statusCode: 503, - body: { error: { type: 'es_type', reason: 'es_reason' } }, - }) - ); - const client = elasticsearchClientMock.createInternalClient( - elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) - ); + it('resolves with `right:bulk_index_succeeded` if no error is encountered', async () => { + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createSuccessTransportRequestPromise({ + items: [ + { + index: { + _index: '.dolly', + }, + }, + { + index: { + _index: '.dolly', + }, + }, + ], + }) + ); + + const task = bulkOverwriteTransformedDocuments({ + client, + index: 'new_index', + transformedDocs: [], + refresh: 'wait_for', + }); + + const result = await task(); + + expect(Either.isRight(result)).toBe(true); + expect((result as Either.Right).right).toEqual('bulk_index_succeeded'); + }); + + it('resolves with `right:bulk_index_succeeded` if version conflict errors are encountered', async () => { + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createSuccessTransportRequestPromise({ + items: [ + { + index: { + _index: '.dolly', + }, + }, + { 
+ index: { + error: { + type: 'version_conflict_engine_exception', + reason: 'reason', + }, + }, + }, + ], + }) + ); + + const task = bulkOverwriteTransformedDocuments({ + client, + index: 'new_index', + transformedDocs: [], + refresh: 'wait_for', + }); + + const result = await task(); + + expect(Either.isRight(result)).toBe(true); + expect((result as Either.Right).right).toEqual('bulk_index_succeeded'); + }); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + // Create a mock client that rejects all methods with a 503 status code response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + const task = bulkOverwriteTransformedDocuments({ client, index: 'new_index', @@ -43,4 +110,93 @@ describe('bulkOverwriteTransformedDocuments', () => { expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); }); + + it('resolves with `left:target_index_had_write_block` if all errors are write block exceptions', async () => { + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createSuccessTransportRequestPromise({ + items: [ + { + index: { + error: { + type: 'cluster_block_exception', + reason: + 'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]', + }, + }, + }, + { + index: { + error: { + type: 'cluster_block_exception', + reason: + 'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]', + }, + }, + }, + ], + }) + ); + + const task = bulkOverwriteTransformedDocuments({ + client, + index: 'new_index', + transformedDocs: [], + refresh: 'wait_for', + }); + + const result = await task(); + + expect(Either.isLeft(result)).toBe(true); + expect((result as 
Either.Left).left).toEqual({ + type: 'target_index_had_write_block', + }); + }); + + it('throws an error if any error is not a write block exceptions', async () => { + (catchRetryableEsClientErrors as jest.Mock).mockImplementation((e) => { + throw e; + }); + + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createSuccessTransportRequestPromise({ + items: [ + { + index: { + error: { + type: 'cluster_block_exception', + reason: + 'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]', + }, + }, + }, + { + index: { + error: { + type: 'dolly_exception', + reason: 'because', + }, + }, + }, + { + index: { + error: { + type: 'cluster_block_exception', + reason: + 'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]', + }, + }, + }, + ], + }) + ); + + const task = bulkOverwriteTransformedDocuments({ + client, + index: 'new_index', + transformedDocs: [], + refresh: 'wait_for', + }); + + await expect(task()).rejects.toThrow(); + }); }); diff --git a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts index 830a8efccc7eb..4c0f8717576ac 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts @@ -15,7 +15,9 @@ import { catchRetryableEsClientErrors, RetryableEsClientError, } from './catch_retryable_es_client_errors'; +import { isWriteBlockException } from './es_errors'; import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants'; +import type { TargetIndexHadWriteBlock } from './index'; /** @internal */ export interface BulkOverwriteTransformedDocumentsParams { @@ -24,6 +26,7 @@ export interface BulkOverwriteTransformedDocumentsParams { transformedDocs: SavedObjectsRawDoc[]; refresh?: estypes.Refresh; } + /** 
* Write the up-to-date transformed documents to the index, overwriting any * documents that are still on their outdated version. @@ -34,7 +37,7 @@ export const bulkOverwriteTransformedDocuments = ({ transformedDocs, refresh = false, }: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither< - RetryableEsClientError, + RetryableEsClientError | TargetIndexHadWriteBlock, 'bulk_index_succeeded' > => () => { return client @@ -71,12 +74,19 @@ export const bulkOverwriteTransformedDocuments = ({ .then((res) => { // Filter out version_conflict_engine_exception since these just mean // that another instance already updated these documents - const errors = (res.body.items ?? []).filter( - (item) => item.index?.error?.type !== 'version_conflict_engine_exception' - ); + const errors = (res.body.items ?? []) + .filter((item) => item.index?.error) + .map((item) => item.index!.error!) + .filter(({ type }) => type !== 'version_conflict_engine_exception'); + if (errors.length === 0) { return Either.right('bulk_index_succeeded' as const); } else { + if (errors.every(isWriteBlockException)) { + return Either.left({ + type: 'target_index_had_write_block' as const, + }); + } throw new Error(JSON.stringify(errors)); } }) diff --git a/src/core/server/saved_objects/migrationsv2/actions/es_errors.test.ts b/src/core/server/saved_objects/migrationsv2/actions/es_errors.test.ts new file mode 100644 index 0000000000000..c3a8c7a036a44 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/es_errors.test.ts @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import { isIncompatibleMappingException, isWriteBlockException } from './es_errors'; + +describe('isWriteBlockError', () => { + it('returns true for a `index write` cluster_block_exception', () => { + expect( + isWriteBlockException({ + type: 'cluster_block_exception', + reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`, + }) + ).toEqual(true); + }); + it('returns true for a `moving to block index write` cluster_block_exception', () => { + expect( + isWriteBlockException({ + type: 'cluster_block_exception', + reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/moving to block index write (api)]`, + }) + ).toEqual(true); + }); + it('returns false for incorrect type', () => { + expect( + isWriteBlockException({ + type: 'not_a_cluster_block_exception_at_all', + reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`, + }) + ).toEqual(false); + }); +}); + +describe('isIncompatibleMappingExceptionError', () => { + it('returns true for `strict_dynamic_mapping_exception` errors', () => { + expect( + isIncompatibleMappingException({ + type: 'strict_dynamic_mapping_exception', + reason: 'idk', + }) + ).toEqual(true); + }); + + it('returns true for `mapper_parsing_exception` errors', () => { + expect( + isIncompatibleMappingException({ + type: 'mapper_parsing_exception', + reason: 'idk', + }) + ).toEqual(true); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/es_errors.ts b/src/core/server/saved_objects/migrationsv2/actions/es_errors.ts new file mode 100644 index 0000000000000..0d3c9fe3741aa --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/es_errors.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export interface EsErrorCause { + type: string; + reason: string; +} + +export const isWriteBlockException = ({ type, reason }: EsErrorCause): boolean => { + return ( + type === 'cluster_block_exception' && + reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/.+ \(api\)\]/) !== null + ); +}; + +export const isIncompatibleMappingException = ({ type }: EsErrorCause): boolean => { + return type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception'; +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts b/src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts index 3fa4d59e383bf..ecce5e9543457 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts @@ -181,14 +181,17 @@ describe('migration actions', () => { { _source: { title: 'doc 3' } }, { _source: { title: 'doc 4' } }, ] as unknown) as SavedObjectsRawDoc[]; - await expect( - bulkOverwriteTransformedDocuments({ - client, - index: 'new_index_without_write_block', - transformedDocs: sourceDocs, - refresh: 'wait_for', - })() - ).rejects.toMatchObject(expect.anything()); + + const res = (await bulkOverwriteTransformedDocuments({ + client, + index: 'new_index_without_write_block', + transformedDocs: sourceDocs, + refresh: 'wait_for', + })()) as Either.Left; + + expect(res.left).toEqual({ + type: 'target_index_had_write_block', + }); }); it('resolves left index_not_found_exception when the index does not exist', async () => { expect.assertions(1); @@ -1094,6 +1097,7 @@ describe('migration actions', () => { return Either.right({ processedDocs }); }; } + const 
transformTask = transformDocs({ transformRawDocs: innerTransformRawDocs, outdatedDocuments: originalDocs, @@ -1496,7 +1500,7 @@ describe('migration actions', () => { } `); }); - it('rejects if there are errors', async () => { + it('resolves left if there are write_block errors', async () => { const newDocs = ([ { _source: { title: 'doc 5' } }, { _source: { title: 'doc 6' } }, @@ -1509,7 +1513,14 @@ describe('migration actions', () => { transformedDocs: newDocs, refresh: 'wait_for', })() - ).rejects.toMatchObject(expect.anything()); + ).resolves.toMatchInlineSnapshot(` + Object { + "_tag": "Left", + "left": Object { + "type": "target_index_had_write_block", + }, + } + `); }); }); }); diff --git a/src/core/server/saved_objects/migrationsv2/actions/integration_tests/es_errors.test.ts b/src/core/server/saved_objects/migrationsv2/actions/integration_tests/es_errors.test.ts new file mode 100644 index 0000000000000..baeef6b9d9f56 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/integration_tests/es_errors.test.ts @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import { ElasticsearchClient } from '../../../../'; +import { InternalCoreStart } from '../../../../internal_types'; +import * as kbnTestServer from '../../../../../test_helpers/kbn_server'; +import { Root } from '../../../../root'; +import { isWriteBlockException } from '../es_errors'; +import { createIndex } from '../create_index'; +import { setWriteBlock } from '../set_write_block'; + +const { startES } = kbnTestServer.createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), +}); + +describe('Elasticsearch Errors', () => { + let root: Root; + let start: InternalCoreStart; + let client: ElasticsearchClient; + let esServer: kbnTestServer.TestElasticsearchUtils; + + beforeAll(async () => { + esServer = await startES(); + root = kbnTestServer.createRootWithCorePlugins({ + server: { + basePath: '/foo', + }, + }); + + await root.setup(); + start = await root.start(); + client = start.elasticsearch.client.asInternalUser; + + await createIndex({ + client, + indexName: 'existing_index_with_write_block', + mappings: { properties: {} }, + })(); + await setWriteBlock({ client, index: 'existing_index_with_write_block' })(); + }); + + afterAll(async () => { + await esServer.stop(); + await root.shutdown(); + }); + + describe('isWriteBlockException', () => { + it('correctly identify errors from index operations', async () => { + const res = await client.index( + { + index: 'existing_index_with_write_block', + id: 'some-id', + op_type: 'index', + body: { + hello: 'dolly', + }, + }, + { ignore: [403] } + ); + + expect(isWriteBlockException(res.body.error!)).toEqual(true); + }); + + it('correctly identify errors from create operations', async () => { + const res = await client.create( + { + index: 'existing_index_with_write_block', + id: 'some-id', + body: { + hello: 'dolly', + }, + }, + { ignore: [403] } + ); + + expect(isWriteBlockException(res.body.error!)).toEqual(true); + }); + + it('correctly identify errors from bulk index operations', async () => { 
+ const res = await client.bulk({ + refresh: 'wait_for', + body: [ + { + index: { + _index: 'existing_index_with_write_block', + _id: 'some-id', + }, + }, + { + hello: 'dolly', + }, + ], + }); + + const cause = res.body.items[0].index!.error!; + + expect(isWriteBlockException(cause)).toEqual(true); + }); + + it('correctly identify errors from bulk create operations', async () => { + const res = await client.bulk({ + refresh: 'wait_for', + body: [ + { + create: { + _index: 'existing_index_with_write_block', + _id: 'some-id', + op_type: 'index', + }, + }, + { + hello: 'dolly', + }, + ], + }); + + const cause = res.body.items[0].create!.error!; + + expect(isWriteBlockException(cause)).toEqual(true); + }); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts b/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts index 18cf3350292b5..cafc8f15f0290 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts @@ -10,12 +10,14 @@ import * as TaskEither from 'fp-ts/lib/TaskEither'; import * as Option from 'fp-ts/lib/Option'; import { flow } from 'fp-ts/lib/function'; import { RetryableEsClientError } from './catch_retryable_es_client_errors'; -import type { IndexNotFound, WaitForReindexTaskFailure, TargetIndexHadWriteBlock } from './index'; +import type { IndexNotFound, TargetIndexHadWriteBlock } from './index'; import { waitForTask, WaitForTaskCompletionTimeout } from './wait_for_task'; +import { isWriteBlockException, isIncompatibleMappingException } from './es_errors'; export interface IncompatibleMappingException { type: 'incompatible_mapping_exception'; } + export const waitForReindexTask = flow( waitForTask, TaskEither.chain( @@ -29,15 +31,6 @@ export const waitForReindexTask = flow( | WaitForTaskCompletionTimeout, 'reindex_succeeded' > => { - const failureIsAWriteBlock = ({ cause: { type, 
reason } }: WaitForReindexTaskFailure) => - type === 'cluster_block_exception' && - reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/index write \(api\)\]/); - - const failureIsIncompatibleMappingException = ({ - cause: { type, reason }, - }: WaitForReindexTaskFailure) => - type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception'; - if (Option.isSome(res.error)) { if (res.error.value.type === 'index_not_found_exception') { return TaskEither.left({ @@ -48,9 +41,10 @@ export const waitForReindexTask = flow( throw new Error('Reindex failed with the following error:\n' + JSON.stringify(res.error)); } } else if (Option.isSome(res.failures)) { - if (res.failures.value.every(failureIsAWriteBlock)) { + const failureCauses = res.failures.value.map((failure) => failure.cause); + if (failureCauses.every(isWriteBlockException)) { return TaskEither.left({ type: 'target_index_had_write_block' as const }); - } else if (res.failures.value.every(failureIsIncompatibleMappingException)) { + } else if (failureCauses.every(isIncompatibleMappingException)) { return TaskEither.left({ type: 'incompatible_mapping_exception' as const }); } else { throw new Error( diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_concurrent_5k_foo.zip b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_concurrent_5k_foo.zip new file mode 100644 index 0000000000000..46cc61cbe7b5f Binary files /dev/null and b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_concurrent_5k_foo.zip differ diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_kibana_nodes.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_kibana_nodes.test.ts new file mode 100644 index 0000000000000..6d98576581a25 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_kibana_nodes.test.ts @@ -0,0 +1,259 @@ +/* + * Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import Path from 'path'; +import Fs from 'fs'; +import Util from 'util'; +import glob from 'glob'; +import { esTestConfig, kibanaServerTestUser } from '@kbn/test'; +import { kibanaPackageJson as pkg } from '@kbn/utils'; +import * as kbnTestServer from '../../../../test_helpers/kbn_server'; +import type { ElasticsearchClient } from '../../../elasticsearch'; +import { SavedObjectsType } from '../../types'; +import type { Root } from '../../../root'; + +const LOG_FILE_PREFIX = 'migration_test_multiple_kibana_nodes'; + +const asyncUnlink = Util.promisify(Fs.unlink); + +async function removeLogFiles() { + glob(Path.join(__dirname, `${LOG_FILE_PREFIX}_*.log`), (err, files) => { + files.forEach(async (file) => { + // ignore errors if it doesn't exist + await asyncUnlink(file).catch(() => void 0); + }); + }); +} + +function extractSortNumberFromId(id: string): number { + const parsedId = parseInt(id.split(':')[1], 10); // "foo:123" -> 123 + if (isNaN(parsedId)) { + throw new Error(`Failed to parse Saved Object ID [${id}]. 
Result is NaN`); + } + return parsedId; +} + +async function fetchDocs(esClient: ElasticsearchClient, index: string) { + const { body } = await esClient.search({ + index, + size: 10000, + body: { + query: { + bool: { + should: [ + { + term: { type: 'foo' }, + }, + ], + }, + }, + }, + }); + + return body.hits.hits + .map((h) => ({ + ...h._source, + id: h._id, + })) + .sort((a, b) => extractSortNumberFromId(a.id) - extractSortNumberFromId(b.id)); +} + +interface CreateRootConfig { + logFileName: string; +} + +function createRoot({ logFileName }: CreateRootConfig) { + return kbnTestServer.createRoot({ + elasticsearch: { + hosts: [esTestConfig.getUrl()], + username: kibanaServerTestUser.username, + password: kibanaServerTestUser.password, + }, + migrations: { + skip: false, + enableV2: true, + batchSize: 100, // fixture contains 5000 docs + }, + logging: { + appenders: { + file: { + type: 'file', + fileName: logFileName, + layout: { + type: 'pattern', + }, + }, + }, + loggers: [ + { + name: 'root', + appenders: ['file'], + }, + { + name: 'savedobjects-service', + appenders: ['file'], + level: 'debug', + }, + ], + }, + }); +} + +describe('migration v2', () => { + let esServer: kbnTestServer.TestElasticsearchUtils; + let rootA: Root; + let rootB: Root; + let rootC: Root; + + const migratedIndex = `.kibana_${pkg.version}_001`; + const fooType: SavedObjectsType = { + name: 'foo', + hidden: false, + mappings: { properties: { status: { type: 'text' } } }, + namespaceType: 'agnostic', + migrations: { + '7.14.0': (doc) => { + if (doc.attributes?.status) { + doc.attributes.status = doc.attributes.status.replace('unmigrated', 'migrated'); + } + return doc; + }, + }, + }; + + afterAll(async () => { + await new Promise((resolve) => setTimeout(resolve, 10000)); + }); + + beforeEach(async () => { + await removeLogFiles(); + + rootA = createRoot({ + logFileName: Path.join(__dirname, `${LOG_FILE_PREFIX}_A.log`), + }); + rootB = createRoot({ + logFileName: Path.join(__dirname, 
`${LOG_FILE_PREFIX}_B.log`), + }); + rootC = createRoot({ + logFileName: Path.join(__dirname, `${LOG_FILE_PREFIX}_C.log`), + }); + + const { startES } = kbnTestServer.createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), + settings: { + es: { + license: 'basic', + // original SOs: 5k of `foo` docs with this structure: + // [ + // { id: 'foo:1', type: 'foo', foo: { status: 'unmigrated' }, migrationVersion: { foo: '7.13.0' } }, + // { id: 'foo:2', type: 'foo', foo: { status: 'unmigrated' }, migrationVersion: { foo: '7.13.0' } }, + // { id: 'foo:3', type: 'foo', foo: { status: 'unmigrated' }, migrationVersion: { foo: '7.13.0' } }, + // ]; + dataArchive: Path.join(__dirname, 'archives', '7.13.0_concurrent_5k_foo.zip'), + }, + }, + }); + esServer = await startES(); + }); + + afterEach(async () => { + await Promise.all([rootA.shutdown(), rootB.shutdown(), rootC.shutdown()]); + + if (esServer) { + await esServer.stop(); + } + }); + + const delay = (timeInMs: number) => new Promise((resolve) => setTimeout(resolve, timeInMs)); + const startWithDelay = async (instances: Root[], delayInSec: number) => { + const promises: Array> = []; + for (let i = 0; i < instances.length; i++) { + promises.push(instances[i].start()); + if (i < instances.length - 1) { + await delay(delayInSec * 1000); + } + } + return Promise.all(promises); + }; + + it('migrates saved objects normally when multiple Kibana instances are started at the same time', async () => { + const setupContracts = await Promise.all([rootA.setup(), rootB.setup(), rootC.setup()]); + + setupContracts.forEach((setup) => setup.savedObjects.registerType(fooType)); + + await startWithDelay([rootA, rootB, rootC], 0); + + const esClient = esServer.es.getClient(); + const migratedDocs = await fetchDocs(esClient, migratedIndex); + + expect(migratedDocs.length).toBe(5000); + + migratedDocs.forEach((doc, i) => { + expect(doc.id).toBe(`foo:${i}`); + expect(doc.foo.status).toBe(`migrated`); + 
expect(doc.migrationVersion.foo).toBe('7.14.0'); + }); + }); + + it('migrates saved objects normally when multiple Kibana instances are started with a small interval', async () => { + const setupContracts = await Promise.all([rootA.setup(), rootB.setup(), rootC.setup()]); + + setupContracts.forEach((setup) => setup.savedObjects.registerType(fooType)); + + await startWithDelay([rootA, rootB, rootC], 1); + + const esClient = esServer.es.getClient(); + const migratedDocs = await fetchDocs(esClient, migratedIndex); + + expect(migratedDocs.length).toBe(5000); + + migratedDocs.forEach((doc, i) => { + expect(doc.id).toBe(`foo:${i}`); + expect(doc.foo.status).toBe(`migrated`); + expect(doc.migrationVersion.foo).toBe('7.14.0'); + }); + }); + + it('migrates saved objects normally when multiple Kibana instances are started with an average interval', async () => { + const setupContracts = await Promise.all([rootA.setup(), rootB.setup(), rootC.setup()]); + + setupContracts.forEach((setup) => setup.savedObjects.registerType(fooType)); + + await startWithDelay([rootA, rootB, rootC], 5); + + const esClient = esServer.es.getClient(); + const migratedDocs = await fetchDocs(esClient, migratedIndex); + + expect(migratedDocs.length).toBe(5000); + + migratedDocs.forEach((doc, i) => { + expect(doc.id).toBe(`foo:${i}`); + expect(doc.foo.status).toBe(`migrated`); + expect(doc.migrationVersion.foo).toBe('7.14.0'); + }); + }); + + it('migrates saved objects normally when multiple Kibana instances are started with a bigger interval', async () => { + const setupContracts = await Promise.all([rootA.setup(), rootB.setup(), rootC.setup()]); + + setupContracts.forEach((setup) => setup.savedObjects.registerType(fooType)); + + await startWithDelay([rootA, rootB, rootC], 20); + + const esClient = esServer.es.getClient(); + const migratedDocs = await fetchDocs(esClient, migratedIndex); + + expect(migratedDocs.length).toBe(5000); + + migratedDocs.forEach((doc, i) => { + expect(doc.id).toBe(`foo:${i}`); 
+ expect(doc.foo.status).toBe(`migrated`); + expect(doc.migrationVersion.foo).toBe('7.14.0'); + }); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/model/model.test.ts b/src/core/server/saved_objects/migrationsv2/model/model.test.ts index 174459d04d9ee..136709d1b874f 100644 --- a/src/core/server/saved_objects/migrationsv2/model/model.test.ts +++ b/src/core/server/saved_objects/migrationsv2/model/model.test.ts @@ -1054,6 +1054,15 @@ describe('migrations v2 model', () => { expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); + test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT if response is left target_index_had_write_block', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.left({ + type: 'target_index_had_write_block', + }); + const newState = model(reindexSourceToTempIndexBulkState, res); + expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK should throw a throwBadResponse error if action failed', () => { const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.left({ type: 'retryable_es_client_error', @@ -1101,7 +1110,7 @@ describe('migrations v2 model', () => { expect(newState.retryCount).toBe(0); expect(newState.retryDelay).toBe(0); }); - it('CLONE_TEMP_TO_TARGET -> REFRESH_TARGET if response is left index_not_fonud_exception', () => { + it('CLONE_TEMP_TO_TARGET -> REFRESH_TARGET if response is left index_not_found_exception', () => { const res: ResponseType<'CLONE_TEMP_TO_TARGET'> = Either.left({ type: 'index_not_found_exception', index: 'temp_index', diff --git a/src/core/server/saved_objects/migrationsv2/model/model.ts b/src/core/server/saved_objects/migrationsv2/model/model.ts index e7d6b8ed175e5..b28e4e3024380 100644 --- a/src/core/server/saved_objects/migrationsv2/model/model.ts +++ 
b/src/core/server/saved_objects/migrationsv2/model/model.ts @@ -499,7 +499,15 @@ export const model = (currentState: State, resW: ResponseType): transformErrors: [], }; } else { - throwBadResponse(stateP, res); + if (isLeftTypeof(res.left, 'target_index_had_write_block')) { + // the temp index has a write block, meaning that another instance already finished and moved forward. + // close the PIT search and carry on with the happy path. + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT', + }; + } + throwBadResponse(stateP, res.left); } } else if (stateP.controlState === 'SET_TEMP_WRITE_BLOCK') { const res = resW as ExcludeRetryableEsError>; @@ -667,7 +675,7 @@ export const model = (currentState: State, resW: ResponseType): hasTransformedDocs: true, }; } else { - throwBadResponse(stateP, res); + throwBadResponse(stateP, res as never); } } else if (stateP.controlState === 'UPDATE_TARGET_MAPPINGS') { const res = resW as ExcludeRetryableEsError>;