Skip to content

Commit

Permalink
[Feature][Connector-V2][File] Improve writer restore
Browse files Browse the repository at this point in the history
  • Loading branch information
TyrantLucifer committed Dec 7, 2022
1 parent cfa0aa1 commit 232957a
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand All @@ -56,17 +57,21 @@ public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, Si
List<Path> paths = FileSystemUtils.dirList(writeStrategy.getFileSinkConfig().getTmpPath());
List<String> transactions = paths.stream().map(Path::getName).collect(Collectors.toList());
FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(hadoopConf);
for (FileSinkState fileSinkState : fileSinkStates) {
if (transactions.contains(fileSinkState.getTransactionId())) {
HashMap<String, FileSinkState> fileStatesMap = new HashMap<>();
fileSinkStates.forEach(fileSinkState ->
fileStatesMap.put(fileSinkState.getTransactionId(), fileSinkState));
for (String transaction : transactions) {
if (fileStatesMap.containsKey(transaction)) {
// need commit
FileSinkState fileSinkState = fileStatesMap.get(transaction);
FileAggregatedCommitInfo fileCommitInfo = fileSinkAggregatedCommitter
.combine(Collections.singletonList(new FileCommitInfo(fileSinkState.getNeedMoveFiles(),
fileSinkState.getPartitionDirAndValuesMap(),
fileSinkState.getTransactionDir())));
fileSinkAggregatedCommitter.commit(Collections.singletonList(fileCommitInfo));
} else {
// need abort
writeStrategy.abortPrepare(fileSinkState.getTransactionId());
writeStrategy.abortPrepare(transaction);
}
}
} catch (IOException e) {
Expand Down

0 comments on commit 232957a

Please sign in to comment.