Skip to content

Commit

Permalink
Upload with ".tmp" suffix and rename file name after upload
Browse files Browse the repository at this point in the history
  • Loading branch information
sakama committed Oct 31, 2017
1 parent 3bf10a2 commit 715a8f8
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 203 deletions.
221 changes: 21 additions & 200 deletions src/main/java/org/embulk/output/sftp/SftpFileOutput.java
Original file line number Diff line number Diff line change
@@ -1,37 +1,24 @@
package org.embulk.output.sftp;

import com.google.common.base.Function;
import com.google.common.base.Throwables;
import org.apache.commons.io.IOUtils;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileSystemOptions;
import org.apache.commons.vfs2.impl.StandardFileSystemManager;
import org.apache.commons.vfs2.provider.sftp.IdentityInfo;
import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.spi.Buffer;
import org.embulk.spi.Exec;
import org.embulk.spi.FileOutput;
import org.embulk.spi.TransactionalFileOutput;
import org.embulk.spi.unit.LocalFile;
import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
import org.embulk.spi.util.RetryExecutor.Retryable;
import org.slf4j.Logger;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.embulk.output.sftp.SftpFileOutputPlugin.PluginTask;
import static org.embulk.spi.util.RetryExecutor.retryExecutor;

/**
* Created by takahiro.nakayama on 10/20/15.
Expand All @@ -40,110 +27,26 @@ public class SftpFileOutput
implements FileOutput, TransactionalFileOutput
{
private final Logger logger = Exec.getLogger(SftpFileOutput.class);
private final StandardFileSystemManager manager;
private final FileSystemOptions fsOptions;
private final String userInfo;
private final String host;
private final int port;
private final int maxConnectionRetry;
private final String pathPrefix;
private final String sequenceFormat;
private final String fileNameExtension;

private final int taskIndex;
private final SftpUtils sftpUtils;
private int fileIndex = 0;
private File tempFile;
private BufferedOutputStream localOutput = null;
private List<Map<String, String>> fileList = new ArrayList<>();

/**
 * Creates and initializes the commons-vfs2 file system manager used by this output.
 *
 * Unless debug logging is enabled, the "org.apache.commons.logging.Log" system
 * property is pointed at the no-op log implementation before the manager is created.
 *
 * @return the initialized {@link StandardFileSystemManager}
 * @throws ConfigException if {@code init()} fails with a {@link FileSystemException}
 */
private StandardFileSystemManager initializeStandardFileSystemManager()
{
    final boolean debugLogging = logger.isDebugEnabled();
    if (!debugLogging) {
        // TODO: change logging format: org.apache.commons.logging.Log
        System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.NoOpLog");
    }

    final StandardFileSystemManager fsManager = new StandardFileSystemManager();
    fsManager.setClassLoader(SftpFileOutput.class.getClassLoader());
    try {
        fsManager.init();
        return fsManager;
    }
    catch (FileSystemException initFailure) {
        logger.error(initFailure.getMessage());
        throw new ConfigException(initFailure);
    }
}

/**
 * Builds the user-info component that is later placed into the SFTP URI.
 *
 * @param task plugin task supplying the user name and the optional password
 * @return the user name alone, or {@code user:password} when a password is present
 */
private String initializeUserInfo(PluginTask task)
{
    final String user = task.getUser();
    if (!task.getPassword().isPresent()) {
        return user;
    }
    return user + ":" + task.getPassword().get();
}

/**
 * Builds the {@link FileSystemOptions} for SFTP connections from the task settings.
 *
 * Applies, in order: user-dir-is-root, the connection timeout (task value multiplied
 * by 1000 - presumably seconds to milliseconds, confirm against the task definition),
 * strict host key checking set to "no", the optional private key identity, and the
 * optional proxy settings.
 *
 * @param task plugin task providing connection, key and proxy settings
 * @return the populated options
 * @throws ConfigException if the config builder rejects a setting with a {@link FileSystemException}
 */
private FileSystemOptions initializeFsOptions(PluginTask task)
{
    final FileSystemOptions options = new FileSystemOptions();

    try {
        final SftpFileSystemConfigBuilder sftpConfig = SftpFileSystemConfigBuilder.getInstance();
        sftpConfig.setUserDirIsRoot(options, task.getUserDirIsRoot());
        sftpConfig.setTimeout(options, task.getSftpConnectionTimeout() * 1000);
        sftpConfig.setStrictHostKeyChecking(options, "no");

        if (task.getSecretKeyFilePath().isPresent()) {
            // Resolve the key file first, then the passphrase bytes, then register the identity.
            final File keyFile = new File(task.getSecretKeyFilePath().transform(localFileToPathString()).get());
            final byte[] passphrase = task.getSecretKeyPassphrase().getBytes();
            sftpConfig.setIdentityInfo(options, new IdentityInfo(keyFile, passphrase));
            logger.info("set identity: {}", task.getSecretKeyFilePath().get());
        }

        if (task.getProxy().isPresent()) {
            final ProxyTask proxySettings = task.getProxy().get();

            ProxyTask.ProxyType.setProxyType(sftpConfig, options, proxySettings.getType());

            // Host and port are only applied together, and only when a host is configured.
            if (proxySettings.getHost().isPresent()) {
                sftpConfig.setProxyHost(options, proxySettings.getHost().get());
                sftpConfig.setProxyPort(options, proxySettings.getPort());
            }
            if (proxySettings.getUser().isPresent()) {
                sftpConfig.setProxyUser(options, proxySettings.getUser().get());
            }
            if (proxySettings.getPassword().isPresent()) {
                sftpConfig.setProxyPassword(options, proxySettings.getPassword().get());
            }
            if (proxySettings.getCommand().isPresent()) {
                sftpConfig.setProxyCommand(options, proxySettings.getCommand().get());
            }
        }
    }
    catch (FileSystemException configFailure) {
        logger.error(configFailure.getMessage());
        throw new ConfigException(configFailure);
    }

    return options;
}
private final String temporaryFileSuffix = ".tmp";

