Skip to content

Commit

Permalink
[Improve][Connector-V2][File] Optimize file commit
Browse files Browse the repository at this point in the history
  • Loading branch information
TyrantLucifer committed Dec 7, 2022
1 parent 5616303 commit cfa0aa1
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;

import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> {
private final WriteStrategy writeStrategy;
Expand All @@ -46,8 +52,27 @@ public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, Si
this.subTaskIndex = context.getIndexOfSubtask();
writeStrategy.init(hadoopConf, jobId, subTaskIndex);
// Restore path: non-empty state means we are recovering from a checkpoint of a previous run.
if (!fileSinkStates.isEmpty()) {
// NOTE(review): the next two lines are the pre-change implementation retained by the
// flattened diff rendering (they were replaced by the try-block below) -- they are
// not part of the new code path and should not both exist in the real file.
List<String> transactionIds = writeStrategy.getTransactionIdFromStates(fileSinkStates);
transactionIds.forEach(writeStrategy::abortPrepare);
try {
// List the transaction directories still present under the sink's tmp path; the
// directory name is the transaction id (Path::getName below).
List<Path> paths = FileSystemUtils.dirList(writeStrategy.getFileSinkConfig().getTmpPath());
List<String> transactions = paths.stream().map(Path::getName).collect(Collectors.toList());
FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(hadoopConf);
for (FileSinkState fileSinkState : fileSinkStates) {
if (transactions.contains(fileSinkState.getTransactionId())) {
// Tmp dir for this transaction still exists -- presumably the checkpoint
// completed but the commit did not, so re-drive the commit from the
// restored state (combine -> commit). TODO confirm against committer docs.
FileAggregatedCommitInfo fileCommitInfo = fileSinkAggregatedCommitter
.combine(Collections.singletonList(new FileCommitInfo(fileSinkState.getNeedMoveFiles(),
fileSinkState.getPartitionDirAndValuesMap(),
fileSinkState.getTransactionDir())));
fileSinkAggregatedCommitter.commit(Collections.singletonList(fileCommitInfo));
} else {
// Tmp dir is gone: nothing left to commit, discard the stale transaction.
writeStrategy.abortPrepare(fileSinkState.getTransactionId());
}
}
} catch (IOException e) {
String errorMsg = String.format("Try to process these fileStates %s failed", fileSinkStates);
throw new FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
}
// Resume writing under the checkpoint that follows the one we restored from.
writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 1);
} else {
// Fresh start: no restored state, begin with checkpoint id 1.
writeStrategy.beginTransaction(1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@
import lombok.Data;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
 * Per-subtask sink state captured at checkpoint time and handed back to
 * {@code BaseFileSinkWriter} on restore, where it is used to decide whether a
 * transaction must be re-committed or aborted. Getters, equals/hashCode and
 * the all-args constructor are generated by Lombok.
 */
@Data
@AllArgsConstructor
public class FileSinkState implements Serializable {
// Transaction the snapshot belongs to; matched against tmp-dir names on restore.
private final String transactionId;
private final Long checkpointId;
// Source (tmp) file path -> target file path for files awaiting commit.
private final Map<String, String> needMoveFiles;
// Partition directory -> partition values. NOTE(review): semantics inferred from the
// name and its use in FileCommitInfo -- confirm against the committer.
private final Map<String, List<String>> partitionDirAndValuesMap;
// Directory holding this transaction's temporary files.
private final String transactionDir;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;

import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -60,6 +61,7 @@

// Base class for concrete write strategies; only the opening field declarations are
// visible in this diff hunk.
public abstract class AbstractWriteStrategy implements WriteStrategy {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
// Exposed through Lombok @Getter; also returned explicitly by getFileSinkConfig().
@Getter
protected final TextFileSinkConfig textFileSinkConfig;
// NOTE(review): presumably the indexes of the sink columns within the incoming row --
// confirm against the constructor (outside this view).
protected final List<Integer> sinkColumnsIndexInRow;
protected String jobId;
Expand Down Expand Up @@ -286,7 +288,12 @@ public List<String> getTransactionIdFromStates(List<FileSinkState> fileStates) {
*/
@Override
public List<FileSinkState> snapshotState(long checkpointId) {
    // Fix(diff flattening): the retained pre-change statement that also declared
    // `fileState` is removed here -- only the new deep-copying version remains.
    //
    // Deep-copy the partition map (value lists included) so the snapshot is isolated
    // from later mutations of the live partitionDirAndValuesMap.
    Map<String, List<String>> commitMap = this.partitionDirAndValuesMap.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));
    // NOTE(review): the state records the field this.checkpointId while the next
    // transaction is opened with the parameter checkpointId -- confirm both are
    // expected to agree at this point.
    ArrayList<FileSinkState> fileState = Lists.newArrayList(new FileSinkState(this.transactionId,
            this.checkpointId, new HashMap<>(this.needMoveFiles),
            commitMap, this.getTransactionDir(transactionId)));
    // Reset per-transaction bookkeeping and open the transaction for the next checkpoint.
    this.beingWrittenFile.clear();
    this.beginTransaction(checkpointId + 1);
    return fileState;
}
Expand Down Expand Up @@ -325,7 +332,13 @@ public String getTargetLocation(@NonNull String seaTunnelFilePath) {
// Strip the synthetic NON_PARTITION + separator segment so the returned path points at
// the real, non-partitioned target location. NOTE(review): only the tail of this method
// is visible in this hunk; tmpPath is computed above the visible range.
return tmpPath.replaceAll(BaseSinkConfig.NON_PARTITION + Matcher.quoteReplacement(File.separator), "");
}

/**
 * @return the id of the checkpoint this strategy is currently writing for
 */
@Override
public long getCheckpointId() {
    return checkpointId;
}

/**
 * @return the text file sink configuration backing this strategy
 */
@Override
public TextFileSinkConfig getFileSinkConfig() {
    return this.textFileSinkConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;

import org.apache.hadoop.conf.Configuration;

Expand Down Expand Up @@ -75,4 +76,6 @@ public interface WriteStrategy extends Transaction, Serializable {
/** Finishes and closes the file currently being written. */
void finishAndCloseFile();

/** Returns the id of the checkpoint the strategy is currently writing for. */
long getCheckpointId();

/** Returns the sink configuration this strategy was created with. */
TextFileSinkConfig getFileSinkConfig();
}

0 comments on commit cfa0aa1

Please sign in to comment.