From 6897a602f3e2f868f3c001cbe05f9826a6d95350 Mon Sep 17 00:00:00 2001 From: Vladimir Date: Fri, 29 Apr 2022 17:17:32 +0300 Subject: [PATCH] Choose optimal sync modes by default on UI (#12411) * fix CatalogSectionInner component padding * fix React key error in StreamFieldTable component * Update choosing the optimal sync mode logic. Sync Mode priority: 1. Incremental(cursor defined) => Append Dedup 2. Full Refresh => Overwrite 3. Incremental => Append 4. Full Refresh => Append * Fixes after comments to PR: - replace func "isEmpty" with "[].length" - replace named export with direct export - update "config" type for updateStreamConfig func - rename "getOptimalSyncMode" to "setOptimalSyncMode" * remove fp-ts package * fix wrong condition * cover formConfigHelpers with tests * add npm script: run tests with coverage * tiny fix: test assert --- airbyte-webapp/package.json | 1 + .../Connection/CatalogTree/CatalogSection.tsx | 1 + .../CatalogTree/StreamFieldTable.tsx | 2 +- .../Connection/ConnectionForm/formConfig.tsx | 58 ++------- .../ConnectionForm/formConfigHelpers.test.ts | 120 ++++++++++++++++++ .../ConnectionForm/formConfigHelpers.ts | 86 +++++++++++++ 6 files changed, 217 insertions(+), 51 deletions(-) create mode 100644 airbyte-webapp/src/views/Connection/ConnectionForm/formConfigHelpers.test.ts create mode 100644 airbyte-webapp/src/views/Connection/ConnectionForm/formConfigHelpers.ts diff --git a/airbyte-webapp/package.json b/airbyte-webapp/package.json index 1fbc245d3d6f..065e704118f5 100644 --- a/airbyte-webapp/package.json +++ b/airbyte-webapp/package.json @@ -9,6 +9,7 @@ "start": "react-scripts start", "build": "react-scripts build", "test": "react-scripts test", + "test:coverage": "npm test -- --coverage --watchAll=false", "format": "prettier --write 'src/**/*.{ts,tsx}'", "storybook": "start-storybook -p 9009 -s public --quiet", "lint": "eslint --ext js,ts,tsx src", diff --git a/airbyte-webapp/src/views/Connection/CatalogTree/CatalogSection.tsx b/airbyte-webapp/src/views/Connection/CatalogTree/CatalogSection.tsx index ed9eb24f09e6..7c69f2b78827 100644 --- a/airbyte-webapp/src/views/Connection/CatalogTree/CatalogSection.tsx +++ b/airbyte-webapp/src/views/Connection/CatalogTree/CatalogSection.tsx @@ -28,6 +28,7 @@ import { flatten, getPathType } from "./utils"; const Section = styled.div<{ error?: boolean; isSelected: boolean }>` border: 1px solid ${(props) => (props.error ? props.theme.dangerColor : "none")}; background: ${({ theme, isSelected }) => (isSelected ? "rgba(97, 94, 255, 0.1);" : theme.greyColor0)}; + padding: 2px; &:first-child { border-radius: 8px 8px 0 0; diff --git a/airbyte-webapp/src/views/Connection/CatalogTree/StreamFieldTable.tsx b/airbyte-webapp/src/views/Connection/CatalogTree/StreamFieldTable.tsx index 6776794f4cda..6e5454aaf1b7 100644 --- a/airbyte-webapp/src/views/Connection/CatalogTree/StreamFieldTable.tsx +++ b/airbyte-webapp/src/views/Connection/CatalogTree/StreamFieldTable.tsx @@ -38,7 +38,7 @@ export const StreamFieldTable: React.FC = (props) => { {props.syncSchemaFields.map((field) => ( - + +const useInitialSchema = (schema: SyncSchema, supportedDestinationSyncModes: DestinationSyncMode[]): SyncSchema => useMemo( () => ({ streams: schema.streams.map((apiNode, id) => { const nodeWithId: SyncSchemaStream = { ...apiNode, id: id.toString() }; + const nodeStream = verifyConfigCursorField(verifySupportedSyncModes(nodeWithId)); - // If the value in supportedSyncModes is empty assume the only supported sync mode is FULL_REFRESH. - // Otherwise, it supports whatever sync modes are present. - const streamNode = nodeWithId.stream.supportedSyncModes?.length - ? nodeWithId - : setIn(nodeWithId, "stream.supportedSyncModes", [SyncMode.FullRefresh]); - - // If syncMode isn't null - don't change item - if (streamNode.config.syncMode) { - return streamNode; - } - - const updateStreamConfig = (config: Partial): SyncSchemaStream => ({ - ...streamNode, - config: { ...streamNode.config, ...config }, - }); - - const supportedSyncModes = streamNode.stream.supportedSyncModes; - - // Prefer INCREMENTAL sync mode over other sync modes - if (supportedSyncModes.includes(SyncMode.Incremental)) { - return updateStreamConfig({ - cursorField: streamNode.config.cursorField.length - ? streamNode.config.cursorField - : getDefaultCursorField(streamNode), - syncMode: SyncMode.Incremental, - }); - } - - // If source don't support INCREMENTAL and FULL_REFRESH - set first value from supportedSyncModes list - return updateStreamConfig({ - syncMode: streamNode.stream.supportedSyncModes[0], - }); + return getOptimalSyncMode(nodeStream, supportedDestinationSyncModes); }), }), - [schema.streams] + [schema.streams, supportedDestinationSyncModes] ); const getInitialTransformations = (operations: Operation[]): Transformation[] => operations.filter(isDbtTransformation); @@ -266,7 +224,7 @@ const useInitialValues = ( destDefinition: DestinationDefinitionSpecification, isEditMode?: boolean ): FormikConnectionFormValues => { - const initialSchema = useInitialSchema(connection.syncCatalog); + const initialSchema = useInitialSchema(connection.syncCatalog, destDefinition.supportedDestinationSyncModes); return useMemo(() => { const initialValues: FormikConnectionFormValues = { diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/formConfigHelpers.test.ts b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfigHelpers.test.ts new file mode 100644 index 000000000000..5cd853bfc6ea --- /dev/null +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfigHelpers.test.ts @@ -0,0 +1,120 @@ +import { DestinationSyncMode, SyncMode, SyncSchemaStream } from "../../../core/domain/catalog"; +import { getOptimalSyncMode, verifyConfigCursorField, verifySupportedSyncModes } from "./formConfigHelpers"; + +const mockedStreamNode: SyncSchemaStream = { + stream: { + name: "test", + supportedSyncModes: [], + jsonSchema: {}, + sourceDefinedCursor: null, + sourceDefinedPrimaryKey: [], + defaultCursorField: [], + }, + config: { + cursorField: [], + primaryKey: [], + selected: true, + syncMode: SyncMode.FullRefresh, + destinationSyncMode: DestinationSyncMode.Append, + aliasName: "", + }, + id: "1", +}; + +describe("formConfigHelpers", () => { + describe("verifySupportedSyncModes", () => { + const streamNodeWithDefinedSyncMode: SyncSchemaStream = { + ...mockedStreamNode, + stream: { ...mockedStreamNode.stream, supportedSyncModes: [SyncMode.Incremental] }, + }; + + test("should not change supportedSyncModes if it's not empty", () => { + const streamNode = verifySupportedSyncModes(streamNodeWithDefinedSyncMode); + + expect(streamNode.stream.supportedSyncModes).toStrictEqual([SyncMode.Incremental]); + }); + + test("should set default supportedSyncModes if it's empty", () => { + const streamNodeWithEmptySyncMode = { + ...streamNodeWithDefinedSyncMode, + stream: { ...mockedStreamNode.stream, supportedSyncModes: [] }, + }; + const streamNode = verifySupportedSyncModes(streamNodeWithEmptySyncMode); + + expect(streamNode.stream.supportedSyncModes).toStrictEqual([SyncMode.FullRefresh]); + }); + }); + + describe("verifyConfigCursorField", () => { + const streamWithDefinedCursorField: SyncSchemaStream = { + ...mockedStreamNode, + config: { ...mockedStreamNode.config, cursorField: ["id"] }, + stream: { ...mockedStreamNode.stream, defaultCursorField: ["name"] }, + }; + + test("should leave cursorField value as is if it's defined", () => { + const streamNode = verifyConfigCursorField(streamWithDefinedCursorField); + + expect(streamNode.config.cursorField).toStrictEqual(["id"]); + }); + + test("should set defaultCursorField if cursorField is not defined", () => { + const streamNodeWithoutDefinedCursor = { + ...streamWithDefinedCursorField, + config: { ...mockedStreamNode.config, cursorField: [] }, + }; + const streamNode = verifyConfigCursorField(streamNodeWithoutDefinedCursor); + + expect(streamNode.config.cursorField).toStrictEqual(["name"]); + }); + + test("should leave cursorField empty if defaultCursorField not defined", () => { + const streamNode = verifyConfigCursorField(mockedStreamNode); + + expect(streamNode.config.cursorField).toStrictEqual([]); + }); + }); + + describe("getOptimalSyncMode", () => { + test("should get 'Incremental(cursor defined) => Append dedup' mode", () => { + const streamNodeWithIncrDedupMode = { + ...mockedStreamNode, + stream: { ...mockedStreamNode.stream, supportedSyncModes: [SyncMode.Incremental], sourceDefinedCursor: true }, + }; + const nodeStream = getOptimalSyncMode(streamNodeWithIncrDedupMode, [DestinationSyncMode.Dedupted]); + + expect(nodeStream.config.syncMode).toBe(SyncMode.Incremental); + expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Dedupted); + }); + + test("should get 'FullRefresh => Overwrite' mode", () => { + const nodeStream = getOptimalSyncMode(mockedStreamNode, [DestinationSyncMode.Overwrite]); + + expect(nodeStream.config.syncMode).toBe(SyncMode.FullRefresh); + expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Overwrite); + }); + + test("should get 'Incremental => Append' mode", () => { + const streamNodeWithIncrAppendMode = { + ...mockedStreamNode, + stream: { ...mockedStreamNode.stream, supportedSyncModes: [SyncMode.Incremental] }, + }; + const nodeStream = getOptimalSyncMode(streamNodeWithIncrAppendMode, [DestinationSyncMode.Append]); + + expect(nodeStream.config.syncMode).toBe(SyncMode.Incremental); + expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Append); + }); + + test("should get 'FullRefresh => Append' mode", () => { + const nodeStream = getOptimalSyncMode(mockedStreamNode, [DestinationSyncMode.Append]); + + expect(nodeStream.config.syncMode).toBe(SyncMode.FullRefresh); + expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Append); + }); + test("should return untouched nodeStream", () => { + const nodeStream = getOptimalSyncMode(mockedStreamNode, []); + + expect(nodeStream).toBe(mockedStreamNode); + }); + }); +}); diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/formConfigHelpers.ts b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfigHelpers.ts new file mode 100644 index 000000000000..649d98fc497b --- /dev/null +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfigHelpers.ts @@ -0,0 +1,86 @@ +import { + AirbyteStreamConfiguration, + DestinationSyncMode, + SyncMode, + SyncSchemaStream, +} from "../../../core/domain/catalog"; + +const getDefaultCursorField = (streamNode: SyncSchemaStream): string[] => { + if (streamNode.stream.defaultCursorField.length) { + return streamNode.stream.defaultCursorField; + } + return streamNode.config.cursorField; +}; + +export const verifySupportedSyncModes = (streamNode: SyncSchemaStream): SyncSchemaStream => { + const { + stream: { supportedSyncModes }, + } = streamNode; + + if (supportedSyncModes?.length) return streamNode; + return { ...streamNode, stream: { ...streamNode.stream, supportedSyncModes: [SyncMode.FullRefresh] } }; +}; + +export const verifyConfigCursorField = (streamNode: SyncSchemaStream): SyncSchemaStream => { + const { config } = streamNode; + + return { + ...streamNode, + config: { + ...config, + cursorField: config.cursorField?.length ? config.cursorField : getDefaultCursorField(streamNode), + }, + }; +}; + +export const getOptimalSyncMode = ( + streamNode: SyncSchemaStream, + supportedDestinationSyncModes: DestinationSyncMode[] +): SyncSchemaStream => { + const updateStreamConfig = ( + config: Pick + ): SyncSchemaStream => ({ + ...streamNode, + config: { ...streamNode.config, ...config }, + }); + + const { + stream: { supportedSyncModes, sourceDefinedCursor }, + } = streamNode; + + if ( + supportedSyncModes.includes(SyncMode.Incremental) && + supportedDestinationSyncModes.includes(DestinationSyncMode.Dedupted) && + sourceDefinedCursor + ) { + return updateStreamConfig({ + syncMode: SyncMode.Incremental, + destinationSyncMode: DestinationSyncMode.Dedupted, + }); + } + + if (supportedDestinationSyncModes.includes(DestinationSyncMode.Overwrite)) { + return updateStreamConfig({ + syncMode: SyncMode.FullRefresh, + destinationSyncMode: DestinationSyncMode.Overwrite, + }); + } + + if ( + supportedSyncModes.includes(SyncMode.Incremental) && + supportedDestinationSyncModes.includes(DestinationSyncMode.Append) + ) { + return updateStreamConfig({ + syncMode: SyncMode.Incremental, + destinationSyncMode: DestinationSyncMode.Append, + }); + } + + if (supportedDestinationSyncModes.includes(DestinationSyncMode.Append)) { + return updateStreamConfig({ + syncMode: SyncMode.FullRefresh, + destinationSyncMode: DestinationSyncMode.Append, + }); + } + return streamNode; +};