From 8004274769fb7d55ddc0523d546ed3fd71544bd6 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 28 Nov 2023 18:44:12 +0800 Subject: [PATCH] LocalFile sink support multiple table --- docs/en/connector-v2/sink/LocalFile.md | 14 +- .../configuration/util/ConfigValidator.java | 46 ++++--- .../api/table/catalog/CatalogTableUtil.java | 4 +- .../util/ConfigValidatorTest.java | 4 +- .../file/local/sink/LocalFileSink.java | 91 +++++++++++-- .../file/local/sink/LocalFileSinkFactory.java | 83 +++++++++++- .../sink/writter/LocalFileSinkWriter.java | 51 ++++++++ .../local/LocalFileWithMultipleTableIT.java | 9 ++ ...ake_to_local_file_with_multiple_table.conf | 121 ++++++++++++++++++ .../client/LogicalDagGeneratorTest.java | 2 +- .../MultipleTableJobConfigParserTest.java | 11 +- .../dag/execution/ExecutionPlanGenerator.java | 3 +- 12 files changed, 400 insertions(+), 39 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_with_multiple_table.conf diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index d7a183a4ae9..e9d79850512 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -55,7 +55,7 @@ By default, we use 2PC commit to ensure `exactly-once` ### path [string] -The target dir path is required. +The target dir path is required; you can inject the upstream CatalogTable into the path by using: `${database_name}`, `${table_name}` and `${schema_name}`. ### custom_filename [boolean] @@ -237,6 +237,18 @@ LocalFile { ``` +To extract source metadata from upstream, you can use `${database_name}`, `${table_name}` and `${schema_name}` in the path. 
+ +```bash + +LocalFile { + path = "/tmp/hive/warehouse/${table_name}" + file_format_type = "parquet" + sink_columns = ["name","age"] +} + +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java index f82632b0e17..884e82034d0 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java @@ -45,14 +45,18 @@ public void validate(OptionRule rule) { List requiredOptions = rule.getRequiredOptions(); for (RequiredOption requiredOption : requiredOptions) { validate(requiredOption); - requiredOption - .getOptions() - .forEach( - option -> { - if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) { - validateSingleChoice(option); - } - }); + + for (Option option : requiredOption.getOptions()) { + if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) { + // is required option and not match condition, skip validate + if (isConditionOption(requiredOption) + && !matchCondition( + (RequiredOption.ConditionalRequiredOptions) requiredOption)) { + continue; + } + validateSingleChoice(option); + } + } } for (Option option : rule.getOptionalOptions()) { @@ -74,15 +78,15 @@ void validateSingleChoice(Option option) { Object o = singleChoiceOption.defaultValue(); if (o != null && !optionValues.contains(o)) { throw new OptionValidationException( - "These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues.", - getOptionKeys(Arrays.asList(singleChoiceOption)), o); + "These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues(%s).", + getOptionKeys(Arrays.asList(singleChoiceOption)), o, optionValues); } Object value = config.get(option); if (value != null && 
!optionValues.contains(value)) { throw new OptionValidationException( - "These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues.", - getOptionKeys(Arrays.asList(singleChoiceOption)), value); + "These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues(%s).", + getOptionKeys(Arrays.asList(singleChoiceOption)), value, optionValues); } } @@ -99,7 +103,7 @@ void validate(RequiredOption requiredOption) { validate((RequiredOption.ExclusiveRequiredOptions) requiredOption); return; } - if (requiredOption instanceof RequiredOption.ConditionalRequiredOptions) { + if (isConditionOption(requiredOption)) { validate((RequiredOption.ConditionalRequiredOptions) requiredOption); return; } @@ -181,8 +185,7 @@ void validate(RequiredOption.ExclusiveRequiredOptions exclusiveRequiredOptions) } void validate(RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) { - Expression expression = conditionalRequiredOptions.getExpression(); - boolean match = validate(expression); + boolean match = matchCondition(conditionalRequiredOptions); if (!match) { return; } @@ -193,7 +196,8 @@ void validate(RequiredOption.ConditionalRequiredOptions conditionalRequiredOptio } throw new OptionValidationException( "There are unconfigured options, the options(%s) are required because [%s] is true.", - getOptionKeys(absentOptions), expression.toString()); + getOptionKeys(absentOptions), + conditionalRequiredOptions.getExpression().toString()); } private boolean validate(Expression expression) { @@ -222,4 +226,14 @@ private boolean validate(Condition condition) { return match || validate(condition.getNext()); } } + + private boolean isConditionOption(RequiredOption requiredOption) { + return requiredOption instanceof RequiredOption.ConditionalRequiredOptions; + } + + private boolean matchCondition( + RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) { + Expression expression = 
conditionalRequiredOptions.getExpression(); + return validate(expression); + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java index 99c376d330e..6b8d19ea719 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java @@ -213,7 +213,9 @@ public static CatalogTable buildWithConfig(String catalogName, ReadonlyConfig re schemaConfig.get( TableSchemaOptions.TableIdentifierOptions.SCHEMA_FIRST)); } else { - tablePath = TablePath.EMPTY; + Optional resultTableNameOptional = + readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME); + tablePath = resultTableNameOptional.map(TablePath::of).orElse(TablePath.EMPTY); } return CatalogTable.of( diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java index 7b9a3b39df4..18a176b78cf 100644 --- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java @@ -275,7 +275,7 @@ public void testSingleChoiceOptionDefaultValueValidator() { config.put(SINGLE_CHOICE_TEST.key(), "A"); Executable executable = () -> validate(config, optionRule); assertEquals( - "ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the defaultValue(M) must be one of the optionValues.", + "ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the defaultValue(M) must be one of the optionValues([A, B, C]).", 
assertThrows(OptionValidationException.class, executable).getMessage()); } @@ -290,7 +290,7 @@ public void testSingleChoiceOptionValueValidator() { config.put(SINGLE_CHOICE_VALUE_TEST.key(), "N"); executable = () -> validate(config, optionRule); assertEquals( - "ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the value(N) must be one of the optionValues.", + "ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the value(N) must be one of the optionValues([A, B, C]).", assertThrows(OptionValidationException.class, executable).getMessage()); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java index 4d8037ef5f1..3222dcd7a71 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java @@ -17,27 +17,94 @@ package org.apache.seatunnel.connectors.seatunnel.file.local.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.api.sink.SinkWriter; 
+import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf; -import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; +import org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter; +import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; +import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; +import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy; +import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory; + +import java.util.List; +import java.util.Optional; -import com.google.auto.service.AutoService; +public class LocalFileSink + implements SeaTunnelSink< + SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>, + SupportMultiTableSink { -@AutoService(SeaTunnelSink.class) -public class LocalFileSink extends BaseFileSink { + private final HadoopConf hadoopConf; + private final FileSystemUtils fileSystemUtils; + private final FileSinkConfig fileSinkConfig; + private final WriteStrategy writeStrategy; + private String jobId; + + public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + this.hadoopConf = new LocalFileHadoopConf(); + this.fileSinkConfig = + new 
FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType()); + this.writeStrategy = + WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig); + this.fileSystemUtils = new FileSystemUtils(hadoopConf); + this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType()); + this.writeStrategy.setFileSystemUtils(fileSystemUtils); + } @Override - public String getPluginName() { - return FileSystemType.LOCAL.getFileSystemPluginName(); + public void setJobContext(JobContext jobContext) { + this.jobId = jobContext.getJobId(); + } + + @Override + public SinkWriter restoreWriter( + SinkWriter.Context context, List states) { + return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states); + } + + @Override + public Optional> + createAggregatedCommitter() { + return Optional.of(new FileSinkAggregatedCommitter(fileSystemUtils)); + } + + @Override + public SinkWriter createWriter( + SinkWriter.Context context) { + return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId); } @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - super.prepare(pluginConfig); - hadoopConf = new LocalFileHadoopConf(); + public Optional> getCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public Optional> getAggregatedCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public Optional> getWriterStateSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public String getPluginName() { + return FileSystemType.LOCAL.getFileSystemPluginName(); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java index 
b6f5dd50768..d9232f4ddc5 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java @@ -17,17 +17,32 @@ package org.apache.seatunnel.connectors.seatunnel.file.local.sink; +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; import com.google.auto.service.AutoService; @AutoService(Factory.class) -public class LocalFileSinkFactory implements TableSinkFactory { +public class LocalFileSinkFactory + implements TableSinkFactory< + SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo> { @Override public String 
factoryIdentifier() { return FileSystemType.LOCAL.getFileSystemPluginName(); @@ -82,4 +97,70 @@ public OptionRule optionRule() { .optional(BaseSinkConfig.TIME_FORMAT) .build(); } + + @Override + public TableSink + createSink(TableSinkFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); + CatalogTable catalogTable = context.getCatalogTable(); + + ReadonlyConfig finalReadonlyConfig = + generateCurrentReadonlyConfig(readonlyConfig, catalogTable); + return () -> new LocalFileSink(finalReadonlyConfig, catalogTable); + } + + // replace the table name in sink config's path + private ReadonlyConfig generateCurrentReadonlyConfig( + ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + // Copy the config to avoid modifying the original config + Config config = readonlyConfig.toConfig(); + + if (config.hasPath(BaseSinkConfig.FILE_PATH.key())) { + String replacedPath = + replaceCatalogTableInPath( + config.getString(BaseSinkConfig.FILE_PATH.key()), catalogTable); + config = + config.withValue( + BaseSinkConfig.FILE_PATH.key(), + ConfigValueFactory.fromAnyRef(replacedPath)); + } + + if (config.hasPath(BaseSinkConfig.TMP_PATH.key())) { + String replacedPath = + replaceCatalogTableInPath( + config.getString(BaseSinkConfig.TMP_PATH.key()), catalogTable); + config = + config.withValue( + BaseSinkConfig.TMP_PATH.key(), + ConfigValueFactory.fromAnyRef(replacedPath)); + } + + return ReadonlyConfig.fromConfig(config); + } + + private String replaceCatalogTableInPath(String originString, CatalogTable catalogTable) { + String path = originString; + TableIdentifier tableIdentifier = catalogTable.getTableId(); + if (tableIdentifier != null) { + if (tableIdentifier.getDatabaseName() != null) { + path = + path.replace( + SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY, + tableIdentifier.getDatabaseName()); + } + if (tableIdentifier.getSchemaName() != null) { + path = + path.replace( + SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, + 
tableIdentifier.getSchemaName()); + } + if (tableIdentifier.getTableName() != null) { + path = + path.replace( + SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY, + tableIdentifier.getTableName()); + } + } + return path; + } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java new file mode 100644 index 00000000000..88de32f8204 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; +import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; +import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy; + +import java.util.Collections; +import java.util.List; + +public class LocalFileSinkWriter extends BaseFileSinkWriter + implements SupportMultiTableSinkWriter { + + public LocalFileSinkWriter( + WriteStrategy writeStrategy, + HadoopConf hadoopConf, + Context context, + String jobId, + List fileSinkStates) { + // todo: do we need to set writeStrategy as share resource? then how to deal with the pre + // fileSinkStates? + super(writeStrategy, hadoopConf, context, jobId, fileSinkStates); + } + + public LocalFileSinkWriter( + WriteStrategy writeStrategy, + HadoopConf hadoopConf, + SinkWriter.Context context, + String jobId) { + this(writeStrategy, hadoopConf, context, jobId, Collections.emptyList()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java index 10d1a634296..35f29e635fa 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java @@ -64,8 +64,17 @@ public class LocalFileWithMultipleTableIT 
extends TestSuiteBase { "/text/e2e.txt", "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt", container); + + container.execInContainer("mkdir", "-p", "/tmp/fake_empty"); }; + @TestTemplate + public void testFakeToLocalFileInMultipleTableMode_text(TestContainer testContainer) + throws IOException, InterruptedException { + TestHelper helper = new TestHelper(testContainer); + helper.execute("/text/fake_to_local_file_with_multiple_table.conf"); + } + @TestTemplate public void testLocalFileReadAndWriteInMultipleTableMode_excel(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_with_multiple_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_with_multiple_table.conf new file mode 100644 index 00000000000..d75c756ba63 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_with_multiple_table.conf @@ -0,0 +1,121 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "fake1" + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + }, + { + schema = { + table = "fake2" + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } + ] + } +} + +sink { + LocalFile { + path = "/tmp/fake_empty/text/${table_name}" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "lzo" + } +} \ No newline at end of file diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java index 962e6aab5b1..fc9f2cb72f7 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java @@ -56,7 +56,7 @@ public void testLogicalGenerator() { LogicalDag logicalDag = logicalDagGenerator.generate(); JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson(); String result = - "{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[1]-FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile\"},{\"inputVertex\":\"Source[1]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile\"}]}"; + "{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[1]-FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile-fake(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"},{\"inputVertex\":\"Source[1]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"}]}"; Assertions.assertEquals(result, logicalDagJson.toString()); } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java index 92518ae1b14..083e503d8b3 100644 --- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java @@ -54,7 +54,7 @@ public void testSimpleJobParse() { ImmutablePair, Set> parse = jobConfigParser.parse(); List actions = parse.getLeft(); Assertions.assertEquals(1, actions.size()); - Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName()); + Assertions.assertEquals("Sink[0]-LocalFile-MultiTableSink", actions.get(0).getName()); Assertions.assertEquals(1, actions.get(0).getUpstream().size()); Assertions.assertEquals( "Source[0]-FakeSource", actions.get(0).getUpstream().get(0).getName()); @@ -75,7 +75,7 @@ public void testComplexJobParse() { List actions = parse.getLeft(); Assertions.assertEquals(1, actions.size()); - Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName()); + Assertions.assertEquals("Sink[0]-LocalFile-fake", actions.get(0).getName()); Assertions.assertEquals(2, actions.get(0).getUpstream().size()); String[] expected = {"Source[0]-FakeSource", "Source[1]-FakeSource"}; @@ -106,8 +106,11 @@ public void testMultipleSinkName() { List actions = parse.getLeft(); Assertions.assertEquals(2, actions.size()); - Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName()); - Assertions.assertEquals("Sink[1]-LocalFile", actions.get(1).getName()); + // This is union sink + Assertions.assertEquals("Sink[0]-LocalFile-fake", actions.get(0).getName()); + + // This is multiple table sink + Assertions.assertEquals("Sink[1]-LocalFile-MultiTableSink", actions.get(1).getName()); } @Test diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java index 
955c694e3c5..45795cab5ca 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java @@ -459,7 +459,8 @@ private List generatePipelines(Set executionEdges) { actionCount++; } } - checkArgument(actionNames.size() == actionCount, "Action name is duplicated"); + checkArgument( + actionNames.size() == actionCount, "Action name is duplicated: " + actionNames); return pipelines; }