[Feature] LocalFile sink support multiple table #5931

Merged
14 changes: 13 additions & 1 deletion docs/en/connector-v2/sink/LocalFile.md
@@ -55,7 +55,7 @@ By default, we use 2PC commit to ensure `exactly-once`

### path [string]

The target dir path is required.
The target dir path is required. You can inject the upstream CatalogTable into the path by using `${database_name}`, `${table_name}` and `${schema_name}`.

### custom_filename [boolean]

@@ -237,6 +237,18 @@ LocalFile {

```

To extract source metadata from the upstream, you can use `${database_name}`, `${table_name}` and `${schema_name}` in the path.

```bash

LocalFile {
  path = "/tmp/hive/warehouse/${table_name}"
  file_format_type = "parquet"
  sink_columns = ["name","age"]
}
```

Member: Multi-table and single-table are the same.

Member (Author): Multiple table is the same as single table, but it can change the path by injecting the table name from the CatalogTable (although a single table can also do this).

Member: Update "For multiple table" to "For extract source metadata".

Member (Author): "For extract source metadata" LGTM, I changed the doc.
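
A hedged variant of the example above (the warehouse layout and names are illustrative, not taken from this PR), nesting each table's files under its database by combining two placeholders:

```bash
LocalFile {
  path = "/tmp/hive/warehouse/${database_name}/${table_name}"
  file_format_type = "parquet"
  sink_columns = ["name","age"]
}
```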

## Changelog

### 2.2.0-beta 2022-09-26
@@ -45,14 +45,18 @@ public void validate(OptionRule rule) {
List<RequiredOption> requiredOptions = rule.getRequiredOptions();
for (RequiredOption requiredOption : requiredOptions) {
validate(requiredOption);
requiredOption
.getOptions()
.forEach(
option -> {
if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
validateSingleChoice(option);
}
});

for (Option<?> option : requiredOption.getOptions()) {
if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
// if this is a conditional required option whose condition does not match, skip validation
if (isConditionOption(requiredOption)
&& !matchCondition(
(RequiredOption.ConditionalRequiredOptions) requiredOption)) {
continue;
}
validateSingleChoice(option);
}
}
Member (Author): This fixes the case where we have some options that share the same key but expect different values, which are set under different conditions, e.g. `compress_codec` in the file connectors.

Member: Makes sense to me.
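
To make the reviewed behavior concrete, here is a standalone sketch (plain Java, not the SeaTunnel `OptionRule` API; the option key and allowed values are hypothetical) of the control flow this change introduces: a single-choice option is only checked against its allowed values when the condition that makes it required actually matches.

```java
import java.util.List;
import java.util.Map;

public class ConditionalSingleChoiceSketch {

    // Hypothetical rule: "compress_codec" must be one of [lzo, none],
    // but only when file_format_type == "text".
    static void validate(Map<String, String> config) {
        boolean conditionMatches = "text".equals(config.get("file_format_type"));
        if (!conditionMatches) {
            return; // condition not met -> skip single-choice validation
        }
        String value = config.get("compress_codec");
        List<String> allowed = List.of("lzo", "none");
        if (value != null && !allowed.contains(value)) {
            throw new IllegalArgumentException(
                    String.format(
                            "These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues(%s).",
                            "'compress_codec'", value, allowed));
        }
    }

    public static void main(String[] args) {
        // Same key under two different conditions: only the matching one is validated.
        validate(Map.of("file_format_type", "text", "compress_codec", "lzo"));   // passes
        validate(Map.of("file_format_type", "orc", "compress_codec", "snappy")); // skipped
    }
}
```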

}

for (Option option : rule.getOptionalOptions()) {
@@ -74,15 +78,15 @@ void validateSingleChoice(Option option) {
Object o = singleChoiceOption.defaultValue();
if (o != null && !optionValues.contains(o)) {
throw new OptionValidationException(
"These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues.",
getOptionKeys(Arrays.asList(singleChoiceOption)), o);
"These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues(%s).",
getOptionKeys(Arrays.asList(singleChoiceOption)), o, optionValues);
}

Object value = config.get(option);
if (value != null && !optionValues.contains(value)) {
throw new OptionValidationException(
"These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues.",
getOptionKeys(Arrays.asList(singleChoiceOption)), value);
"These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues(%s).",
getOptionKeys(Arrays.asList(singleChoiceOption)), value, optionValues);
}
}

@@ -99,7 +103,7 @@ void validate(RequiredOption requiredOption) {
validate((RequiredOption.ExclusiveRequiredOptions) requiredOption);
return;
}
if (requiredOption instanceof RequiredOption.ConditionalRequiredOptions) {
if (isConditionOption(requiredOption)) {
validate((RequiredOption.ConditionalRequiredOptions) requiredOption);
return;
}
@@ -181,8 +185,7 @@ void validate(RequiredOption.ExclusiveRequiredOptions exclusiveRequiredOptions) {
}

void validate(RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) {
Expression expression = conditionalRequiredOptions.getExpression();
boolean match = validate(expression);
boolean match = matchCondition(conditionalRequiredOptions);
if (!match) {
return;
}
@@ -193,7 +196,8 @@ void validate(RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) {
}
throw new OptionValidationException(
"There are unconfigured options, the options(%s) are required because [%s] is true.",
getOptionKeys(absentOptions), expression.toString());
getOptionKeys(absentOptions),
conditionalRequiredOptions.getExpression().toString());
}

private boolean validate(Expression expression) {
@@ -222,4 +226,14 @@ private <T> boolean validate(Condition<T> condition) {
return match || validate(condition.getNext());
}
}

private boolean isConditionOption(RequiredOption requiredOption) {
return requiredOption instanceof RequiredOption.ConditionalRequiredOptions;
}

private boolean matchCondition(
RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) {
Expression expression = conditionalRequiredOptions.getExpression();
return validate(expression);
}
}
@@ -213,7 +213,9 @@ public static CatalogTable buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig) {
schemaConfig.get(
TableSchemaOptions.TableIdentifierOptions.SCHEMA_FIRST));
} else {
tablePath = TablePath.EMPTY;
Optional<String> resultTableNameOptional =
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME);
tablePath = resultTableNameOptional.map(TablePath::of).orElse(TablePath.EMPTY);
}

return CatalogTable.of(
@@ -275,7 +275,7 @@ public void testSingleChoiceOptionDefaultValueValidator() {
config.put(SINGLE_CHOICE_TEST.key(), "A");
Executable executable = () -> validate(config, optionRule);
assertEquals(
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the defaultValue(M) must be one of the optionValues.",
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the defaultValue(M) must be one of the optionValues([A, B, C]).",
assertThrows(OptionValidationException.class, executable).getMessage());
}

@@ -290,7 +290,7 @@ public void testSingleChoiceOptionValueValidator() {
config.put(SINGLE_CHOICE_VALUE_TEST.key(), "N");
executable = () -> validate(config, optionRule);
assertEquals(
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the value(N) must be one of the optionValues.",
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the value(N) must be one of the optionValues([A, B, C]).",
assertThrows(OptionValidationException.class, executable).getMessage());
}
}
@@ -17,27 +17,94 @@

package org.apache.seatunnel.connectors.seatunnel.file.local.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
import org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;

import java.util.List;
import java.util.Optional;

import com.google.auto.service.AutoService;
public class LocalFileSink
implements SeaTunnelSink<
SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
SupportMultiTableSink {
Comment on lines +46 to +49

Hisoka-X (Member), Dec 5, 2023: It feels like there is some fragmentation: LocalFileSink is no longer based on BaseFileSink, while the other file-related connectors are. Do we have plans to unify all file-related connectors? Can you create an issue? It is best to mark BaseFileSink as deprecated in the code and link to the issue. cc @TyrantLucifer

ruanwenjun (Member, Author), Dec 6, 2023: Making all file connectors support multiple tables in one PR would cause a lot of file changes, since the other file connectors do not use the new Factory API. So this PR only modifies LocalFileSink; once all file connectors support multiple tables, we can consider adding a common interface such as BaseMultipleTableFileSink (I am not sure yet whether we need that class). Created #5970 to describe this.
@AutoService(SeaTunnelSink.class)
public class LocalFileSink extends BaseFileSink {
private final HadoopConf hadoopConf;
private final FileSystemUtils fileSystemUtils;
private final FileSinkConfig fileSinkConfig;
private final WriteStrategy writeStrategy;
private String jobId;

public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
this.hadoopConf = new LocalFileHadoopConf();
this.fileSinkConfig =
new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType());
this.writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
this.fileSystemUtils = new FileSystemUtils(hadoopConf);
this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
this.writeStrategy.setFileSystemUtils(fileSystemUtils);
}

@Override
public String getPluginName() {
return FileSystemType.LOCAL.getFileSystemPluginName();
public void setJobContext(JobContext jobContext) {
this.jobId = jobContext.getJobId();
}

@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
SinkWriter.Context context, List<FileSinkState> states) {
return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
}

@Override
public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
createAggregatedCommitter() {
return Optional.of(new FileSinkAggregatedCommitter(fileSystemUtils));
}

@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
SinkWriter.Context context) {
return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId);
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
super.prepare(pluginConfig);
hadoopConf = new LocalFileHadoopConf();
public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public Optional<Serializer<FileAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public String getPluginName() {
return FileSystemType.LOCAL.getFileSystemPluginName();
}
}
@@ -17,17 +17,32 @@

package org.apache.seatunnel.connectors.seatunnel.file.local.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class LocalFileSinkFactory implements TableSinkFactory {
public class LocalFileSinkFactory
implements TableSinkFactory<
SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo> {
@Override
public String factoryIdentifier() {
return FileSystemType.LOCAL.getFileSystemPluginName();
@@ -82,4 +97,70 @@ public OptionRule optionRule() {
.optional(BaseSinkConfig.TIME_FORMAT)
.build();
}

@Override
public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>
createSink(TableSinkFactoryContext context) {
ReadonlyConfig readonlyConfig = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();

ReadonlyConfig finalReadonlyConfig =
generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
return () -> new LocalFileSink(finalReadonlyConfig, catalogTable);
}

// replace the database/schema/table placeholders in the sink config's path
private ReadonlyConfig generateCurrentReadonlyConfig(
ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
// Copy the config to avoid modifying the original config
Config config = readonlyConfig.toConfig();

if (config.hasPath(BaseSinkConfig.FILE_PATH.key())) {
String replacedPath =
replaceCatalogTableInPath(
config.getString(BaseSinkConfig.FILE_PATH.key()), catalogTable);
config =
config.withValue(
BaseSinkConfig.FILE_PATH.key(),
ConfigValueFactory.fromAnyRef(replacedPath));
}

if (config.hasPath(BaseSinkConfig.TMP_PATH.key())) {
String replacedPath =
replaceCatalogTableInPath(
config.getString(BaseSinkConfig.TMP_PATH.key()), catalogTable);
config =
config.withValue(
BaseSinkConfig.TMP_PATH.key(),
ConfigValueFactory.fromAnyRef(replacedPath));
}

return ReadonlyConfig.fromConfig(config);
}

private String replaceCatalogTableInPath(String originString, CatalogTable catalogTable) {
String path = originString;
TableIdentifier tableIdentifier = catalogTable.getTableId();
if (tableIdentifier != null) {
if (tableIdentifier.getDatabaseName() != null) {
path =
path.replace(
SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY,
tableIdentifier.getDatabaseName());
}
if (tableIdentifier.getSchemaName() != null) {
path =
path.replace(
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
tableIdentifier.getSchemaName());
}
if (tableIdentifier.getTableName() != null) {
path =
path.replace(
SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY,
tableIdentifier.getTableName());
}
}
return path;
}
}
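
A self-contained sketch of the substitution `replaceCatalogTableInPath` performs (the table identifier `mydb.users` is hypothetical, and the placeholder strings are the ones documented above, which is what `SinkReplaceNameConstant` is assumed to hold). Because the sink implements `SupportMultiTableSink`, the factory runs this once per upstream CatalogTable, so each table ends up with its own resolved directory:

```java
public class PathPlaceholderSketch {
    public static void main(String[] args) {
        // Path template as it would appear in the sink config.
        String path = "/tmp/hive/warehouse/${database_name}/${table_name}";
        // Substitute each placeholder that has a corresponding name on the table id.
        path = path.replace("${database_name}", "mydb")
                   .replace("${schema_name}", "public") // no-op here: placeholder absent
                   .replace("${table_name}", "users");
        System.out.println(path); // /tmp/hive/warehouse/mydb/users
    }
}
```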