Skip to content

Commit

Permalink
bootstrapping the ZDT migration algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
pgayvallet committed Feb 15, 2023
1 parent fe78c05 commit b28424a
Show file tree
Hide file tree
Showing 30 changed files with 810 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import { schema, TypeOf } from '@kbn/config-schema';
import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal';

const migrationSchema = schema.object({
algorithm: schema.oneOf([schema.literal('v2'), schema.literal('zdt')], {
defaultValue: 'v2',
}),
batchSize: schema.number({ defaultValue: 1_000 }),
maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value
discardUnknownObjects: schema.maybe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

export { DocumentMigrator, KibanaMigrator, buildActiveMappings, mergeTypes } from './src';
export { DocumentMigrator, KibanaMigrator, buildActiveMappings, buildTypesMappings } from './src';
export type { KibanaMigratorOptions } from './src';
export { getAggregatedTypesDocuments } from './src/actions/check_for_unknown_docs';
export { addExcludedTypesToBoolQuery } from './src/model/helpers';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export { logActionResponse, logStateTransition, type LogAwareState } from './logs';
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import type { Logger, LogMeta } from '@kbn/logging';
import { MigrationLog } from '../../types';

export interface LogAwareState {
controlState: string;
logs: MigrationLog[];
}


interface StateTransitionLogMeta extends LogMeta {
kibana: {
migrations: {
state: LogAwareState;
duration: number;
};
};
}

export const logStateTransition = (
logger: Logger,
logMessagePrefix: string,
prevState: LogAwareState,
currState: LogAwareState,
tookMs: number
) => {
if (currState.logs.length > prevState.logs.length) {
currState.logs.slice(prevState.logs.length).forEach(({ message, level }) => {
switch (level) {
case 'error':
return logger.error(logMessagePrefix + message);
case 'warning':
return logger.warn(logMessagePrefix + message);
case 'info':
return logger.info(logMessagePrefix + message);
default:
throw new Error(`unexpected log level ${level}`);
}
});
}

logger.info(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`
);
logger.debug<StateTransitionLogMeta>(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`,
{
kibana: {
migrations: {
state: currState,
duration: tookMs,
},
},
}
);
};

export const logActionResponse = (
logger: Logger,
logMessagePrefix: string,
state: LogAwareState,
res: unknown
) => {
logger.debug(logMessagePrefix + `${state.controlState} RESPONSE`, res as LogMeta);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
import type { SavedObjectsTypeMappingDefinitions } from '@kbn/core-saved-objects-base-server-internal';

/**
* Merge mappings from all registered saved object types.
*/
export const buildTypesMappings = (
types: SavedObjectsType[]
): SavedObjectsTypeMappingDefinitions => {
return types.reduce((acc, { name: type, mappings }) => {
const duplicate = acc.hasOwnProperty(type);
if (duplicate) {
throw new Error(`Type ${type} is already defined.`);
}
return {
...acc,
[type]: mappings,
};
}, {});
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ export type { LogFn } from './migration_logger';
export { excludeUnusedTypesQuery, REMOVED_TYPES } from './unused_types';
export { TransformSavedObjectDocumentError } from './transform_saved_object_document_error';
export { deterministicallyRegenerateObjectId } from './regenerate_object_id';
export { buildTypesMappings } from './build_types_mappings';
export { createIndexMap } from './build_index_map';
export type {
DocumentsTransformFailed,
DocumentsTransformSuccess,
TransformErrorObjects,
} from './migrate_raw_docs';
export type { IndexMap, CreateIndexMapOptions } from './build_index_map';
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

export { KibanaMigrator, mergeTypes } from './kibana_migrator';
export { KibanaMigrator } from './kibana_migrator';
export type { KibanaMigratorOptions } from './kibana_migrator';
export { buildActiveMappings } from './core';
export { buildActiveMappings, buildTypesMappings } from './core';
export { DocumentMigrator } from './document_migrator';
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import Semver from 'semver';
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
import type {
SavedObjectUnsanitizedDoc,
SavedObjectsRawDoc,
Expand All @@ -31,11 +30,12 @@ import {
type KibanaMigratorStatus,
type MigrationResult,
} from '@kbn/core-saved-objects-base-server-internal';
import { buildActiveMappings } from './core';
import { buildActiveMappings, buildTypesMappings } from './core';
import { DocumentMigrator, type VersionedTransformer } from './document_migrator';
import { createIndexMap } from './core/build_index_map';
import { runResilientMigrator } from './run_resilient_migrator';
import { migrateRawDocsSafely } from './core/migrate_raw_docs';
import { runZeroDowntimeMigration } from './zdt';

// ensure plugins don't try to convert SO namespaceTypes after 8.0.0
// see https://github.com/elastic/kibana/issues/147344
Expand Down Expand Up @@ -91,7 +91,7 @@ export class KibanaMigrator implements IKibanaMigrator {
this.soMigrationsConfig = soMigrationsConfig;
this.typeRegistry = typeRegistry;
this.serializer = new SavedObjectsSerializer(this.typeRegistry);
this.mappingProperties = mergeTypes(this.typeRegistry.getAllTypes());
this.mappingProperties = buildTypesMappings(this.typeRegistry.getAllTypes());
this.log = logger;
this.kibanaVersion = kibanaVersion;
this.documentMigrator = new DocumentMigrator({
Expand Down Expand Up @@ -135,6 +135,27 @@ export class KibanaMigrator implements IKibanaMigrator {
}

private runMigrationsInternal(): Promise<MigrationResult[]> {
const migrationAlgorithm = this.soMigrationsConfig.algorithm;
if (migrationAlgorithm === 'zdt') {
return this.runMigrationZdt();
} else {
return this.runMigrationV2();
}
}

private runMigrationZdt(): Promise<MigrationResult[]> {
return runZeroDowntimeMigration({
kibanaIndexPrefix: this.kibanaIndex,
typeRegistry: this.typeRegistry,
logger: this.log,
documentMigrator: this.documentMigrator,
migrationConfig: this.soMigrationsConfig,
docLinks: this.docLinks,
serializer: this.serializer,
});
}

private runMigrationV2(): Promise<MigrationResult[]> {
const indexMap = createIndexMap({
kibanaIndexName: this.kibanaIndex,
indexMap: this.mappingProperties,
Expand Down Expand Up @@ -187,20 +208,3 @@ export class KibanaMigrator implements IKibanaMigrator {
return this.documentMigrator.migrate(doc);
}
}

/**
* Merges savedObjectMappings properties into a single object, verifying that
* no mappings are redefined.
*/
export function mergeTypes(types: SavedObjectsType[]): SavedObjectsTypeMappingDefinitions {
return types.reduce((acc, { name: type, mappings }) => {
const duplicate = acc.hasOwnProperty(type);
if (duplicate) {
throw new Error(`Type ${type} is already defined.`);
}
return {
...acc,
[type]: mappings,
};
}, {});
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,73 +8,18 @@

import { errors as EsErrors } from '@elastic/elasticsearch';
import * as Option from 'fp-ts/lib/Option';
import type { Logger, LogMeta } from '@kbn/logging';
import type { Logger } from '@kbn/logging';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import {
getErrorMessage,
getRequestDebugMeta,
} from '@kbn/core-elasticsearch-client-server-internal';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import { logActionResponse, logStateTransition } from './common/utils/logs';
import { type Model, type Next, stateActionMachine } from './state_action_machine';
import { cleanup } from './migrations_state_machine_cleanup';
import type { ReindexSourceToTempTransform, ReindexSourceToTempIndexBulk, State } from './state';

interface StateTransitionLogMeta extends LogMeta {
kibana: {
migrations: {
state: State;
duration: number;
};
};
}

const logStateTransition = (
logger: Logger,
logMessagePrefix: string,
prevState: State,
currState: State,
tookMs: number
) => {
if (currState.logs.length > prevState.logs.length) {
currState.logs.slice(prevState.logs.length).forEach(({ message, level }) => {
switch (level) {
case 'error':
return logger.error(logMessagePrefix + message);
case 'warning':
return logger.warn(logMessagePrefix + message);
case 'info':
return logger.info(logMessagePrefix + message);
default:
throw new Error(`unexpected log level ${level}`);
}
});
}

logger.info(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`
);
logger.debug<StateTransitionLogMeta>(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`,
{
kibana: {
migrations: {
state: currState,
duration: tookMs,
},
},
}
);
};

const logActionResponse = (
logger: Logger,
logMessagePrefix: string,
state: State,
res: unknown
) => {
logger.debug(logMessagePrefix + `${state.controlState} RESPONSE`, res as LogMeta);
};

/**
* A specialized migrations-specific state-action machine that:
* - logs messages in state.logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import type {
} from '@elastic/elasticsearch/lib/api/types';
import * as Either from 'fp-ts/lib/Either';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { State } from '../state';
import type { AliasAction, FetchIndexResponse } from '../actions';

/**
Expand All @@ -27,8 +26,8 @@ export function throwBadControlState(controlState: any) {
/**
* A helper function/type for ensuring that all response types are handled.
*/
export function throwBadResponse(state: State, p: never): never;
export function throwBadResponse(state: State, res: any): never {
export function throwBadResponse(state: { controlState: string }, p: never): never;
export function throwBadResponse(state: { controlState: string }, res: any): never {
throw new Error(
`${state.controlState} received unexpected action response: ` + JSON.stringify(res)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@
* Side Public License, v 1.
*/

import { State } from '../state';
import type { MigrationLog } from '../types';

export const delayRetryState = <S extends State>(
export interface RetryableState {
controlState: string;
retryCount: number;
retryDelay: number;
logs: MigrationLog[];
}

export const delayRetryState = <S extends RetryableState>(
state: S,
errorMessage: string,
/** How many times to retry a step that fails */
Expand Down Expand Up @@ -39,7 +46,7 @@ export const delayRetryState = <S extends State>(
};
}
};
export const resetRetryState = <S extends State>(state: S): S => {
export const resetRetryState = <S extends RetryableState>(state: S): S => {
return {
...state,
retryCount: 0,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import type {
IncompatibleClusterRoutingAllocation,
RetryableEsClientError,
WaitForTaskCompletionTimeout,
IndexNotFound,
} from '../../actions';

export {
initAction as init,
type InitActionParams,
type IncompatibleClusterRoutingAllocation,
type RetryableEsClientError,
type WaitForTaskCompletionTimeout,
type IndexNotFound,
} from '../../actions';

export interface ActionErrorTypeMap {
wait_for_task_completion_timeout: WaitForTaskCompletionTimeout;
incompatible_cluster_routing_allocation: IncompatibleClusterRoutingAllocation;
retryable_es_client_error: RetryableEsClientError;
index_not_found_exception: IndexNotFound;
}

/** Type guard for narrowing the type of a left */
export function isTypeof<T extends keyof ActionErrorTypeMap>(
res: any,
typeString: T
): res is ActionErrorTypeMap[T] {
return res.type === typeString;
}
Loading

0 comments on commit b28424a

Please sign in to comment.