diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md index af89dceadaf..dff5e61bfaa 100644 --- a/docs/en/connector-v2/source/FakeSource.md +++ b/docs/en/connector-v2/source/FakeSource.md @@ -18,45 +18,45 @@ just for some test cases such as type conversion or connector new feature testin ## Source Options -| Name | Type | Required | Default | Description | -|---------------------|----------|----------|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| schema | config | yes | - | Define Schema information | -| rows | config | no | - | The row list of fake data output per degree of parallelism see title `Options rows Case`. | -| row.num | int | no | 5 | The total number of data generated per degree of parallelism | -| split.num | int | no | 1 | the number of splits generated by the enumerator for each degree of parallelism | -| split.read-interval | long | no | 1 | The interval(mills) between two split reads in a reader | -| map.size | int | no | 5 | The size of `map` type that connector generated | -| array.size | int | no | 5 | The size of `array` type that connector generated | -| bytes.length | int | no | 5 | The length of `bytes` type that connector generated | -| string.length | int | no | 5 | The length of `string` type that connector generated | -| string.fake.mode | string | no | range | The fake mode of generating string data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `string.template` option | -| string.template | list | no | - | The template list of string type that connector generated, if user configured it, connector will randomly select an item from the template list | -| tinyint.fake.mode | string | no | range | The fake mode of generating tinyint data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `tinyint.template` option | -| tinyint.min | tinyint | no | 0 | The min value of tinyint data that connector generated | -| tinyint.max | tinyint | no | 127 | The max value of tinyint data that connector generated | -| tinyint.template | list | no | - | The template list of tinyint type that connector generated, if user configured it, connector will randomly select an item from the template list | -| smallint.fake.mode | string | no | range | The fake mode of generating smallint data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `smallint.template` option | -| smallint.min | smallint | no | 0 | The min value of smallint data that connector generated | -| smallint.max | smallint | no | 32767 | The max value of smallint data that connector generated | -| smallint.template | list | no | - | The template list of smallint type that connector generated, if user configured it, connector will randomly select an item from the template list | -| int.fake.template | string | no | range | The fake mode of generating int data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `int.template` option | -| int.min | int | no | 0 | The min value of int data that connector generated | -| int.max | int | no | 0x7fffffff | The max value of int data that connector generated | -| int.template | list | no | - | The template list of int type that connector generated, if user configured it, connector will randomly select an item from the template list | -| bigint.fake.mode | string | no | range | The fake mode of generating bigint data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `bigint.template` option | -| bigint.min | bigint | no | 0 | The min value of bigint data that connector generated | -| bigint.max | bigint | no | 0x7fffffffffffffff | The max value of bigint data that connector generated | -| bigint.template | list | no | - | The template list of bigint type that connector generated, if user configured it, connector will randomly select an item from the template list | -| float.fake.mode | string | no | range | The fake mode of generating float data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `float.template` option | -| float.min | float | no | 0 | The min value of float data that connector generated | -| float.max | float | no | 0x1.fffffeP+127 | The max value of float data that connector generated | -| float.template | list | no | - | The template list of float type that connector generated, if user configured it, connector will randomly select an item from the template list | -| double.fake.mode | string | no | range | The fake mode of generating float data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `double.template` option | -| double.min | double | no | 0 | The min value of double data that connector generated | -| double.max | double | no | 0x1.fffffffffffffP+1023 | The max value of double data that connector generated | -| double.template | list | no | - | The template list of double type that connector generated, if user configured it, connector will randomly select an item from the template list | -| table-names | list | no | - | The table list that connector generated, used to simulate multi-table scenarios.
This option will override the `table` option in the `schema` option. For example, if you configure the `table-names` option as follows, the connector will generate data for the `test.table1` and `test.table2` tables, the `database.schema.table` will be dropFor details, see title `Options table-names Case`. | -| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | +| Name | Type | Required | Default | Description | +|---------------------|----------|----------|-------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| tables_configs | list | no | - | Define Multiple FakeSource, each item can contains the whole fake source config description below | +| schema | config | yes | - | Define Schema information | +| rows | config | no | - | The row list of fake data output per degree of parallelism see title `Options rows Case`. | +| row.num | int | no | 5 | The total number of data generated per degree of parallelism | +| split.num | int | no | 1 | the number of splits generated by the enumerator for each degree of parallelism | +| split.read-interval | long | no | 1 | The interval(mills) between two split reads in a reader | +| map.size | int | no | 5 | The size of `map` type that connector generated | +| array.size | int | no | 5 | The size of `array` type that connector generated | +| bytes.length | int | no | 5 | The length of `bytes` type that connector generated | +| string.length | int | no | 5 | The length of `string` type that connector generated | +| string.fake.mode | string | no | range | The fake mode of generating string data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `string.template` option | +| string.template | list | no | - | The template list of string type that connector generated, if user configured it, connector will randomly select an item from the template list | +| tinyint.fake.mode | string | no | range | The fake mode of generating tinyint data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `tinyint.template` option | +| tinyint.min | tinyint | no | 0 | The min value of tinyint data that connector generated | +| tinyint.max | tinyint | no | 127 | The max value of tinyint data that connector generated | +| tinyint.template | list | no | - | The template list of tinyint type that connector generated, if user configured it, connector will randomly select an item from the template list | +| smallint.fake.mode | string | no | range | The fake mode of generating smallint data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `smallint.template` option | +| smallint.min | smallint | no | 0 | The min value of smallint data that connector generated | +| smallint.max | smallint | no | 32767 | The max value of smallint data that connector generated | +| smallint.template | list | no | - | The template list of smallint type that connector generated, if user configured it, connector will randomly select an item from the template list | +| int.fake.template | string | no | range | The fake mode of generating int data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `int.template` option | +| int.min | int | no | 0 | The min value of int data that connector generated | +| int.max | int | no | 0x7fffffff | The max value of int data that connector generated | +| int.template | list | no | - | The template list of int type that connector generated, if user configured it, connector will randomly select an item from the template list | +| bigint.fake.mode | string | no | range | The fake mode of generating bigint data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `bigint.template` option | +| bigint.min | bigint | no | 0 | The min value of bigint data that connector generated | +| bigint.max | bigint | no | 0x7fffffffffffffff | The max value of bigint data that connector generated | +| bigint.template | list | no | - | The template list of bigint type that connector generated, if user configured it, connector will randomly select an item from the template list | +| float.fake.mode | string | no | range | The fake mode of generating float data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `float.template` option | +| float.min | float | no | 0 | The min value of float data that connector generated | +| float.max | float | no | 0x1.fffffeP+127 | The max value of float data that connector generated | +| float.template | list | no | - | The template list of float type that connector generated, if user configured it, connector will randomly select an item from the template list | +| double.fake.mode | string | no | range | The fake mode of generating float data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `double.template` option | +| double.min | double | no | 0 | The min value of double data that connector generated | +| double.max | double | no | 0x1.fffffffffffffP+1023 | The max value of double data that connector generated | +| double.template | list | no | - | The template list of double type that connector generated, if user configured it, connector will randomly select an item from the template list | +| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ## Task Example @@ -311,18 +311,38 @@ FakeSource { ```hocon FakeSource { - table-names = ["test.table1", "test.table2"] - schema { - fields { - c_string = string - c_tinyint = tinyint - c_smallint = smallint - c_int = int - c_bigint = bigint - c_float = float - c_double = double + tables_configs = [ + { + row.num = 16 + schema { + table = "test.table1" + fields { + c_string = string + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + } + } + }, + { + row.num = 17 + schema { + table = "test.table2" + fields { + c_string = string + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + } + } } - } + ] } ``` diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java index cec765b3f20..ab1c829536f 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java @@ -17,13 +17,11 @@ package org.apache.seatunnel.connectors.seatunnel.fake.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions; - -import org.apache.seatunnel.api.table.catalog.CatalogOptions; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; -import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException; import lombok.AllArgsConstructor; @@ -33,6 +31,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ARRAY_SIZE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BIGINT_FAKE_MODE; @@ -149,277 +148,257 @@ public class FakeConfig implements Serializable { private List fakeRows; - @Builder.Default private List tableIdentifiers = new ArrayList<>(); + private CatalogTable catalogTable; - // todo: use ReadonlyConfig - public static FakeConfig buildWithConfig(Config config) { + public static FakeConfig buildWithConfig(ReadonlyConfig readonlyConfig) { FakeConfigBuilder builder = FakeConfig.builder(); - if (config.hasPath(ROW_NUM.key())) { - builder.rowNum(config.getInt(ROW_NUM.key())); - } - if (config.hasPath(SPLIT_NUM.key())) { - builder.splitNum(config.getInt(SPLIT_NUM.key())); - } - if (config.hasPath(SPLIT_READ_INTERVAL.key())) { - builder.splitReadInterval(config.getInt(SPLIT_READ_INTERVAL.key())); - } - if (config.hasPath(MAP_SIZE.key())) { - builder.mapSize(config.getInt(MAP_SIZE.key())); - } - if (config.hasPath(ARRAY_SIZE.key())) { - builder.arraySize(config.getInt(ARRAY_SIZE.key())); - } - if (config.hasPath(BYTES_LENGTH.key())) { - builder.bytesLength(config.getInt(BYTES_LENGTH.key())); - } - if (config.hasPath(STRING_LENGTH.key())) { - builder.stringLength(config.getInt(STRING_LENGTH.key())); - } - if (config.hasPath(ROWS.key())) { - List configs = config.getConfigList(ROWS.key()); + builder.rowNum(readonlyConfig.get(ROW_NUM)); + builder.splitNum(readonlyConfig.get(SPLIT_NUM)); + builder.splitReadInterval(readonlyConfig.get(SPLIT_READ_INTERVAL)); + builder.mapSize(readonlyConfig.get(MAP_SIZE)); + builder.arraySize(readonlyConfig.get(ARRAY_SIZE)); + builder.bytesLength(readonlyConfig.get(BYTES_LENGTH)); + builder.stringLength(readonlyConfig.get(STRING_LENGTH)); + + if (readonlyConfig.getOptional(ROWS).isPresent()) { + List> configs = readonlyConfig.get(ROWS); List rows = new ArrayList<>(configs.size()); - ConfigRenderOptions options = ConfigRenderOptions.concise(); - for (Config configItem : configs) { - String fieldsJson = configItem.getValue(RowData.KEY_FIELDS).render(options); - RowData rowData = new RowData(configItem.getString(RowData.KEY_KIND), fieldsJson); + for (Map configItem : configs) { + String fieldsJson = JsonUtils.toJsonString(configItem.get(RowData.KEY_FIELDS)); + RowData rowData = + new RowData(configItem.get(RowData.KEY_KIND).toString(), fieldsJson); rows.add(rowData); } builder.fakeRows(rows); } - if (config.hasPath(STRING_TEMPLATE.key())) { - builder.stringTemplate(config.getStringList(STRING_TEMPLATE.key())); - } - if (config.hasPath(TINYINT_TEMPLATE.key())) { - builder.tinyintTemplate(config.getIntList(TINYINT_TEMPLATE.key())); - } - if (config.hasPath(SMALLINT_TEMPLATE.key())) { - builder.smallintTemplate(config.getIntList(SMALLINT_TEMPLATE.key())); - } - if (config.hasPath(INT_TEMPLATE.key())) { - builder.intTemplate(config.getIntList(INT_TEMPLATE.key())); - } - if (config.hasPath(BIGINT_TEMPLATE.key())) { - builder.bigTemplate(config.getLongList(BIGINT_TEMPLATE.key())); - } - if (config.hasPath(FLOAT_TEMPLATE.key())) { - builder.floatTemplate(config.getDoubleList(FLOAT_TEMPLATE.key())); - } - if (config.hasPath(DOUBLE_TEMPLATE.key())) { - builder.doubleTemplate(config.getDoubleList(DOUBLE_TEMPLATE.key())); - } - if (config.hasPath(DATE_YEAR_TEMPLATE.key())) { - builder.dateYearTemplate(config.getIntList(DATE_YEAR_TEMPLATE.key())); - } - if (config.hasPath(DATE_MONTH_TEMPLATE.key())) { - builder.dateMonthTemplate(config.getIntList(DATE_MONTH_TEMPLATE.key())); - } - if (config.hasPath(DATE_DAY_TEMPLATE.key())) { - builder.dateDayTemplate(config.getIntList(DATE_DAY_TEMPLATE.key())); - } - if (config.hasPath(TIME_HOUR_TEMPLATE.key())) { - builder.timeHourTemplate(config.getIntList(TIME_HOUR_TEMPLATE.key())); - } - if (config.hasPath(TIME_MINUTE_TEMPLATE.key())) { - builder.timeMinuteTemplate(config.getIntList(TIME_MINUTE_TEMPLATE.key())); - } - if (config.hasPath(TIME_SECOND_TEMPLATE.key())) { - builder.timeSecondTemplate(config.getIntList(TIME_SECOND_TEMPLATE.key())); - } - if (config.hasPath(TINYINT_MIN.key())) { - int tinyintMin = config.getInt(TINYINT_MIN.key()); - if (tinyintMin < TINYINT_MIN.defaultValue() - || tinyintMin > TINYINT_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - TINYINT_MIN.key() - + " should >= " - + TINYINT_MIN.defaultValue() - + " and <= " - + TINYINT_MAX.defaultValue()); - } - builder.tinyintMin(tinyintMin); - } - if (config.hasPath(TINYINT_MAX.key())) { - int tinyintMax = config.getInt(TINYINT_MAX.key()); - if (tinyintMax < TINYINT_MIN.defaultValue() - || tinyintMax > TINYINT_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - TINYINT_MAX.key() - + " should >= " - + TINYINT_MIN.defaultValue() - + " and <= " - + TINYINT_MAX.defaultValue()); - } - builder.tinyintMax(tinyintMax); - } - if (config.hasPath(SMALLINT_MIN.key())) { - int smallintMin = config.getInt(SMALLINT_MIN.key()); - if (smallintMin < SMALLINT_MIN.defaultValue() - || smallintMin > SMALLINT_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - SMALLINT_MIN.key() - + " should >= " - + SMALLINT_MIN.defaultValue() - + " and <= " - + SMALLINT_MAX.defaultValue()); - } - builder.smallintMin(smallintMin); - } - if (config.hasPath(SMALLINT_MAX.key())) { - int smallintMax = config.getInt(SMALLINT_MAX.key()); - if (smallintMax < SMALLINT_MIN.defaultValue() - || smallintMax > SMALLINT_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - SMALLINT_MAX.key() - + " should >= " - + SMALLINT_MIN.defaultValue() - + " and <= " - + SMALLINT_MAX.defaultValue()); - } - builder.smallintMax(smallintMax); - } - if (config.hasPath(INT_MIN.key())) { - int intMin = config.getInt(INT_MIN.key()); - if (intMin < INT_MIN.defaultValue() || intMin > INT_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - INT_MIN.key() - + " should >= " - + INT_MIN.defaultValue() - + " and <= " - + INT_MAX.defaultValue()); - } - builder.intMin(intMin); - } - if (config.hasPath(INT_MAX.key())) { - int intMax = config.getInt(INT_MAX.key()); - if (intMax < INT_MIN.defaultValue() || intMax > INT_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - INT_MAX.key() - + " should >= " - + INT_MIN.defaultValue() - + " and <= " - + INT_MAX.defaultValue()); - } - builder.intMax(intMax); - } - if (config.hasPath(BIGINT_MIN.key())) { - long bigintMin = config.getLong(BIGINT_MIN.key()); - if (bigintMin < BIGINT_MIN.defaultValue() || bigintMin > BIGINT_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - BIGINT_MIN.key() - + " should >= " - + BIGINT_MIN.defaultValue() - + " and <= " - + BIGINT_MAX.defaultValue()); - } - builder.bigintMin(bigintMin); - } - if (config.hasPath(BIGINT_MAX.key())) { - long bigintMax = config.getLong(BIGINT_MAX.key()); - if (bigintMax < BIGINT_MIN.defaultValue() || bigintMax > BIGINT_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - BIGINT_MAX.key() - + " should >= " - + BIGINT_MIN.defaultValue() - + " and <= " - + BIGINT_MAX.defaultValue()); - } - builder.bigintMax(bigintMax); - } - if (config.hasPath(FLOAT_MIN.key())) { - double floatMin = config.getDouble(FLOAT_MIN.key()); - if (floatMin < FLOAT_MIN.defaultValue() || floatMin > FLOAT_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - FLOAT_MIN.key() - + " should >= " - + FLOAT_MIN.defaultValue() - + " and <= " - + FLOAT_MAX.defaultValue()); - } - builder.floatMin(floatMin); - } - if (config.hasPath(FLOAT_MAX.key())) { - double floatMax = config.getDouble(FLOAT_MAX.key()); - if (floatMax < FLOAT_MIN.defaultValue() || floatMax > FLOAT_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - FLOAT_MAX.key() - + " should >= " - + FLOAT_MIN.defaultValue() - + " and <= " - + FLOAT_MAX.defaultValue()); - } - builder.floatMax(floatMax); - } - if (config.hasPath(DOUBLE_MIN.key())) { - double doubleMin = config.getDouble(DOUBLE_MIN.key()); - if (doubleMin < DOUBLE_MIN.defaultValue() || doubleMin > DOUBLE_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - DOUBLE_MIN.key() - + " should >= " - + DOUBLE_MIN.defaultValue() - + " and <= " - + DOUBLE_MAX.defaultValue()); - } - builder.doubleMin(doubleMin); - } - if (config.hasPath(DOUBLE_MAX.key())) { - double doubleMax = config.getDouble(DOUBLE_MAX.key()); - if (doubleMax < DOUBLE_MIN.defaultValue() || doubleMax > DOUBLE_MAX.defaultValue()) { - throw new FakeConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - DOUBLE_MAX.key() - + " should >= " - + DOUBLE_MIN.defaultValue() - + " and <= " - + DOUBLE_MAX.defaultValue()); - } - builder.doubleMax(doubleMax); - } - if (config.hasPath(STRING_FAKE_MODE.key())) { - builder.stringFakeMode( - FakeOption.FakeMode.parse(config.getString(STRING_FAKE_MODE.key()))); - } - if (config.hasPath(TINYINT_FAKE_MODE.key())) { - builder.tinyintFakeMode( - FakeOption.FakeMode.parse(config.getString(TINYINT_FAKE_MODE.key()))); - } - if (config.hasPath(SMALLINT_FAKE_MODE.key())) { - builder.smallintFakeMode( - FakeOption.FakeMode.parse(config.getString(SMALLINT_FAKE_MODE.key()))); - } - if (config.hasPath(INT_FAKE_MODE.key())) { - builder.intFakeMode(FakeOption.FakeMode.parse(config.getString(INT_FAKE_MODE.key()))); - } - if (config.hasPath(BIGINT_FAKE_MODE.key())) { - builder.bigintFakeMode( - FakeOption.FakeMode.parse(config.getString(BIGINT_FAKE_MODE.key()))); - } - if (config.hasPath(FLOAT_FAKE_MODE.key())) { - builder.floatFakeMode( - FakeOption.FakeMode.parse(config.getString(FLOAT_FAKE_MODE.key()))); - } - if (config.hasPath(DOUBLE_FAKE_MODE.key())) { - builder.doubleFakeMode( - FakeOption.FakeMode.parse(config.getString(DOUBLE_FAKE_MODE.key()))); - } - if (config.hasPath(CatalogOptions.TABLE_NAMES.key())) { - List tableNames = config.getStringList(CatalogOptions.TABLE_NAMES.key()); - List tableIdentifiers = new ArrayList<>(tableNames.size()); - for (String tableName : tableNames) { - tableIdentifiers.add(TableIdentifier.of("FakeSource", TablePath.of(tableName))); - } - builder.tableIdentifiers(tableIdentifiers); - } + readonlyConfig.getOptional(STRING_TEMPLATE).ifPresent(builder::stringTemplate); + readonlyConfig.getOptional(TINYINT_TEMPLATE).ifPresent(builder::tinyintTemplate); + readonlyConfig.getOptional(SMALLINT_TEMPLATE).ifPresent(builder::smallintTemplate); + readonlyConfig.getOptional(INT_TEMPLATE).ifPresent(builder::intTemplate); + readonlyConfig.getOptional(BIGINT_TEMPLATE).ifPresent(builder::bigTemplate); + readonlyConfig.getOptional(FLOAT_TEMPLATE).ifPresent(builder::floatTemplate); + readonlyConfig.getOptional(DOUBLE_TEMPLATE).ifPresent(builder::doubleTemplate); + readonlyConfig.getOptional(DATE_YEAR_TEMPLATE).ifPresent(builder::dateYearTemplate); + readonlyConfig.getOptional(DATE_MONTH_TEMPLATE).ifPresent(builder::dateMonthTemplate); + readonlyConfig.getOptional(DATE_DAY_TEMPLATE).ifPresent(builder::dateDayTemplate); + readonlyConfig.getOptional(TIME_HOUR_TEMPLATE).ifPresent(builder::timeHourTemplate); + readonlyConfig.getOptional(TIME_MINUTE_TEMPLATE).ifPresent(builder::timeMinuteTemplate); + readonlyConfig.getOptional(TIME_SECOND_TEMPLATE).ifPresent(builder::timeSecondTemplate); + + readonlyConfig + .getOptional(TINYINT_MIN) + .ifPresent( + tinyintMin -> { + if (tinyintMin < TINYINT_MIN.defaultValue() + || tinyintMin > TINYINT_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + TINYINT_MIN.key() + + " should >= " + + TINYINT_MIN.defaultValue() + + " and <= " + + TINYINT_MAX.defaultValue()); + } + builder.tinyintMin(tinyintMin); + }); + + readonlyConfig + .getOptional(TINYINT_MAX) + .ifPresent( + tinyintMax -> { + if (tinyintMax < TINYINT_MIN.defaultValue() + || tinyintMax > TINYINT_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + TINYINT_MAX.key() + + " should >= " + + TINYINT_MIN.defaultValue() + + " and <= " + + TINYINT_MAX.defaultValue()); + } + builder.tinyintMax(tinyintMax); + }); + + readonlyConfig + .getOptional(SMALLINT_MIN) + .ifPresent( + smallintMin -> { + if (smallintMin < SMALLINT_MIN.defaultValue() + || smallintMin > SMALLINT_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + SMALLINT_MIN.key() + + " should >= " + + SMALLINT_MIN.defaultValue() + + " and <= " + + SMALLINT_MAX.defaultValue()); + } + builder.smallintMin(smallintMin); + }); + + readonlyConfig + .getOptional(SMALLINT_MAX) + .ifPresent( + smallintMax -> { + if (smallintMax < SMALLINT_MIN.defaultValue() + || smallintMax > SMALLINT_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + SMALLINT_MAX.key() + + " should >= " + + SMALLINT_MIN.defaultValue() + + " and <= " + + SMALLINT_MAX.defaultValue()); + } + builder.smallintMax(smallintMax); + }); + + readonlyConfig + .getOptional(INT_MIN) + .ifPresent( + intMin -> { + if (intMin < INT_MIN.defaultValue() + || intMin > INT_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + INT_MIN.key() + + " should >= " + + INT_MIN.defaultValue() + + " and <= " + + INT_MAX.defaultValue()); + } + builder.intMin(intMin); + }); + + readonlyConfig + .getOptional(INT_MAX) + .ifPresent( + intMax -> { + if (intMax < INT_MIN.defaultValue() + || intMax > INT_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + INT_MAX.key() + + " should >= " + + INT_MIN.defaultValue() + + " and <= " + + INT_MAX.defaultValue()); + } + builder.intMax(intMax); + }); + + readonlyConfig + .getOptional(BIGINT_MIN) + .ifPresent( + bigintMin -> { + if (bigintMin < BIGINT_MIN.defaultValue() + || bigintMin > BIGINT_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + BIGINT_MIN.key() + + " should >= " + + BIGINT_MIN.defaultValue() + + " and <= " + + BIGINT_MAX.defaultValue()); + } + builder.bigintMin(bigintMin); + }); + + readonlyConfig + .getOptional(BIGINT_MAX) + .ifPresent( + bigintMax -> { + if (bigintMax < BIGINT_MIN.defaultValue() + || bigintMax > BIGINT_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + BIGINT_MAX.key() + + " should >= " + + BIGINT_MIN.defaultValue() + + " and <= " + + BIGINT_MAX.defaultValue()); + } + builder.bigintMax(bigintMax); + }); + + readonlyConfig + .getOptional(FLOAT_MIN) + .ifPresent( + floatMin -> { + if (floatMin < FLOAT_MIN.defaultValue() + || floatMin > FLOAT_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + FLOAT_MIN.key() + + " should >= " + + FLOAT_MIN.defaultValue() + + " and <= " + + FLOAT_MAX.defaultValue()); + } + builder.floatMin(floatMin); + }); + + readonlyConfig + .getOptional(FLOAT_MAX) + .ifPresent( + floatMax -> { + if (floatMax < FLOAT_MIN.defaultValue() + || floatMax > FLOAT_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + FLOAT_MAX.key() + + " should >= " + + FLOAT_MIN.defaultValue() + + " and <= " + + FLOAT_MAX.defaultValue()); + } + builder.floatMax(floatMax); + }); + + readonlyConfig + .getOptional(DOUBLE_MIN) + .ifPresent( + doubleMin -> { + if (doubleMin < DOUBLE_MIN.defaultValue() + || doubleMin > DOUBLE_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + DOUBLE_MIN.key() + + " should >= " + + DOUBLE_MIN.defaultValue() + + " and <= " + + DOUBLE_MAX.defaultValue()); + } + builder.doubleMin(doubleMin); + }); + + readonlyConfig + .getOptional(DOUBLE_MAX) + .ifPresent( + doubleMax -> { + if (doubleMax < DOUBLE_MIN.defaultValue() + || doubleMax > DOUBLE_MAX.defaultValue()) { + throw new FakeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + DOUBLE_MAX.key() + + " should >= " + + DOUBLE_MIN.defaultValue() + + " and <= " + + DOUBLE_MAX.defaultValue()); + } + builder.doubleMax(doubleMax); + }); + + readonlyConfig.getOptional(STRING_FAKE_MODE).ifPresent(builder::stringFakeMode); + readonlyConfig.getOptional(TINYINT_FAKE_MODE).ifPresent(builder::tinyintFakeMode); + readonlyConfig.getOptional(SMALLINT_FAKE_MODE).ifPresent(builder::smallintFakeMode); + readonlyConfig.getOptional(INT_FAKE_MODE).ifPresent(builder::intFakeMode); + readonlyConfig.getOptional(BIGINT_FAKE_MODE).ifPresent(builder::bigintFakeMode); + readonlyConfig.getOptional(FLOAT_FAKE_MODE).ifPresent(builder::floatFakeMode); + readonlyConfig.getOptional(DOUBLE_FAKE_MODE).ifPresent(builder::doubleFakeMode); + + builder.catalogTable(CatalogTableUtil.buildWithConfig("FakeSource", readonlyConfig)); + return builder.build(); } diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java index d16d5c4f593..f9ff49cc41b 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java @@ -17,17 +17,25 @@ package org.apache.seatunnel.connectors.seatunnel.fake.config; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; + import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; import java.util.List; +import java.util.Map; public class FakeOption { - public static final Option> ROWS = + public static final Option>> TABLES_CONFIGS = + Options.key("tables_configs") + .type(new TypeReference>>() {}) + .noDefaultValue() + .withDescription("The multiple table config list of fake source"); + + public static final Option>> ROWS = Options.key("rows") - .listType(SeaTunnelRow.class) + .type(new TypeReference>>() {}) .noDefaultValue() .withDescription("The row list of fake data output per degree of parallelism"); public static final Option ROW_NUM = diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java new file mode 100644 index 00000000000..051d88a88f9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import org.apache.commons.collections4.CollectionUtils; + +import com.google.common.collect.Lists; +import lombok.Getter; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; + +public class MultipleTableFakeSourceConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + @Getter private List fakeConfigs; + + public MultipleTableFakeSourceConfig(ReadonlyConfig fakeSourceRootConfig) { + if (fakeSourceRootConfig.getOptional(FakeOption.TABLES_CONFIGS).isPresent()) { + parseFromConfigs(fakeSourceRootConfig); + } else { + parseFromConfig(fakeSourceRootConfig); + } + // validate + if (fakeConfigs.size() > 1) { + List tableNames = + fakeConfigs.stream() + .map(FakeConfig::getCatalogTable) + .map(catalogTable -> catalogTable.getTableId().toTablePath().toString()) + .collect(Collectors.toList()); + if (CollectionUtils.size(tableNames) != new HashSet<>(tableNames).size()) { + throw new IllegalArgumentException("table name: " + tableNames + " must be unique"); + } + } + } + + private void parseFromConfigs(ReadonlyConfig readonlyConfig) { + List readonlyConfigs = + readonlyConfig.getOptional(FakeOption.TABLES_CONFIGS).get().stream() + .map(ReadonlyConfig::fromMap) + .collect(Collectors.toList()); + // Use the config outside if it's not set in sub config + fakeConfigs = + readonlyConfigs.stream() + .map(FakeConfig::buildWithConfig) + .collect(Collectors.toList()); + } + + private void parseFromConfig(ReadonlyConfig readonlyConfig) { + FakeConfig fakeConfig = FakeConfig.buildWithConfig(readonlyConfig); + fakeConfigs = Lists.newArrayList(fakeConfig); + } +} diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java index 0d8b94f90f0..2c9559078fd 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source; -import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; @@ -32,8 +32,6 @@ import org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils; import org.apache.seatunnel.format.json.JsonDeserializationSchema; -import org.apache.commons.lang3.RandomUtils; - import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; @@ -41,18 +39,21 @@ import java.util.List; public class FakeDataGenerator { - private final SeaTunnelRowType rowType; + private final CatalogTable catalogTable; private final FakeConfig fakeConfig; private final JsonDeserializationSchema jsonDeserializationSchema; private final FakeDataRandomUtils fakeDataRandomUtils; + private String tableId; - public FakeDataGenerator(SeaTunnelRowType rowType, FakeConfig fakeConfig) { - this.rowType = rowType; + public FakeDataGenerator(FakeConfig fakeConfig) { + this.catalogTable = fakeConfig.getCatalogTable(); + this.tableId = catalogTable.getTableId().toTablePath().toString(); this.fakeConfig = fakeConfig; this.jsonDeserializationSchema = fakeConfig.getFakeRows() == null ? null - : new JsonDeserializationSchema(false, false, rowType); + : new JsonDeserializationSchema( + false, false, catalogTable.getSeaTunnelRowType()); this.fakeDataRandomUtils = new FakeDataRandomUtils(fakeConfig); } @@ -63,6 +64,7 @@ private SeaTunnelRow convertRow(FakeConfig.RowData rowData) { if (rowData.getKind() != null) { seaTunnelRow.setRowKind(RowKind.valueOf(rowData.getKind())); } + seaTunnelRow.setTableId(tableId); return seaTunnelRow; } catch (IOException e) { throw new FakeConnectorException(CommonErrorCodeDeprecated.JSON_OPERATION_FAILED, e); @@ -70,39 +72,35 @@ private SeaTunnelRow convertRow(FakeConfig.RowData rowData) { } private SeaTunnelRow randomRow() { + SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType(); String[] fieldNames = rowType.getFieldNames(); SeaTunnelDataType[] fieldTypes = rowType.getFieldTypes(); List randomRow = new ArrayList<>(fieldNames.length); for (SeaTunnelDataType fieldType : fieldTypes) { randomRow.add(randomColumnValue(fieldType)); } - SeaTunnelRow row = new SeaTunnelRow(randomRow.toArray()); - if (!fakeConfig.getTableIdentifiers().isEmpty()) { - row.setTableId( - fakeConfig - .getTableIdentifiers() - .get(RandomUtils.nextInt(0, fakeConfig.getTableIdentifiers().size())) - .toTablePath() - .toString()); - } - return row; + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(randomRow.toArray()); + seaTunnelRow.setTableId(tableId); + return seaTunnelRow; } /** * @param rowNum The number of pieces of data to be generated by the current task - * @param output Data collection and distribution + * @return The generated data */ - public void collectFakedRows(int rowNum, Collector output) { + public List generateFakedRows(int rowNum) { // Use manual configuration data preferentially + List seaTunnelRows = new ArrayList<>(); if (fakeConfig.getFakeRows() != null) { for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) { - output.collect(convertRow(rowData)); + seaTunnelRows.add(convertRow(rowData)); } } else { for (int i = 0; i < rowNum; i++) { - output.collect(randomRow()); + seaTunnelRows.add(randomRow()); } } + return seaTunnelRows; } @SuppressWarnings("magicnumber") diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java index d3bf9c6430a..dd312bed108 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java @@ -26,17 +26,12 @@ import org.apache.seatunnel.api.source.SupportColumnProjection; import org.apache.seatunnel.api.source.SupportParallelism; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; +import org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig; import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState; -import org.apache.commons.collections4.CollectionUtils; - -import com.google.common.collect.Lists; - import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -47,14 +42,10 @@ public class FakeSource SupportColumnProjection { private JobContext jobContext; - private CatalogTable catalogTable; - private FakeConfig fakeConfig; - - public FakeSource() {} + private final MultipleTableFakeSourceConfig multipleTableFakeSourceConfig; public FakeSource(ReadonlyConfig readonlyConfig) { - this.catalogTable = CatalogTableUtil.buildWithConfig(getPluginName(), readonlyConfig); - this.fakeConfig = FakeConfig.buildWithConfig(readonlyConfig.toConfig()); + this.multipleTableFakeSourceConfig = new MultipleTableFakeSourceConfig(readonlyConfig); } @Override @@ -66,29 +57,16 @@ public Boundedness getBoundedness() { @Override public List getProducedCatalogTables() { - // If tableNames is empty, means this is only one catalogTable, return the original - // catalogTable - if (CollectionUtils.isEmpty(fakeConfig.getTableIdentifiers())) { - return Lists.newArrayList(catalogTable); - } - // Otherwise, return the catalogTables with the tableNames - return fakeConfig.getTableIdentifiers().stream() - .map( - tableIdentifier -> - CatalogTable.of( - TableIdentifier.of( - getPluginName(), tableIdentifier.toTablePath()), - catalogTable.getTableSchema(), - catalogTable.getOptions(), - catalogTable.getPartitionKeys(), - catalogTable.getComment())) + return multipleTableFakeSourceConfig.getFakeConfigs().stream() + .map(FakeConfig::getCatalogTable) .collect(Collectors.toList()); } @Override public SourceSplitEnumerator createEnumerator( - SourceSplitEnumerator.Context enumeratorContext) throws Exception { - return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig, Collections.emptySet()); + SourceSplitEnumerator.Context enumeratorContext) { + return new FakeSourceSplitEnumerator( + enumeratorContext, multipleTableFakeSourceConfig, Collections.emptySet()); } @Override @@ -96,13 +74,15 @@ public SourceSplitEnumerator restoreEnumerator SourceSplitEnumerator.Context enumeratorContext, FakeSourceState checkpointState) { return new FakeSourceSplitEnumerator( - enumeratorContext, fakeConfig, checkpointState.getAssignedSplits()); + enumeratorContext, + multipleTableFakeSourceConfig, + checkpointState.getAssignedSplits()); } @Override public SourceReader createReader( SourceReader.Context readerContext) { - return new FakeSourceReader(readerContext, catalogTable.getSeaTunnelRowType(), fakeConfig); + return new FakeSourceReader(readerContext, multipleTableFakeSourceConfig); } @Override diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java index 91ef55950c0..1578a015237 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java @@ -20,7 +20,6 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; -import org.apache.seatunnel.api.table.catalog.CatalogOptions; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; @@ -54,6 +53,7 @@ import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_FAKE_MODE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TABLES_CONFIGS; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_HOUR_TEMPLATE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_MINUTE_TEMPLATE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_SECOND_TEMPLATE; @@ -70,7 +70,8 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(TableSchemaOptions.SCHEMA) + .optional(TABLES_CONFIGS) + .optional(TableSchemaOptions.SCHEMA) .optional(STRING_FAKE_MODE) .conditional(STRING_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, STRING_TEMPLATE) .optional(TINYINT_FAKE_MODE) @@ -98,8 +99,7 @@ public OptionRule optionRule() { DATE_DAY_TEMPLATE, TIME_HOUR_TEMPLATE, TIME_MINUTE_TEMPLATE, - TIME_SECOND_TEMPLATE, - CatalogOptions.TABLE_NAMES) + TIME_SECOND_TEMPLATE) .build(); } diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java index 84b49722e90..016f336d37e 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java @@ -21,8 +21,8 @@ import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; +import org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig; import lombok.extern.slf4j.Slf4j; @@ -30,7 +30,9 @@ import java.util.ArrayList; import java.util.Deque; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.stream.Collectors; @Slf4j public class FakeSourceReader implements SourceReader { @@ -38,16 +40,34 @@ public class FakeSourceReader implements SourceReader splits = new ConcurrentLinkedDeque<>(); - private final FakeConfig config; - private final FakeDataGenerator fakeDataGenerator; + private final MultipleTableFakeSourceConfig multipleTableFakeSourceConfig; + // TableFullName to FakeDataGenerator + private final Map fakeDataGeneratorMap; private volatile boolean noMoreSplit; + private final long minSplitReadInterval; private volatile long latestTimestamp = 0; public FakeSourceReader( - SourceReader.Context context, SeaTunnelRowType rowType, FakeConfig fakeConfig) { + SourceReader.Context context, + MultipleTableFakeSourceConfig multipleTableFakeSourceConfig) { this.context = context; - this.config = fakeConfig; - this.fakeDataGenerator = new FakeDataGenerator(rowType, fakeConfig); + this.multipleTableFakeSourceConfig = multipleTableFakeSourceConfig; + this.fakeDataGeneratorMap = + multipleTableFakeSourceConfig.getFakeConfigs().stream() + .collect( + Collectors.toMap( + fakeConfig -> + fakeConfig + .getCatalogTable() + .getTableId() + .toTablePath() + .toString(), + FakeDataGenerator::new)); + this.minSplitReadInterval = + multipleTableFakeSourceConfig.getFakeConfigs().stream() + .map(FakeConfig::getSplitReadInterval) + .min(Integer::compareTo) + .get(); } @Override @@ -64,19 +84,23 @@ public void close() { @SuppressWarnings("MagicNumber") public void pollNext(Collector output) throws InterruptedException { long currentTimestamp = Instant.now().toEpochMilli(); - if (currentTimestamp <= latestTimestamp + config.getSplitReadInterval()) { + if (currentTimestamp <= latestTimestamp + minSplitReadInterval) { return; } latestTimestamp = currentTimestamp; synchronized (output.getCheckpointLock()) { FakeSourceSplit split = splits.poll(); if (null != split) { + FakeDataGenerator fakeDataGenerator = fakeDataGeneratorMap.get(split.getTableId()); // Randomly generated data are sent directly to the downstream operator - fakeDataGenerator.collectFakedRows(split.getRowNum(), output); + List seaTunnelRows = + fakeDataGenerator.generateFakedRows(split.getRowNum()); + seaTunnelRows.forEach(output::collect); log.info( - "{} rows of data have been generated in split({}). Generation time: {}", - split.getRowNum(), + "{} rows of data have been generated in split({}) for table {}. Generation time: {}", + seaTunnelRows.size(), split.splitId(), + split.getTableId(), latestTimestamp); } else { if (!noMoreSplit) { diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java index 1b8e51ba6f2..796b6423f5a 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java @@ -25,12 +25,15 @@ @Data @AllArgsConstructor public class FakeSourceSplit implements SourceSplit { + + private String tableId; + private int splitId; private int rowNum; @Override public String splitId() { - return String.valueOf(splitId); + return tableId + "_" + splitId; } } diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java index 9e9356c1eee..102957d26ea 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; +import org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig; import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState; import lombok.extern.slf4j.Slf4j; @@ -38,7 +39,7 @@ public class FakeSourceSplitEnumerator private final SourceSplitEnumerator.Context enumeratorContext; private final Map> pendingSplits; - private final FakeConfig fakeConfig; + private final MultipleTableFakeSourceConfig multipleTableFakeSourceConfig; /** Partitions that have been assigned to readers. */ private final Set assignedSplits; @@ -46,11 +47,11 @@ public class FakeSourceSplitEnumerator public FakeSourceSplitEnumerator( SourceSplitEnumerator.Context enumeratorContext, - FakeConfig config, + MultipleTableFakeSourceConfig multipleTableFakeSourceConfig, Set assignedSplits) { this.enumeratorContext = enumeratorContext; this.pendingSplits = new HashMap<>(); - this.fakeConfig = config; + this.multipleTableFakeSourceConfig = multipleTableFakeSourceConfig; this.assignedSplits = new HashSet<>(assignedSplits); } @@ -99,20 +100,29 @@ public FakeSourceState snapshotState(long checkpointId) throws Exception { } @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception {} + public void notifyCheckpointComplete(long checkpointId) {} private void discoverySplits() { Set allSplit = new HashSet<>(); log.info("Starting to calculate splits."); int numReaders = enumeratorContext.currentParallelism(); - int readerRowNum = fakeConfig.getRowNum(); - int splitNum = fakeConfig.getSplitNum(); - int splitRowNum = (int) Math.ceil((double) readerRowNum / splitNum); - for (int i = 0; i < numReaders; i++) { - int index = i; - for (int num = 0; num < readerRowNum; index += numReaders, num += splitRowNum) { - allSplit.add(new FakeSourceSplit(index, Math.min(splitRowNum, readerRowNum - num))); + for (FakeConfig fakeConfig : multipleTableFakeSourceConfig.getFakeConfigs()) { + String tableId = fakeConfig.getCatalogTable().getTableId().toTablePath().toString(); + int readerRowNum = fakeConfig.getRowNum(); + int splitNum = fakeConfig.getSplitNum(); + int splitRowNum = (int) Math.ceil((double) readerRowNum / splitNum); + for (int i = 0; i < numReaders; i++) { + int index = i; + for (int num = 0; num < readerRowNum; index += numReaders, num += splitRowNum) { + allSplit.add( + new FakeSourceSplit( + tableId, index, Math.min(splitRowNum, readerRowNum - num))); + } } + log.info( + "Calculated splits for table {} successfully, the size of splits is {}.", + tableId, + allSplit.size()); } assignedSplits.forEach(allSplit::remove); diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfigTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfigTest.java new file mode 100644 index 00000000000..51745e8cfbb --- /dev/null +++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfigTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; + +class MultipleTableFakeSourceConfigTest { + + @Test + void getFakeConfigs() throws URISyntaxException { + URL resource = MultipleTableFakeSourceConfigTest.class.getResource("/multiple_table.conf"); + Config config = ConfigFactory.parseFile(new File(Paths.get(resource.toURI()).toString())); + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config.getConfig("FakeSource")); + MultipleTableFakeSourceConfig multipleTableFakeSourceConfig = + new MultipleTableFakeSourceConfig(readonlyConfig); + Assertions.assertEquals(2, multipleTableFakeSourceConfig.getFakeConfigs().size()); + } +} diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java index 6e5962e3a75..bf962187f24 100644 --- a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java +++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java @@ -20,7 +20,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; -import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -36,7 +36,6 @@ import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -47,26 +46,15 @@ public class FakeDataGeneratorTest { @ValueSource(strings = {"complex.schema.conf", "simple.schema.conf"}) public void testComplexSchemaParse(String conf) throws FileNotFoundException, URISyntaxException { - Config testConfig = getTestConfigFile(conf); + ReadonlyConfig testConfig = getTestConfigFile(conf); SeaTunnelRowType seaTunnelRowType = CatalogTableUtil.buildWithConfig(testConfig).getSeaTunnelRowType(); FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig); - FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(seaTunnelRowType, fakeConfig); - List seaTunnelRows = new ArrayList<>(); - fakeDataGenerator.collectFakedRows( - fakeConfig.getRowNum(), - new Collector() { - @Override - public void collect(SeaTunnelRow record) { - seaTunnelRows.add(record); - } - - @Override - public Object getCheckpointLock() { - throw new UnsupportedOperationException(); - } - }); + FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig); + List seaTunnelRows = + fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum()); Assertions.assertNotNull(seaTunnelRows); + Assertions.assertEquals(seaTunnelRows.size(), 10); for (SeaTunnelRow seaTunnelRow : seaTunnelRows) { for (int i = 0; i < seaTunnelRowType.getFieldTypes().length; i++) { @@ -109,29 +97,15 @@ public void testRowDataParse(String conf) throws FileNotFoundException, URISynta List expected = Arrays.asList(row1, row2, row3, row1UpdateBefore, row1UpdateAfter, row2Delete); - Config testConfig = getTestConfigFile(conf); - SeaTunnelRowType seaTunnelRowType = - CatalogTableUtil.buildWithConfig(testConfig).getSeaTunnelRowType(); + ReadonlyConfig testConfig = getTestConfigFile(conf); FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig); - FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(seaTunnelRowType, fakeConfig); - List seaTunnelRows = new ArrayList<>(); - fakeDataGenerator.collectFakedRows( - fakeConfig.getRowNum(), - new Collector() { - @Override - public void collect(SeaTunnelRow record) { - seaTunnelRows.add(record); - } - - @Override - public Object getCheckpointLock() { - throw new UnsupportedOperationException(); - } - }); + FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig); + List seaTunnelRows = + fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum()); Assertions.assertIterableEquals(expected, seaTunnelRows); } - private Config getTestConfigFile(String configFile) + private ReadonlyConfig getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException { if (!configFile.startsWith("/")) { configFile = "/" + configFile; @@ -143,6 +117,6 @@ private Config getTestConfigFile(String configFile) String path = Paths.get(resource.toURI()).toString(); Config config = ConfigFactory.parseFile(new File(path)); assert config.hasPath("FakeSource"); - return config.getConfig("FakeSource"); + return ReadonlyConfig.fromConfig(config.getConfig("FakeSource")); } } diff --git a/seatunnel-connectors-v2/connector-fake/src/test/resources/multiple_table.conf b/seatunnel-connectors-v2/connector-fake/src/test/resources/multiple_table.conf new file mode 100644 index 00000000000..d42413934e2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-fake/src/test/resources/multiple_table.conf @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FakeSource { + tables_configs = [ + { + row.num = 10 + map.size = 10 + array.size = 10 + bytes.length = 10 + string.length = 10 + schema = { + table = "fake.table1" + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + }, + { + row.num = 10 + map.size = 10 + array.size = 10 + bytes.length = 10 + string.length = 10 + schema = { + table = "fake.table2" + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } + ] + result_table_name = "fake" +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java index 2983c05c6cc..110222ae324 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java @@ -209,9 +209,9 @@ public SeaTunnelDataType toSeaTunnelType( case DATE: return LocalTimeType.LOCAL_DATE_TYPE; case DATETIME: - return LocalTimeType.LOCAL_DATE_TIME_TYPE; - case TIMESTAMP: return LocalTimeType.LOCAL_TIME_TYPE; + case TIMESTAMP: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; case BOOLEAN: return BasicType.BOOLEAN_TYPE; case VOID: diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java index 9ef0c2020dc..0859690febb 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java @@ -65,7 +65,7 @@ public static Config of(@NonNull Path filePath) { adapterSupplier .map(adapter -> of(adapter, filePath)) .orElseGet(() -> ofInner(filePath)); - log.info("Parsed config file: {}", config.root().render(CONFIG_RENDER_OPTIONS)); + log.info("Parsed config file: \n{}", config.root().render(CONFIG_RENDER_OPTIONS)); return config; } @@ -84,7 +84,7 @@ public static Config of(@NonNull Map objectMap, boolean isEncryp if (!isEncrypt) { config = ConfigShadeUtils.decryptConfig(config); } - log.info("Parsed config file: {}", config.root().render(CONFIG_RENDER_OPTIONS)); + log.info("Parsed config file: \n{}", config.root().render(CONFIG_RENDER_OPTIONS)); return config; } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf index 32fb751610d..5b526ba337b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf @@ -22,40 +22,80 @@ env { source { FakeSource { - row.num = 100 - table-names = ["test.table1", "test.table2"] - schema = { - columns = [ - { - name = id - type = bigint - } - { - name = name - type = string - } - { - name = age - type = int + tables_configs = [ + { + row.num = 100 + schema = { + table = "test.table1" + columns = [ + { + name = id + type = bigint + } + { + name = name + type = string + } + { + name = age + type = int + } + ] + primaryKey = { + name = "primary key" + columnNames = ["id"] + } + constraintKeys = [ + { + constraintName = "unique_name" + constraintType = UNIQUE_KEY + constraintColumns = [ + { + columnName = "id" + sortType = ASC + } + ] + } + ] } - ] - primaryKey = { - name = "primary key" - columnNames = ["id"] - } - constraintKeys = [ - { - constraintName = "unique_name" - constraintType = UNIQUE_KEY - constraintColumns = [ - { - columnName = "id" - sortType = ASC - } - ] + }, + { + row.num = 100 + schema = { + table = "test.table2" + columns = [ + { + name = id + type = bigint + } + { + name = name + type = string + } + { + name = age + type = int + } + ] + primaryKey = { + name = "primary key" + columnNames = ["id"] + } + constraintKeys = [ + { + constraintName = "unique_name" + constraintType = UNIQUE_KEY + constraintColumns = [ + { + columnName = "id" + sortType = ASC + } + ] + } + ] } - ] - } + } + ] result_table_name = "fake" } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf index 22d68207ea7..c7bf0adb100 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf @@ -22,24 +22,48 @@ env { source { FakeSource { - row.num = 100 - table-names = ["test.table1", "test.table2"] - schema = { - columns = [ + tables_configs = [ { - name = id - type = bigint - } - { - name = name - type = string - } + row.num = 100 + schema = { + table = "test.table1" + columns = [ + { + name = id + type = bigint + } + { + name = name + type = string + } + { + name = age + type = int + } + ] + } + }, { - name = age - type = int + row.num = 100 + schema = { + table = "test.table2" + columns = [ + { + name = id + type = bigint + } + { + name = name + type = string + } + { + name = age + type = int + } + ] + } } - ] - } + ] result_table_name = "fake" } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java index 092f37f9bc4..466eff946eb 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java @@ -17,9 +17,10 @@ package org.apache.seatunnel.e2e.connector.pulsar; +import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; -import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; @@ -52,12 +53,13 @@ import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.io.File; import java.io.IOException; +import java.net.URL; +import java.nio.file.Paths; import java.time.Duration; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -144,35 +146,23 @@ private void initTopic() throws PulsarClientException { private void produceData() { try { - FakeConfig fakeConfig = FakeConfig.buildWithConfig(ConfigFactory.empty()); - FakeDataGenerator fakeDataGenerator = - new FakeDataGenerator(SEATUNNEL_ROW_TYPE, fakeConfig); - SimpleCollector simpleCollector = new SimpleCollector(); - fakeDataGenerator.collectFakedRows(100, simpleCollector); + URL resource = PulsarBatchIT.class.getResource("/fake_source.conf"); + Config config = + ConfigFactory.parseFile(new File(Paths.get(resource.toURI()).toString())); + + FakeConfig fakeConfig = FakeConfig.buildWithConfig(ReadonlyConfig.fromConfig(config)); + FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig); + List seaTunnelRows = fakeDataGenerator.generateFakedRows(100); JsonSerializationSchema jsonSerializationSchema = new JsonSerializationSchema(SEATUNNEL_ROW_TYPE); - for (SeaTunnelRow seaTunnelRow : simpleCollector.getList()) { + for (SeaTunnelRow seaTunnelRow : seaTunnelRows) { producer.send(jsonSerializationSchema.serialize(seaTunnelRow)); } - } catch (PulsarClientException e) { + } catch (Exception e) { throw new RuntimeException("produce data error", e); } } - private static class SimpleCollector implements Collector { - @Getter private List list = new ArrayList<>(); - - @Override - public void collect(SeaTunnelRow record) { - list.add(record); - } - - @Override - public Object getCheckpointLock() { - return null; - } - } - @TestTemplate void testPulsarBatch(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/batch_pulsar_to_console.conf"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_source.conf new file mode 100644 index 00000000000..5fa5b073d7d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_source.conf @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(38, 8)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + + } +} \ No newline at end of file