Skip to content

Commit

Permalink
Merge branch 'apache:dev' into add_clickhouse_e2e_test_case
Browse files Browse the repository at this point in the history
  • Loading branch information
FWLamb authored Oct 24, 2022
2 parents 80e8684 + 389872a commit bc0a3d3
Show file tree
Hide file tree
Showing 27 changed files with 664 additions and 156 deletions.
2 changes: 1 addition & 1 deletion docs/en/connector-v2/sink/OssFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
Output data to oss file system.

> Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OSS, and this connector needs some hadoop dependencies.
> It's only support hadoop version **2.9.X+**.
> It only supports hadoop version **2.9.X+**.
## Key features

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, Si
if (!fileSinkStates.isEmpty()) {
List<String> transactionIds = writeStrategy.getTransactionIdFromStates(fileSinkStates);
transactionIds.forEach(writeStrategy::abortPrepare);
writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId());
writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 1);
} else {
writeStrategy.beginTransaction(1L);
}
}

/**
 * Creates a writer without any restored sink state.
 * Delegates to the five-argument constructor with an empty state list, then begins
 * transaction 1 on the given write strategy.
 *
 * @param writeStrategy write strategy; passed to the delegated constructor and used to begin the first transaction
 * @param hadoopConf    hadoop configuration, passed through to the delegated constructor
 * @param context       sink writer context, passed through to the delegated constructor
 * @param jobId         job id, passed through to the delegated constructor
 */
public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, SinkWriter.Context context, String jobId) {
this(writeStrategy, hadoopConf, context, jobId, Collections.emptyList());
// NOTE(review): the delegated constructor's empty-state branch appears to call beginTransaction(1L)
// already, so this second call looks redundant - confirm before removing.
writeStrategy.beginTransaction(1L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public FileAggregatedCommitInfo combine(List<FileCommitInfo> commitInfos) {
*/
@Override
public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
log.info("rollback aggregate commit");
if (aggregatedCommitInfos == null || aggregatedCommitInfos.size() == 0) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,19 @@ public static void deleteFile(@NonNull String file) throws IOException {
* @param rmWhenExist if this is true, we will delete the target file when it already exists
* @throws IOException throw IOException
*/
public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) throws IOException {
public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist)
throws IOException {
FileSystem fileSystem = getFileSystem(newName);
log.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");

Path oldPath = new Path(oldName);
Path newPath = new Path(newName);

if (!fileExist(oldPath.toString())) {
log.warn("rename file :[" + oldPath + "] to [" + newPath + "] already finished in the last commit, skip");
return;
}

if (rmWhenExist) {
if (fileExist(newName) && fileExist(oldName)) {
fileSystem.delete(newPath, true);
Expand Down Expand Up @@ -119,6 +126,9 @@ public static boolean fileExist(@NonNull String filePath) throws IOException {
public static List<Path> dirList(@NonNull String filePath) throws FileNotFoundException, IOException {
FileSystem fileSystem = getFileSystem(filePath);
List<Path> pathList = new ArrayList<>();
if (!fileExist(filePath)) {
return pathList;
}
Path fileName = new Path(filePath);
FileStatus[] status = fileSystem.listStatus(fileName);
if (status != null && status.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
protected Map<String, String> beingWrittenFile;
private Map<String, List<String>> partitionDirAndValuesMap;
protected SeaTunnelRowType seaTunnelRowType;
protected Long checkpointId = 1L;

// Checkpoint ids from the engine start at 1
protected Long checkpointId = 0L;

public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
this.textFileSinkConfig = textFileSinkConfig;
Expand All @@ -88,7 +90,6 @@ public void init(HadoopConf conf, String jobId, int subTaskIndex) {
this.jobId = jobId;
this.subTaskIndex = subTaskIndex;
FileSystemUtils.CONF = getConfiguration(hadoopConf);
this.beginTransaction(this.checkpointId);
}

/**
Expand Down Expand Up @@ -233,6 +234,7 @@ public void abortPrepare(String transactionId) {
* @param checkpointId checkpoint id
*/
public void beginTransaction(Long checkpointId) {
this.checkpointId = checkpointId;
this.transactionId = "T" + Constant.TRANSACTION_ID_SPLIT + jobId + Constant.TRANSACTION_ID_SPLIT + subTaskIndex + Constant.TRANSACTION_ID_SPLIT + checkpointId;
this.transactionDirectory = getTransactionDir(this.transactionId);
this.needMoveFiles = new HashMap<>();
Expand Down Expand Up @@ -265,8 +267,7 @@ public List<String> getTransactionIdFromStates(List<FileSinkState> fileStates) {
@Override
public List<FileSinkState> snapshotState(long checkpointId) {
ArrayList<FileSinkState> fileState = Lists.newArrayList(new FileSinkState(this.transactionId, this.checkpointId));
this.checkpointId = checkpointId;
this.beginTransaction(checkpointId);
this.beginTransaction(checkpointId + 1);
return fileState;
}

Expand Down Expand Up @@ -303,4 +304,8 @@ public String getTargetLocation(@NonNull String seaTunnelFilePath) {
Matcher.quoteReplacement(textFileSinkConfig.getPath()));
return tmpPath.replaceAll(Constant.NON_PARTITION + Matcher.quoteReplacement(File.separator), "");
}

/**
 * Returns the current value of the {@code checkpointId} field, which
 * {@code beginTransaction} overwrites with the id it is given.
 *
 * @return the checkpoint id currently held by this strategy
 */
@Override
public long getCheckpointId() {
return this.checkpointId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,6 @@ public interface WriteStrategy extends Transaction, Serializable {
* when a transaction is triggered, release resources
*/
void finishAndCloseFile();

/**
 * get the checkpoint id currently held by this write strategy
 *
 * @return checkpoint id
 */
long getCheckpointId();
}
Loading

0 comments on commit bc0a3d3

Please sign in to comment.