-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature] LocalFile sink support multiple table #5931
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,14 +45,18 @@ public void validate(OptionRule rule) { | |
List<RequiredOption> requiredOptions = rule.getRequiredOptions(); | ||
for (RequiredOption requiredOption : requiredOptions) { | ||
validate(requiredOption); | ||
requiredOption | ||
.getOptions() | ||
.forEach( | ||
option -> { | ||
if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) { | ||
validateSingleChoice(option); | ||
} | ||
}); | ||
|
||
for (Option<?> option : requiredOption.getOptions()) { | ||
if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) { | ||
// is required option and not match condition, skip validate | ||
if (isConditionOption(requiredOption) | ||
&& !matchCondition( | ||
(RequiredOption.ConditionalRequiredOptions) requiredOption)) { | ||
continue; | ||
} | ||
validateSingleChoice(option); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cc @Hisoka-X There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This fix we have some options which have the same key but different excepted values, which will be set under different condition. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. make sense to me. |
||
} | ||
|
||
for (Option option : rule.getOptionalOptions()) { | ||
|
@@ -74,15 +78,15 @@ void validateSingleChoice(Option option) { | |
Object o = singleChoiceOption.defaultValue(); | ||
if (o != null && !optionValues.contains(o)) { | ||
throw new OptionValidationException( | ||
"These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues.", | ||
getOptionKeys(Arrays.asList(singleChoiceOption)), o); | ||
"These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues(%s).", | ||
getOptionKeys(Arrays.asList(singleChoiceOption)), o, optionValues); | ||
} | ||
|
||
Object value = config.get(option); | ||
if (value != null && !optionValues.contains(value)) { | ||
throw new OptionValidationException( | ||
"These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues.", | ||
getOptionKeys(Arrays.asList(singleChoiceOption)), value); | ||
"These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues(%s).", | ||
getOptionKeys(Arrays.asList(singleChoiceOption)), value, optionValues); | ||
} | ||
} | ||
|
||
|
@@ -99,7 +103,7 @@ void validate(RequiredOption requiredOption) { | |
validate((RequiredOption.ExclusiveRequiredOptions) requiredOption); | ||
return; | ||
} | ||
if (requiredOption instanceof RequiredOption.ConditionalRequiredOptions) { | ||
if (isConditionOption(requiredOption)) { | ||
validate((RequiredOption.ConditionalRequiredOptions) requiredOption); | ||
return; | ||
} | ||
|
@@ -181,8 +185,7 @@ void validate(RequiredOption.ExclusiveRequiredOptions exclusiveRequiredOptions) | |
} | ||
|
||
void validate(RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) { | ||
Expression expression = conditionalRequiredOptions.getExpression(); | ||
boolean match = validate(expression); | ||
boolean match = matchCondition(conditionalRequiredOptions); | ||
if (!match) { | ||
return; | ||
} | ||
|
@@ -193,7 +196,8 @@ void validate(RequiredOption.ConditionalRequiredOptions conditionalRequiredOptio | |
} | ||
throw new OptionValidationException( | ||
"There are unconfigured options, the options(%s) are required because [%s] is true.", | ||
getOptionKeys(absentOptions), expression.toString()); | ||
getOptionKeys(absentOptions), | ||
conditionalRequiredOptions.getExpression().toString()); | ||
} | ||
|
||
private boolean validate(Expression expression) { | ||
|
@@ -222,4 +226,14 @@ private <T> boolean validate(Condition<T> condition) { | |
return match || validate(condition.getNext()); | ||
} | ||
} | ||
|
||
private boolean isConditionOption(RequiredOption requiredOption) { | ||
return requiredOption instanceof RequiredOption.ConditionalRequiredOptions; | ||
} | ||
|
||
private boolean matchCondition( | ||
RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) { | ||
Expression expression = conditionalRequiredOptions.getExpression(); | ||
return validate(expression); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,27 +17,94 @@ | |
|
||
package org.apache.seatunnel.connectors.seatunnel.file.local.sink; | ||
|
||
import org.apache.seatunnel.shade.com.typesafe.config.Config; | ||
|
||
import org.apache.seatunnel.api.common.PrepareFailException; | ||
import org.apache.seatunnel.api.common.JobContext; | ||
import org.apache.seatunnel.api.configuration.ReadonlyConfig; | ||
import org.apache.seatunnel.api.serialization.DefaultSerializer; | ||
import org.apache.seatunnel.api.serialization.Serializer; | ||
import org.apache.seatunnel.api.sink.SeaTunnelSink; | ||
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; | ||
import org.apache.seatunnel.api.sink.SinkWriter; | ||
import org.apache.seatunnel.api.sink.SupportMultiTableSink; | ||
import org.apache.seatunnel.api.table.catalog.CatalogTable; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelRow; | ||
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; | ||
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; | ||
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf; | ||
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; | ||
import org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter; | ||
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; | ||
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; | ||
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter; | ||
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; | ||
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; | ||
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; | ||
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy; | ||
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory; | ||
|
||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
import com.google.auto.service.AutoService; | ||
public class LocalFileSink | ||
ruanwenjun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
implements SeaTunnelSink< | ||
SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>, | ||
SupportMultiTableSink { | ||
Comment on lines
+46
to
+49
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It feels like there is some fragmentation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find it may cause a lot of file changes if we want to make all file connector can support Create #5970 to describe this. |
||
|
||
@AutoService(SeaTunnelSink.class) | ||
public class LocalFileSink extends BaseFileSink { | ||
private final HadoopConf hadoopConf; | ||
private final FileSystemUtils fileSystemUtils; | ||
private final FileSinkConfig fileSinkConfig; | ||
private final WriteStrategy writeStrategy; | ||
private String jobId; | ||
|
||
public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { | ||
this.hadoopConf = new LocalFileHadoopConf(); | ||
this.fileSinkConfig = | ||
new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType()); | ||
this.writeStrategy = | ||
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig); | ||
this.fileSystemUtils = new FileSystemUtils(hadoopConf); | ||
this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType()); | ||
this.writeStrategy.setFileSystemUtils(fileSystemUtils); | ||
} | ||
|
||
@Override | ||
public String getPluginName() { | ||
return FileSystemType.LOCAL.getFileSystemPluginName(); | ||
public void setJobContext(JobContext jobContext) { | ||
this.jobId = jobContext.getJobId(); | ||
} | ||
|
||
@Override | ||
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter( | ||
SinkWriter.Context context, List<FileSinkState> states) { | ||
return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states); | ||
} | ||
|
||
@Override | ||
public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>> | ||
createAggregatedCommitter() { | ||
return Optional.of(new FileSinkAggregatedCommitter(fileSystemUtils)); | ||
} | ||
|
||
@Override | ||
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter( | ||
SinkWriter.Context context) { | ||
return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId); | ||
} | ||
|
||
@Override | ||
public void prepare(Config pluginConfig) throws PrepareFailException { | ||
super.prepare(pluginConfig); | ||
hadoopConf = new LocalFileHadoopConf(); | ||
public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() { | ||
return Optional.of(new DefaultSerializer<>()); | ||
} | ||
|
||
@Override | ||
public Optional<Serializer<FileAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() { | ||
return Optional.of(new DefaultSerializer<>()); | ||
} | ||
|
||
@Override | ||
public Optional<Serializer<FileSinkState>> getWriterStateSerializer() { | ||
return Optional.of(new DefaultSerializer<>()); | ||
} | ||
|
||
@Override | ||
public String getPluginName() { | ||
return FileSystemType.LOCAL.getFileSystemPluginName(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Multi-table and single-table are the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Multiple table is same with single table, but it can change the path by inject the table name from catalogTable(Although single table can also do this)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update
For multiple table
toFor extract source metadata
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For extract source metadata
LGTM, I changed the doc.