diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java index 3f8a203d22f..32b4b040aad 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java @@ -22,14 +22,20 @@ import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter; import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; +import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy; +import org.apache.hadoop.fs.Path; + import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; public class BaseFileSinkWriter implements SinkWriter { private final WriteStrategy writeStrategy; @@ -46,8 +52,27 @@ public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, Si this.subTaskIndex = context.getIndexOfSubtask(); writeStrategy.init(hadoopConf, jobId, subTaskIndex); if (!fileSinkStates.isEmpty()) { - List transactionIds = writeStrategy.getTransactionIdFromStates(fileSinkStates); - transactionIds.forEach(writeStrategy::abortPrepare); + try { + List paths = FileSystemUtils.dirList(writeStrategy.getFileSinkConfig().getTmpPath()); + List transactions = paths.stream().map(Path::getName).collect(Collectors.toList()); + FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(hadoopConf); + for (FileSinkState fileSinkState : fileSinkStates) { + if (transactions.contains(fileSinkState.getTransactionId())) { + // need commit + FileAggregatedCommitInfo fileCommitInfo = fileSinkAggregatedCommitter + .combine(Collections.singletonList(new FileCommitInfo(fileSinkState.getNeedMoveFiles(), + fileSinkState.getPartitionDirAndValuesMap(), + fileSinkState.getTransactionDir()))); + fileSinkAggregatedCommitter.commit(Collections.singletonList(fileCommitInfo)); + } else { + // need abort + writeStrategy.abortPrepare(fileSinkState.getTransactionId()); + } + } + } catch (IOException e) { + String errorMsg = String.format("Try to process these fileStates %s failed", fileSinkStates); + throw new FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e); + } writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 1); } else { writeStrategy.beginTransaction(1L); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java index 3b0ed862e33..5e48496d425 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java @@ -21,10 +21,15 @@ import lombok.Data; import java.io.Serializable; +import java.util.List; +import java.util.Map; @Data @AllArgsConstructor public class FileSinkState implements Serializable { private final String transactionId; private final Long checkpointId; + private final Map needMoveFiles; + private final Map> partitionDirAndValuesMap; + private final String transactionDir; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java index a3c5e45c9e6..9ce7673aafb 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java @@ -37,6 +37,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; import com.google.common.collect.Lists; +import lombok.Getter; import lombok.NonNull; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -60,6 +61,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy { protected final Logger log = LoggerFactory.getLogger(this.getClass()); + @Getter protected final TextFileSinkConfig textFileSinkConfig; protected final List sinkColumnsIndexInRow; protected String jobId; @@ -286,7 +288,12 @@ public List getTransactionIdFromStates(List fileStates) { */ @Override public List snapshotState(long checkpointId) { - ArrayList fileState = Lists.newArrayList(new FileSinkState(this.transactionId, this.checkpointId)); + Map> commitMap = this.partitionDirAndValuesMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new ArrayList<>(e.getValue()))); + ArrayList fileState = Lists.newArrayList(new FileSinkState(this.transactionId, + this.checkpointId, new HashMap<>(this.needMoveFiles), + commitMap, this.getTransactionDir(transactionId))); + this.beingWrittenFile.clear(); this.beginTransaction(checkpointId + 1); return fileState; } @@ -325,7 +332,13 @@ public String getTargetLocation(@NonNull String seaTunnelFilePath) { return tmpPath.replaceAll(BaseSinkConfig.NON_PARTITION + Matcher.quoteReplacement(File.separator), ""); } + @Override public long getCheckpointId() { return this.checkpointId; } + + @Override + public TextFileSinkConfig getFileSinkConfig() { + return textFileSinkConfig; + } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java index c1370345c2e..e6d299530ba 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig; import org.apache.hadoop.conf.Configuration; @@ -75,4 +76,6 @@ public interface WriteStrategy extends Transaction, Serializable { void finishAndCloseFile(); long getCheckpointId(); + + TextFileSinkConfig getFileSinkConfig(); }