/**
 * Creates an output bound to one task index, copying connection and file naming
 * settings from the task and creating the {@code SftpUtils} helper.
 *
 * NOTE(review): this listing comes from a diff view and appears to interleave
 * removed and added lines - confirm which assignments remain in the committed version.
 *
 * @param task plugin task holding the SFTP and naming configuration
 * @param taskIndex index of the task this output writes for; used in generated file names
 */
SftpFileOutput(PluginTask task, int taskIndex)
{
this.manager = initializeStandardFileSystemManager();
this.userInfo = initializeUserInfo(task);
this.fsOptions = initializeFsOptions(task);
this.host = task.getHost();
this.port = task.getPort();
this.maxConnectionRetry = task.getMaxConnectionRetry();
this.pathPrefix = task.getPathPrefix();
this.sequenceFormat = task.getSequenceFormat();
this.fileNameExtension = task.getFileNameExtension();
this.taskIndex = taskIndex;
this.sftpUtils = new SftpUtils(task);
}

@Override
Expand Down Expand Up @@ -179,15 +82,23 @@ public void add(final Buffer buffer)
/**
 * Finishes the current output file: closes the local file, uploads it to the remote
 * side under the final name plus the temporary suffix, records the temporary/real
 * name pair in {@code fileList}, and advances {@code fileIndex}.
 *
 * NOTE(review): this listing comes from a diff view and appears to interleave
 * removed and added lines; as listed, both the direct {@code uploadFile(...)} call
 * and the {@code sftpUtils.uploadFile(...)} call are present - confirm which one
 * remains in the committed version.
 */
public void finish()
{
closeCurrentFile();
uploadFile(getOutputFilePath());
String fileName = getOutputFilePath();
String temporaryFileName = fileName + temporaryFileSuffix;
// Upload under the temporary name; the pair recorded below carries both names.
sftpUtils.uploadFile(tempFile, temporaryFileName);

Map<String, String> executedFiles = new HashMap<>();
// Same value as temporaryFileName above (fileName + temporaryFileSuffix).
executedFiles.put("temporary_filename", fileName + temporaryFileSuffix);
executedFiles.put("real_filename", fileName);
fileList.add(executedFiles);
fileIndex++;
}

/**
 * Closes the current local file and releases the SFTP-related resources.
 *
 * NOTE(review): this listing comes from a diff view and appears to interleave
 * removed and added lines ({@code manager.close()} and {@code sftpUtils.close()}
 * both appear) - confirm which call remains in the committed version.
 */
@Override
public void close()
{
closeCurrentFile();
manager.close();
// TODO
sftpUtils.close();
}

@Override
Expand All @@ -198,7 +109,9 @@ public void abort()
/**
 * Produces the task report for this output.
 *
 * NOTE(review): this listing comes from a diff view and appears to interleave
 * removed and added lines; as listed, the first {@code return} precedes the
 * statements that attach {@code fileList} - confirm which form remains in the
 * committed version.
 *
 * @return a new task report; the later statements set {@code fileList} on it under "file_list"
 */
@Override
public TaskReport commit()
{
return Exec.newTaskReport();
// Builds a report carrying the recorded temporary/real file name pairs under the "file_list" key.
TaskReport report = Exec.newTaskReport();
report.set("file_list", fileList);
return report;
}

private void closeCurrentFile()
Expand All @@ -213,100 +126,8 @@ private void closeCurrentFile()
}
}

/**
 * Copies the local temporary file to {@code remotePath} on the SFTP server, retrying on failure.
 *
 * The copy is attempted up to {@code maxConnectionRetry} retries, with an initial retry
 * wait of 500 and a maximum retry wait of 30 * 1000 (presumably milliseconds - confirm
 * against RetryExecutor). Every exception is treated as retryable.
 *
 * @param remotePath path of the destination file on the remote side
 * @return always {@code null}
 * @throws RuntimeException wrapping the cause once retries are exhausted, or wrapping the
 *         {@link InterruptedException} if the thread is interrupted while retrying
 */
