Skip to content

Commit

Permalink
Bootstrap ZDT migration algorithm (#151282)
Browse files Browse the repository at this point in the history
## Summary

Part of #150309

Purpose of the PR is to create the skeleton of the ZDT algorithm, in
order to make sure we're all aligned on the way we'll be managing our
codebase between the 2 implementation (and to ease with the review of
the follow-up PRs by not having the bootstrap of the algo to review at
the same time)

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
pgayvallet and kibanamachine authored Feb 27, 2023
1 parent c164d77 commit bbbf8d1
Show file tree
Hide file tree
Showing 40 changed files with 1,151 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import { schema, TypeOf } from '@kbn/config-schema';
import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal';

const migrationSchema = schema.object({
algorithm: schema.oneOf([schema.literal('v2'), schema.literal('zdt')], {
defaultValue: 'v2',
}),
batchSize: schema.number({ defaultValue: 1_000 }),
maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value
discardUnknownObjects: schema.maybe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

export { DocumentMigrator, KibanaMigrator, buildActiveMappings, mergeTypes } from './src';
export { DocumentMigrator, KibanaMigrator, buildActiveMappings, buildTypesMappings } from './src';
export type { KibanaMigratorOptions } from './src';
export { getAggregatedTypesDocuments } from './src/actions/check_for_unknown_docs';
export {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export { logActionResponse, logStateTransition, type LogAwareState } from './logs';
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import type { Logger, LogMeta } from '@kbn/logging';
import { MigrationLog } from '../../types';

export interface LogAwareState {
controlState: string;
logs: MigrationLog[];
}

interface StateTransitionLogMeta extends LogMeta {
kibana: {
migrations: {
state: LogAwareState;
duration: number;
};
};
}

export const logStateTransition = (
logger: Logger,
logMessagePrefix: string,
prevState: LogAwareState,
currState: LogAwareState,
tookMs: number
) => {
if (currState.logs.length > prevState.logs.length) {
currState.logs.slice(prevState.logs.length).forEach(({ message, level }) => {
switch (level) {
case 'error':
return logger.error(logMessagePrefix + message);
case 'warning':
return logger.warn(logMessagePrefix + message);
case 'info':
return logger.info(logMessagePrefix + message);
default:
throw new Error(`unexpected log level ${level}`);
}
});
}

logger.info(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`
);
logger.debug<StateTransitionLogMeta>(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`,
{
kibana: {
migrations: {
state: currState,
duration: tookMs,
},
},
}
);
};

export const logActionResponse = (
logger: Logger,
logMessagePrefix: string,
state: LogAwareState,
res: unknown
) => {
logger.debug(logMessagePrefix + `${state.controlState} RESPONSE`, res as LogMeta);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
import type { SavedObjectsTypeMappingDefinitions } from '@kbn/core-saved-objects-base-server-internal';

/**
* Merge mappings from all registered saved object types.
*/
export const buildTypesMappings = (
types: SavedObjectsType[]
): SavedObjectsTypeMappingDefinitions => {
return types.reduce((acc, { name: type, mappings }) => {
const duplicate = acc.hasOwnProperty(type);
if (duplicate) {
throw new Error(`Type ${type} is already defined.`);
}
return {
...acc,
[type]: mappings,
};
}, {});
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export type { LogFn } from './migration_logger';
export { excludeUnusedTypesQuery, REMOVED_TYPES } from './unused_types';
export { TransformSavedObjectDocumentError } from './transform_saved_object_document_error';
export { deterministicallyRegenerateObjectId } from './regenerate_object_id';
export { buildTypesMappings } from './build_types_mappings';
export { createIndexMap, type IndexMap, type CreateIndexMapOptions } from './build_index_map';
export type {
DocumentsTransformFailed,
DocumentsTransformSuccess,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

export { KibanaMigrator, mergeTypes } from './kibana_migrator';
export { KibanaMigrator } from './kibana_migrator';
export type { KibanaMigratorOptions } from './kibana_migrator';
export { buildActiveMappings } from './core';
export { buildActiveMappings, buildTypesMappings } from './core';
export { DocumentMigrator } from './document_migrator';
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ const mockOptions = () => {
]),
kibanaIndex: '.my-index',
soMigrationsConfig: {
algorithm: 'v2',
batchSize: 20,
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
pollInterval: 20000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import Semver from 'semver';
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
import type {
SavedObjectUnsanitizedDoc,
SavedObjectsRawDoc,
Expand All @@ -31,11 +30,12 @@ import {
type KibanaMigratorStatus,
type MigrationResult,
} from '@kbn/core-saved-objects-base-server-internal';
import { buildActiveMappings } from './core';
import { buildActiveMappings, buildTypesMappings } from './core';
import { DocumentMigrator, type VersionedTransformer } from './document_migrator';
import { createIndexMap } from './core/build_index_map';
import { runResilientMigrator } from './run_resilient_migrator';
import { migrateRawDocsSafely } from './core/migrate_raw_docs';
import { runZeroDowntimeMigration } from './zdt';

// ensure plugins don't try to convert SO namespaceTypes after 8.0.0
// see https://github.com/elastic/kibana/issues/147344
Expand Down Expand Up @@ -91,7 +91,7 @@ export class KibanaMigrator implements IKibanaMigrator {
this.soMigrationsConfig = soMigrationsConfig;
this.typeRegistry = typeRegistry;
this.serializer = new SavedObjectsSerializer(this.typeRegistry);
this.mappingProperties = mergeTypes(this.typeRegistry.getAllTypes());
this.mappingProperties = buildTypesMappings(this.typeRegistry.getAllTypes());
this.log = logger;
this.kibanaVersion = kibanaVersion;
this.documentMigrator = new DocumentMigrator({
Expand Down Expand Up @@ -135,6 +135,28 @@ export class KibanaMigrator implements IKibanaMigrator {
}

private runMigrationsInternal(): Promise<MigrationResult[]> {
const migrationAlgorithm = this.soMigrationsConfig.algorithm;
if (migrationAlgorithm === 'zdt') {
return this.runMigrationZdt();
} else {
return this.runMigrationV2();
}
}

private runMigrationZdt(): Promise<MigrationResult[]> {
return runZeroDowntimeMigration({
kibanaIndexPrefix: this.kibanaIndex,
typeRegistry: this.typeRegistry,
logger: this.log,
documentMigrator: this.documentMigrator,
migrationConfig: this.soMigrationsConfig,
docLinks: this.docLinks,
serializer: this.serializer,
elasticsearchClient: this.client,
});
}

private runMigrationV2(): Promise<MigrationResult[]> {
const indexMap = createIndexMap({
kibanaIndexName: this.kibanaIndex,
indexMap: this.mappingProperties,
Expand Down Expand Up @@ -187,20 +209,3 @@ export class KibanaMigrator implements IKibanaMigrator {
return this.documentMigrator.migrate(doc);
}
}

/**
* Merges savedObjectMappings properties into a single object, verifying that
* no mappings are redefined.
*/
export function mergeTypes(types: SavedObjectsType[]): SavedObjectsTypeMappingDefinitions {
return types.reduce((acc, { name: type, mappings }) => {
const duplicate = acc.hasOwnProperty(type);
if (duplicate) {
throw new Error(`Type ${type} is already defined.`);
}
return {
...acc,
[type]: mappings,
};
}, {});
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ describe('migrationsStateActionMachine', () => {
migrationVersionPerType: {},
indexPrefix: '.my-so-index',
migrationsConfig: {
algorithm: 'v2',
batchSize: 1000,
maxBatchSizeBytes: new ByteSizeValue(1e8),
pollInterval: 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,75 +8,20 @@

import { errors as EsErrors } from '@elastic/elasticsearch';
import * as Option from 'fp-ts/lib/Option';
import type { Logger, LogMeta } from '@kbn/logging';
import type { Logger } from '@kbn/logging';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import {
getErrorMessage,
getRequestDebugMeta,
} from '@kbn/core-elasticsearch-client-server-internal';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { BulkOperationContainer } from '@elastic/elasticsearch/lib/api/types';
import { logActionResponse, logStateTransition } from './common/utils/logs';
import { type Model, type Next, stateActionMachine } from './state_action_machine';
import { cleanup } from './migrations_state_machine_cleanup';
import type { ReindexSourceToTempTransform, ReindexSourceToTempIndexBulk, State } from './state';
import type { BulkOperation } from './model/create_batches';

interface StateTransitionLogMeta extends LogMeta {
kibana: {
migrations: {
state: State;
duration: number;
};
};
}

const logStateTransition = (
logger: Logger,
logMessagePrefix: string,
prevState: State,
currState: State,
tookMs: number
) => {
if (currState.logs.length > prevState.logs.length) {
currState.logs.slice(prevState.logs.length).forEach(({ message, level }) => {
switch (level) {
case 'error':
return logger.error(logMessagePrefix + message);
case 'warning':
return logger.warn(logMessagePrefix + message);
case 'info':
return logger.info(logMessagePrefix + message);
default:
throw new Error(`unexpected log level ${level}`);
}
});
}

logger.info(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`
);
logger.debug<StateTransitionLogMeta>(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`,
{
kibana: {
migrations: {
state: currState,
duration: tookMs,
},
},
}
);
};

const logActionResponse = (
logger: Logger,
logMessagePrefix: string,
state: State,
res: unknown
) => {
logger.debug(logMessagePrefix + `${state.controlState} RESPONSE`, res as LogMeta);
};

/**
* A specialized migrations-specific state-action machine that:
* - logs messages in state.logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import * as Actions from './actions';
import type { State } from './state';

export async function cleanup(client: ElasticsearchClient, state?: State) {
if (!state) return;
type CleanableState = { sourceIndexPitId: string } | {};

export async function cleanup(client: ElasticsearchClient, state?: CleanableState) {
if (!state) {
return;
}
if ('sourceIndexPitId' in state) {
await Actions.closePit({ client, pitId: state.sourceIndexPitId })();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,22 @@ import type {
import * as Either from 'fp-ts/lib/Either';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { State } from '../state';
import type { AliasAction, FetchIndexResponse } from '../actions';
import type { BulkIndexOperationTuple } from './create_batches';

/**
* A helper function/type for ensuring that all control state's are handled.
*/
export function throwBadControlState(p: never): never;
export function throwBadControlState(controlState: any) {
export function throwBadControlState(controlState: unknown) {
throw new Error('Unexpected control state: ' + controlState);
}

/**
* A helper function/type for ensuring that all response types are handled.
*/
export function throwBadResponse(state: State, p: never): never;
export function throwBadResponse(state: State, res: any): never {
export function throwBadResponse(state: { controlState: string }, p: never): never;
export function throwBadResponse(state: { controlState: string }, res: unknown): never {
throw new Error(
`${state.controlState} received unexpected action response: ` + JSON.stringify(res)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@
* Side Public License, v 1.
*/

import { State } from '../state';
import type { MigrationLog } from '../types';

export const delayRetryState = <S extends State>(
export interface RetryableState {
controlState: string;
retryCount: number;
retryDelay: number;
logs: MigrationLog[];
}

export const delayRetryState = <S extends RetryableState>(
state: S,
errorMessage: string,
/** How many times to retry a step that fails */
Expand Down Expand Up @@ -39,7 +46,7 @@ export const delayRetryState = <S extends State>(
};
}
};
export const resetRetryState = <S extends State>(state: S): S => {
export const resetRetryState = <S extends RetryableState>(state: S): S => {
return {
...state,
retryCount: 0,
Expand Down
Loading

0 comments on commit bbbf8d1

Please sign in to comment.