Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug][Connector-V2][File] Fix the bug of incorrect path in windows environment #2980

Merged
merged 9 commits into from
Oct 8, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -237,7 +236,7 @@ public void beginTransaction(Long checkpointId) {
*/
public List<String> getTransactionIdFromStates(List<FileSinkState> fileStates) {
String[] pathSegments = new String[]{textFileSinkConfig.getPath(), Constant.SEATUNNEL, jobId};
String jobDir = String.join(File.separator, pathSegments) + "/";
String jobDir = String.join("/", pathSegments) + "/";
TyrantLucifer marked this conversation as resolved.
Show resolved Hide resolved
try {
List<String> transactionDirList = FileSystemUtils.dirList(jobDir).stream().map(Path::toString).collect(Collectors.toList());
return transactionDirList.stream().map(dir -> dir.replaceAll(jobDir, "")).collect(Collectors.toList());
Expand Down Expand Up @@ -267,7 +266,7 @@ public List<FileSinkState> snapshotState(long checkpointId) {
*/
private String getTransactionDir(@NonNull String transactionId) {
String[] strings = new String[]{textFileSinkConfig.getTmpPath(), Constant.SEATUNNEL, jobId, transactionId};
return String.join(File.separator, strings);
return String.join("/", strings);
}

public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow seaTunnelRow) {
Expand All @@ -279,7 +278,7 @@ public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow seaTunnelRow
return beingWrittenFilePath;
} else {
String[] pathSegments = new String[]{transactionDirectory, beingWrittenFileKey, generateFileName(transactionId)};
String newBeingWrittenFilePath = String.join(File.separator, pathSegments);
String newBeingWrittenFilePath = String.join("/", pathSegments);
beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
if (!Constant.NON_PARTITION.equals(dataPartitionDirAndValuesMap.keySet().toArray()[0].toString())){
partitionDirAndValuesMap.putAll(dataPartitionDirAndValuesMap);
Expand All @@ -290,6 +289,6 @@ public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow seaTunnelRow

public String getTargetLocation(@NonNull String seaTunnelFilePath) {
String tmpPath = seaTunnelFilePath.replaceAll(transactionDirectory, textFileSinkConfig.getPath());
return tmpPath.replaceAll(Constant.NON_PARTITION + File.separator, "");
return tmpPath.replaceAll(Constant.NON_PARTITION + "/", "");
}
}