Skip to content

Commit

Permalink
Choose optimal sync modes by default on UI (#12411)
Browse files Browse the repository at this point in the history
* fix CatalogSectionInner component padding

* fix React key error in StreamFieldTable component

* Update choosing the optimal sync mode logic.

Sync Mode priority:
1. Incremental(cursor defined) => Append Dedup
2. Full Refresh => Overwrite
3. Incremental => Append
4. Full Refresh => Append

* Fixes after comments to PR:
- replace func "isEmpty" with "[].length"
- replace named export with direct export
- update "config" type for updateStreamConfig func
- rename "getOptimalSyncMode" to "setOptimalSyncMode"

* remove fp-ts package

* fix wrong condition

* cover formConfigHelpers with tests

* add npm script: run tests with coverage

* tiny fix: test assert
  • Loading branch information
dizel852 authored and suhomud committed May 23, 2022
1 parent 2738fd6 commit cea2f7a
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 51 deletions.
1 change: 1 addition & 0 deletions airbyte-webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"start": "react-scripts start",
"build": "react-scripts build",
"test": "react-scripts test",
"test:coverage": "npm test -- --coverage --watchAll=false",
"format": "prettier --write 'src/**/*.{ts,tsx}'",
"storybook": "start-storybook -p 9009 -s public --quiet",
"lint": "eslint --ext js,ts,tsx src",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { flatten, getPathType } from "./utils";
const Section = styled.div<{ error?: boolean; isSelected: boolean }>`
border: 1px solid ${(props) => (props.error ? props.theme.dangerColor : "none")};
background: ${({ theme, isSelected }) => (isSelected ? "rgba(97, 94, 255, 0.1);" : theme.greyColor0)};
padding: 2px;
&:first-child {
border-radius: 8px 8px 0 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const StreamFieldTable: React.FC<StreamFieldTableProps> = (props) => {
</TreeRowWrapper>
<RowsContainer>
{props.syncSchemaFields.map((field) => (
<TreeRowWrapper depth={1} key={field.key}>
<TreeRowWrapper depth={1} key={pathDisplayName(field.path)}>
<FieldRow
path={field.path}
name={pathDisplayName(field.path)}
Expand Down
58 changes: 8 additions & 50 deletions airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
import { setIn } from "formik";
import { useMemo } from "react";
import { useIntl } from "react-intl";
import * as yup from "yup";

import { DropDownRow } from "components";

import FrequencyConfig from "config/FrequencyConfig.json";
import {
AirbyteStreamConfiguration,
DestinationSyncMode,
SyncMode,
SyncSchema,
SyncSchemaStream,
} from "core/domain/catalog";
import { DestinationSyncMode, SyncMode, SyncSchema, SyncSchemaStream } from "core/domain/catalog";
import { Connection, ScheduleProperties } from "core/domain/connection";
import { ConnectionNamespaceDefinition, ConnectionSchedule } from "core/domain/connection";
import {
Expand All @@ -28,6 +21,8 @@ import { SOURCE_NAMESPACE_TAG } from "core/domain/connector/source";
import { ValuesProps } from "hooks/services/useConnectionHook";
import { useCurrentWorkspace } from "services/workspaces/WorkspacesService";

import { getOptimalSyncMode, verifyConfigCursorField, verifySupportedSyncModes } from "./formConfigHelpers";

type FormikConnectionFormValues = {
schedule?: ScheduleProperties | null;
prefix: string;
Expand Down Expand Up @@ -197,54 +192,17 @@ function mapFormPropsToOperation(
return newOperations;
}

function getDefaultCursorField(streamNode: SyncSchemaStream): string[] {
if (streamNode.stream.defaultCursorField.length) {
return streamNode.stream.defaultCursorField;
}
return streamNode.config.cursorField;
}

const useInitialSchema = (schema: SyncSchema): SyncSchema =>
const useInitialSchema = (schema: SyncSchema, supportedDestinationSyncModes: DestinationSyncMode[]): SyncSchema =>
useMemo<SyncSchema>(
() => ({
streams: schema.streams.map<SyncSchemaStream>((apiNode, id) => {
const nodeWithId: SyncSchemaStream = { ...apiNode, id: id.toString() };
const nodeStream = verifyConfigCursorField(verifySupportedSyncModes(nodeWithId));

// If the value in supportedSyncModes is empty assume the only supported sync mode is FULL_REFRESH.
// Otherwise, it supports whatever sync modes are present.
const streamNode = nodeWithId.stream.supportedSyncModes?.length
? nodeWithId
: setIn(nodeWithId, "stream.supportedSyncModes", [SyncMode.FullRefresh]);

// If syncMode isn't null - don't change item
if (streamNode.config.syncMode) {
return streamNode;
}

const updateStreamConfig = (config: Partial<AirbyteStreamConfiguration>): SyncSchemaStream => ({
...streamNode,
config: { ...streamNode.config, ...config },
});

const supportedSyncModes = streamNode.stream.supportedSyncModes;

// Prefer INCREMENTAL sync mode over other sync modes
if (supportedSyncModes.includes(SyncMode.Incremental)) {
return updateStreamConfig({
cursorField: streamNode.config.cursorField.length
? streamNode.config.cursorField
: getDefaultCursorField(streamNode),
syncMode: SyncMode.Incremental,
});
}

// If source don't support INCREMENTAL and FULL_REFRESH - set first value from supportedSyncModes list
return updateStreamConfig({
syncMode: streamNode.stream.supportedSyncModes[0],
});
return getOptimalSyncMode(nodeStream, supportedDestinationSyncModes);
}),
}),
[schema.streams]
[schema.streams, supportedDestinationSyncModes]
);

const getInitialTransformations = (operations: Operation[]): Transformation[] => operations.filter(isDbtTransformation);
Expand All @@ -266,7 +224,7 @@ const useInitialValues = (
destDefinition: DestinationDefinitionSpecification,
isEditMode?: boolean
): FormikConnectionFormValues => {
const initialSchema = useInitialSchema(connection.syncCatalog);
const initialSchema = useInitialSchema(connection.syncCatalog, destDefinition.supportedDestinationSyncModes);

return useMemo(() => {
const initialValues: FormikConnectionFormValues = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import { DestinationSyncMode, SyncMode, SyncSchemaStream } from "../../../core/domain/catalog";
import { getOptimalSyncMode, verifyConfigCursorField, verifySupportedSyncModes } from "./formConfigHelpers";

const mockedStreamNode: SyncSchemaStream = {
stream: {
name: "test",
supportedSyncModes: [],
jsonSchema: {},
sourceDefinedCursor: null,
sourceDefinedPrimaryKey: [],
defaultCursorField: [],
},
config: {
cursorField: [],
primaryKey: [],
selected: true,
syncMode: SyncMode.FullRefresh,
destinationSyncMode: DestinationSyncMode.Append,
aliasName: "",
},
id: "1",
};

describe("formConfigHelpers", () => {
describe("verifySupportedSyncModes", () => {
const streamNodeWithDefinedSyncMode: SyncSchemaStream = {
...mockedStreamNode,
stream: { ...mockedStreamNode.stream, supportedSyncModes: [SyncMode.Incremental] },
};

test("should not change supportedSyncModes if it's not empty", () => {
const streamNode = verifySupportedSyncModes(streamNodeWithDefinedSyncMode);

expect(streamNode.stream.supportedSyncModes).toStrictEqual([SyncMode.Incremental]);
});

test("should set default supportedSyncModes if it's empty", () => {
const streamNodeWithEmptySyncMode = {
...streamNodeWithDefinedSyncMode,
stream: { ...mockedStreamNode.stream, supportedSyncModes: [] },
};
const streamNode = verifySupportedSyncModes(streamNodeWithEmptySyncMode);

expect(streamNode.stream.supportedSyncModes).toStrictEqual([SyncMode.FullRefresh]);
});
});

describe("verifyConfigCursorField", () => {
const streamWithDefinedCursorField: SyncSchemaStream = {
...mockedStreamNode,
config: { ...mockedStreamNode.config, cursorField: ["id"] },
stream: { ...mockedStreamNode.stream, defaultCursorField: ["name"] },
};

test("should leave cursorField value as is if it's defined", () => {
const streamNode = verifyConfigCursorField(streamWithDefinedCursorField);

expect(streamNode.config.cursorField).toStrictEqual(["id"]);
});

test("should set defaultCursorField if cursorField is not defined", () => {
const streamNodeWithoutDefinedCursor = {
...streamWithDefinedCursorField,
config: { ...mockedStreamNode.config, cursorField: [] },
};
const streamNode = verifyConfigCursorField(streamNodeWithoutDefinedCursor);

expect(streamNode.config.cursorField).toStrictEqual(["name"]);
});

test("should leave cursorField empty if defaultCursorField not defined", () => {
const streamNode = verifyConfigCursorField(mockedStreamNode);

expect(streamNode.config.cursorField).toStrictEqual([]);
});
});

describe("getOptimalSyncMode", () => {
test("should get 'Incremental(cursor defined) => Append dedup' mode", () => {
const streamNodeWithIncrDedupMode = {
...mockedStreamNode,
stream: { ...mockedStreamNode.stream, supportedSyncModes: [SyncMode.Incremental], sourceDefinedCursor: true },
};
const nodeStream = getOptimalSyncMode(streamNodeWithIncrDedupMode, [DestinationSyncMode.Dedupted]);

expect(nodeStream.config.syncMode).toBe(SyncMode.Incremental);
expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Dedupted);
});

test("should get 'FullRefresh => Overwrite' mode", () => {
const nodeStream = getOptimalSyncMode(mockedStreamNode, [DestinationSyncMode.Overwrite]);

expect(nodeStream.config.syncMode).toBe(SyncMode.FullRefresh);
expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Overwrite);
});

test("should get 'Incremental => Append' mode", () => {
const streamNodeWithIncrAppendMode = {
...mockedStreamNode,
stream: { ...mockedStreamNode.stream, supportedSyncModes: [SyncMode.Incremental] },
};
const nodeStream = getOptimalSyncMode(streamNodeWithIncrAppendMode, [DestinationSyncMode.Append]);

expect(nodeStream.config.syncMode).toBe(SyncMode.Incremental);
expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Append);
});

test("should get 'FullRefresh => Append' mode", () => {
const nodeStream = getOptimalSyncMode(mockedStreamNode, [DestinationSyncMode.Append]);

expect(nodeStream.config.syncMode).toBe(SyncMode.FullRefresh);
expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Append);
});
test("should return untouched nodeStream", () => {
const nodeStream = getOptimalSyncMode(mockedStreamNode, []);

expect(nodeStream).toBe(mockedStreamNode);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import {
AirbyteStreamConfiguration,
DestinationSyncMode,
SyncMode,
SyncSchemaStream,
} from "../../../core/domain/catalog";

const getDefaultCursorField = (streamNode: SyncSchemaStream): string[] => {
if (streamNode.stream.defaultCursorField.length) {
return streamNode.stream.defaultCursorField;
}
return streamNode.config.cursorField;
};

export const verifySupportedSyncModes = (streamNode: SyncSchemaStream): SyncSchemaStream => {
const {
stream: { supportedSyncModes },
} = streamNode;

if (supportedSyncModes?.length) return streamNode;
return { ...streamNode, stream: { ...streamNode.stream, supportedSyncModes: [SyncMode.FullRefresh] } };
};

export const verifyConfigCursorField = (streamNode: SyncSchemaStream): SyncSchemaStream => {
const { config } = streamNode;

return {
...streamNode,
config: {
...config,
cursorField: config.cursorField?.length ? config.cursorField : getDefaultCursorField(streamNode),
},
};
};

export const getOptimalSyncMode = (
streamNode: SyncSchemaStream,
supportedDestinationSyncModes: DestinationSyncMode[]
): SyncSchemaStream => {
const updateStreamConfig = (
config: Pick<AirbyteStreamConfiguration, "syncMode" | "destinationSyncMode">
): SyncSchemaStream => ({
...streamNode,
config: { ...streamNode.config, ...config },
});

const {
stream: { supportedSyncModes, sourceDefinedCursor },
} = streamNode;

if (
supportedSyncModes.includes(SyncMode.Incremental) &&
supportedDestinationSyncModes.includes(DestinationSyncMode.Dedupted) &&
sourceDefinedCursor
) {
return updateStreamConfig({
syncMode: SyncMode.Incremental,
destinationSyncMode: DestinationSyncMode.Dedupted,
});
}

if (supportedDestinationSyncModes.includes(DestinationSyncMode.Overwrite)) {
return updateStreamConfig({
syncMode: SyncMode.FullRefresh,
destinationSyncMode: DestinationSyncMode.Overwrite,
});
}

if (
supportedSyncModes.includes(SyncMode.Incremental) &&
supportedDestinationSyncModes.includes(DestinationSyncMode.Append)
) {
return updateStreamConfig({
syncMode: SyncMode.Incremental,
destinationSyncMode: DestinationSyncMode.Append,
});
}

if (supportedDestinationSyncModes.includes(DestinationSyncMode.Append)) {
return updateStreamConfig({
syncMode: SyncMode.FullRefresh,
destinationSyncMode: DestinationSyncMode.Append,
});
}
return streamNode;
};

0 comments on commit cea2f7a

Please sign in to comment.