From 97c6154f612e41b33ff88026f4b9f5fef6af6293 Mon Sep 17 00:00:00 2001 From: sakama Date: Thu, 8 Mar 2018 13:32:28 +0900 Subject: [PATCH 1/2] Re-implement file rename feature after upload completed --- .../embulk/output/sftp/SftpFileOutput.java | 7 +--- .../output/sftp/SftpFileOutputPlugin.java | 34 +++++++++++-------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java index 7167074..01c51d3 100644 --- a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java @@ -84,12 +84,7 @@ public void finish() closeCurrentFile(); String fileName = getOutputFilePath(); String temporaryFileName = fileName + temporaryFileSuffix; - /* - #37 causes permission failure while renaming remote file. - https://github.com/embulk/embulk-output-sftp/issues/40 - */ - //sftpUtils.uploadFile(tempFile, temporaryFileName); - sftpUtils.uploadFile(tempFile, fileName); + sftpUtils.uploadFile(tempFile, temporaryFileName); Map executedFiles = new HashMap<>(); executedFiles.put("temporary_filename", temporaryFileName); diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java b/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java index dce3d94..181b37e 100644 --- a/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java @@ -14,6 +14,7 @@ import org.embulk.spi.unit.LocalFile; import java.util.List; +import java.util.Map; public class SftpFileOutputPlugin implements FileOutputPlugin @@ -98,21 +99,24 @@ public void cleanup(TaskSource taskSource, int taskCount, List successTaskReports) { - /* - #37 causes permission failure while renaming remote file. - https://github.com/embulk/embulk-output-sftp/issues/40 - */ -// SftpUtils sftpUtils = new SftpUtils(taskSource.loadTask(PluginTask.class)); -// for (TaskReport report : successTaskReports) { -// List> moveFileList = report.get(List.class, "file_list"); -// for (Map pairFiles : moveFileList) { -// String temporaryFileName = pairFiles.get("temporary_filename"); -// String realFileName = pairFiles.get("real_filename"); -// -// sftpUtils.renameFile(temporaryFileName, realFileName); -// } -// } -// sftpUtils.close(); + SftpUtils sftpUtils = null; + try { + new SftpUtils(taskSource.loadTask(PluginTask.class)); + for (TaskReport report : successTaskReports) { + List> moveFileList = report.get(List.class, "file_list"); + for (Map pairFiles : moveFileList) { + String temporaryFileName = pairFiles.get("temporary_filename"); + String realFileName = pairFiles.get("real_filename"); + + sftpUtils.renameFile(temporaryFileName, realFileName); + } + } + } + finally { + if (sftpUtils != null) { + sftpUtils.close(); + } + } } @Override From decf5dbabf88b863feda26cec067162c3a7e5a0c Mon Sep 17 00:00:00 2001 From: sakama Date: Fri, 9 Mar 2018 16:45:58 +0900 Subject: [PATCH 2/2] Add rename_file_after_upload option --- .../embulk/output/sftp/SftpFileOutput.java | 9 ++++- .../output/sftp/SftpFileOutputPlugin.java | 36 +++++++++++-------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java index 01c51d3..998b30d 100644 --- a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java @@ -30,6 +30,7 @@ public class SftpFileOutput private final String pathPrefix; private final String sequenceFormat; private final String fileNameExtension; + private final boolean renameFileAfterUpload; private final int taskIndex; private final SftpUtils sftpUtils; @@ -45,6 +46,7 @@ public class SftpFileOutput this.pathPrefix = task.getPathPrefix(); this.sequenceFormat = task.getSequenceFormat(); this.fileNameExtension = task.getFileNameExtension(); + this.renameFileAfterUpload = task.getRenameFileAfterUpload(); this.taskIndex = taskIndex; this.sftpUtils = new SftpUtils(task); } @@ -84,7 +86,12 @@ public void finish() closeCurrentFile(); String fileName = getOutputFilePath(); String temporaryFileName = fileName + temporaryFileSuffix; - sftpUtils.uploadFile(tempFile, temporaryFileName); + if (renameFileAfterUpload) { + sftpUtils.uploadFile(tempFile, temporaryFileName); + } + else { + sftpUtils.uploadFile(tempFile, fileName); + } Map executedFiles = new HashMap<>(); executedFiles.put("temporary_filename", temporaryFileName); diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java b/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java index 181b37e..60c81db 100644 --- a/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java @@ -70,6 +70,10 @@ public interface PluginTask @Config("proxy") @ConfigDefault("null") public Optional getProxy(); + + @Config("rename_file_after_upload") + @ConfigDefault("false") + public Boolean getRenameFileAfterUpload(); } @Override @@ -99,22 +103,26 @@ public void cleanup(TaskSource taskSource, int taskCount, List successTaskReports) { - SftpUtils sftpUtils = null; - try { - new SftpUtils(taskSource.loadTask(PluginTask.class)); - for (TaskReport report : successTaskReports) { - List> moveFileList = report.get(List.class, "file_list"); - for (Map pairFiles : moveFileList) { - String temporaryFileName = pairFiles.get("temporary_filename"); - String realFileName = pairFiles.get("real_filename"); - - sftpUtils.renameFile(temporaryFileName, realFileName); + PluginTask task = taskSource.loadTask(PluginTask.class); + + if (task.getRenameFileAfterUpload()) { + SftpUtils sftpUtils = null; + try { + sftpUtils = new SftpUtils(task); + for (TaskReport report : successTaskReports) { + List> moveFileList = report.get(List.class, "file_list"); + for (Map pairFiles : moveFileList) { + String temporaryFileName = pairFiles.get("temporary_filename"); + String realFileName = pairFiles.get("real_filename"); + + sftpUtils.renameFile(temporaryFileName, realFileName); + } } } - } - finally { - if (sftpUtils != null) { - sftpUtils.close(); + finally { + if (sftpUtils != null) { + sftpUtils.close(); + } } } }