private Void uploadFile(final String remotePath)
{
    try {
        return retryExecutor()
                .withRetryLimit(maxConnectionRetry)
                .withInitialRetryWait(500)
                .withMaxRetryWait(30 * 1000)
                .runInterruptible(new Retryable<Void>() {
                    @Override
                    public Void call() throws IOException
                    {
                        FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath));
                        logger.info("new sftp file: {}", remoteFile.getPublicURIString());
                        // Both streams are closed by try-with-resources, inner (local input) first.
                        try (BufferedOutputStream outputStream = new BufferedOutputStream(remoteFile.getContent().getOutputStream())) {
                            try (BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(tempFile))) {
                                IOUtils.copy(inputStream, outputStream);
                            }
                        }
                        return null;
                    }

                    @Override
                    public boolean isRetryableException(Exception exception)
                    {
                        // Any failure is retried; no exception type is treated as fatal here.
                        return true;
                    }

                    @Override
                    public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException
                    {
                        String message = String.format("SFTP output failed. Retrying %d/%d after %d seconds. Message: %s",
                                retryCount, retryLimit, retryWait / 1000, exception.getMessage());
                        // Attach the full stack trace only on every third retry to keep the log readable.
                        if (retryCount % 3 == 0) {
                            logger.warn(message, exception);
                        }
                        else {
                            logger.warn(message);
                        }
                    }

                    @Override
                    public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException
                    {
                        // Intentionally empty: the give-up is surfaced through RetryGiveupException below.
                    }
                });
    }
    catch (RetryGiveupException ex) {
        throw Throwables.propagate(ex.getCause());
    }
    catch (InterruptedException ex) {
        // Restore the interrupt status so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw Throwables.propagate(ex);
    }
}

/**
 * Builds the {@code sftp://} URI for a remote file from the stored user info, host and port.
 *
 * @param remoteFilePath path component of the URI
 * @return the assembled SFTP URI
 * @throws ConfigException if the components do not form a valid URI
 */
private URI getSftpFileUri(String remoteFilePath)
{
    try {
        return new URI("sftp", userInfo, host, port, remoteFilePath, null, null);
    }
    catch (URISyntaxException e) {
        // URISyntaxException.getMessage() embeds the whole input string, which includes the
        // user info (user:password). Log only the reason and the remote path so that
        // credentials do not end up in the log output.
        // NOTE(review): the exception passed on below still carries the full input in its message.
        logger.error("Invalid SFTP URI for remote path {}: {}", remoteFilePath, e.getReason());
        throw new ConfigException(e);
    }
}

/**
 * Composes the remote file path for the current file.
 *
 * @return the path prefix, followed by the sequence format applied to the task index
 *         and file index, followed by the file name extension
 */
private String getOutputFilePath()
{
    final String sequence = String.format(sequenceFormat, taskIndex, fileIndex);
    return pathPrefix + sequence + fileNameExtension;
}

/**
 * Resolves the remote file for the given URI and prepares its location for writing.
 *
 * An already existing file at that location is deleted, and the parent folder is
 * created when it does not exist yet.
 *
 * @param sftpUri URI of the remote file
 * @return the resolved remote file object
 * @throws FileSystemException if resolving, deleting or folder creation fails
 */
private FileObject newSftpFile(final URI sftpUri) throws FileSystemException
{
    final FileObject target = manager.resolveFile(sftpUri.toString(), fsOptions);
    if (target.exists()) {
        target.delete();
    }

    if (!target.getParent().exists()) {
        logger.info("trying to create parent directory {}", target.getParent().getPublicURIString());
        target.getParent().createFolder();
    }
    else {
        logger.info("parent directory {} exists there", target.getParent().getPublicURIString());
    }
    return target;
}

/**
 * Provides a function that maps a {@code LocalFile} to the string form of its path.
 *
 * @return a function returning {@code file.getPath().toString()}
 */
private Function<LocalFile, String> localFileToPathString()
{
    final Function<LocalFile, String> toPathString = new Function<LocalFile, String>()
    {
        @Override
        public String apply(LocalFile localFile)
        {
            return localFile.getPath().toString();
        }
    };
    return toPathString;
}
}
15 changes: 12 additions & 3 deletions src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@
import org.embulk.spi.FileOutputPlugin;
import org.embulk.spi.TransactionalFileOutput;
import org.embulk.spi.unit.LocalFile;
import org.slf4j.Logger;

import java.util.List;
import java.util.Map;

public class SftpFileOutputPlugin
implements FileOutputPlugin
{
private Logger logger = Exec.getLogger(SftpFileOutputPlugin.class);

public interface PluginTask
extends Task
{
Expand Down Expand Up @@ -101,6 +99,17 @@ public void cleanup(TaskSource taskSource,
int taskCount,
List<TaskReport> successTaskReports)
{
SftpUtils sftpUtils = new SftpUtils(taskSource.loadTask(PluginTask.class));
for (TaskReport report : successTaskReports) {
List<Map<String, String>> moveFileList = report.get(List.class, "file_list");
for (Map<String, String> pairFiles : moveFileList) {
String temporaryFileName = pairFiles.get("temporary_filename");
String realFileName = pairFiles.get("real_filename");

sftpUtils.renameFile(temporaryFileName, realFileName);
}
}
sftpUtils.close();
}

@Override
Expand Down
Loading

0 comments on commit 715a8f8

Please sign in to comment.