Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add local_buffering & temp_file_threshold configs #50

Merged
merged 40 commits into from
Aug 27, 2018
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
633df74
Add utility class: default as always retry
tvhung83 Aug 14, 2018
40372f0
A time-out Callable implementation, which exposes 2 methods: call (in…
tvhung83 Aug 14, 2018
5beb673
Rename configs
tvhung83 Aug 14, 2018
a3e3eeb
Add `flush()` and `abort()` implementation
tvhung83 Aug 14, 2018
9d7eabb
Rename `TMP_SUFFIX` constant
tvhung83 Aug 14, 2018
8d5a8a3
Adjust `uploadFile()` method: non-retry to preserve idempotence, add …
tvhung83 Aug 14, 2018
344d4cd
Not retry for Auth error
tvhung83 Aug 17, 2018
5ff77dc
Revert to 5 retries limit
tvhung83 Aug 17, 2018
3e4ba17
Fix some logic mistakes: closing resources, rename in `finish`, etc.
tvhung83 Aug 17, 2018
8f6edd4
Add progress watcher and overwrite `add`, `closeCurrentFile`
tvhung83 Aug 17, 2018
df0fa17
Separate `uploadFile` and `appendFile`, increase default time-out to …
tvhung83 Aug 17, 2018
d5057a7
Fix a bug of `task.cancel()`, which keeps the task running forever wh…
tvhung83 Aug 17, 2018
8f664a5
A wrapper class to close resource and time-out if needed
tvhung83 Aug 17, 2018
ac8794c
Default local temp file threshold to 5GiB
tvhung83 Aug 20, 2018
c78538d
Refactor: rearrange in attempt to make workflow cleaner
tvhung83 Aug 20, 2018
f6623ba
Fix bug: close/flush final file, move rename right after closing file
tvhung83 Aug 20, 2018
aa0f97c
Update code comments
tvhung83 Aug 20, 2018
ba26217
Reset `appending` when open next file
tvhung83 Aug 21, 2018
2ee2a04
Increase time-out to 2 minutes
tvhung83 Aug 21, 2018
f6e49d1
Switch to BufferedOutputStream for remote output stream
tvhung83 Aug 21, 2018
3c9d9eb
Fix bug: remote file was not flushed properly
tvhung83 Aug 21, 2018
300df83
Fix bug: close remote output stream instead of file
tvhung83 Aug 21, 2018
03e1eec
Revert name of test class, remove JDK7 from `.travis.yml`
tvhung83 Aug 21, 2018
590d967
Upgrade dist to `trusty` in attempt to make openjdk8 works
tvhung83 Aug 21, 2018
221ac5e
Update `embulk-*` to latest 0.8, add `mockito` for unit test
tvhung83 Aug 23, 2018
c43da7d
Fix checkstyle
tvhung83 Aug 23, 2018
e43e18f
Minor bug fix, open some methods for unit tests
tvhung83 Aug 23, 2018
d563378
Add more unit tests
tvhung83 Aug 23, 2018
0b3bc4f
Minor bug fix, open `writeTimeout` for unit tests
tvhung83 Aug 23, 2018
82356c7
Open `timeout` for testing
tvhung83 Aug 23, 2018
1e62158
Skip closing `remoteFile` because it hangs jobs
tvhung83 Aug 24, 2018
a8bb85b
Remove retry for `resolve` since it's causing nested retries
tvhung83 Aug 24, 2018
37e50b6
Address review comments, also removed an obsolete test
tvhung83 Aug 24, 2018
5c24bf4
Update `embulk-*` to 0.9.7
tvhung83 Aug 24, 2018
bc94c9f
Fix bug: move renaming to `finish()` instead of `closeRemoteFile()`
tvhung83 Aug 24, 2018
85241a4
Revert to non-buffered input stream, because we already read into buffer
tvhung83 Aug 24, 2018
4ae6231
Remove obsolete `callNonInterruptible()` method
tvhung83 Aug 24, 2018
70a5429
Remove obsolete `callNonInterruptible()` method
tvhung83 Aug 24, 2018
7ebdd87
Update unit test of time-out closer
tvhung83 Aug 24, 2018
8b8ca77
Remove unnecessary close (it could cause another hang)
tvhung83 Aug 24, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
dist: precise
dist: trusty
language: java
jdk:
- openjdk7
- oraclejdk7
- openjdk8
- oraclejdk8
script:
- ./gradlew test
Expand Down
9 changes: 5 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@ sourceCompatibility = 1.7
targetCompatibility = 1.7

dependencies {
compile "org.embulk:embulk-core:0.8.6"
provided "org.embulk:embulk-core:0.8.6"
compile "org.embulk:embulk-core:0.9.7"
provided "org.embulk:embulk-core:0.9.7"
// compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION"
compile "org.apache.commons:commons-vfs2:2.2"
compile "commons-io:commons-io:2.6"
compile "com.jcraft:jsch:0.1.54"
testCompile "junit:junit:4.+"
testCompile "org.embulk:embulk-core:0.8.6:tests"
testCompile "org.embulk:embulk-standards:0.8.6"
testCompile "org.embulk:embulk-core:0.9.7:tests"
testCompile "org.embulk:embulk-standards:0.9.7"
testCompile "org.apache.sshd:apache-sshd:1.1.0+"
testCompile "org.littleshoot:littleproxy:1.1.0-beta1"
testCompile "io.netty:netty-all:4.0.34.Final"
testCompile "org.mockito:mockito-core:2.+"
}

jacocoTestReport {
Expand Down
140 changes: 0 additions & 140 deletions src/main/java/org/embulk/output/sftp/SftpFileOutput.java

This file was deleted.

23 changes: 20 additions & 3 deletions src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.embulk.spi.TransactionalFileOutput;
import org.embulk.spi.unit.LocalFile;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -47,7 +50,7 @@ public interface PluginTask

@Config("user_directory_is_root")
@ConfigDefault("true")
public Boolean getUserDirIsRoot();
public boolean getUserDirIsRoot();

@Config("timeout")
@ConfigDefault("600") // 10 minutes
Expand All @@ -73,7 +76,18 @@ public interface PluginTask

@Config("rename_file_after_upload")
@ConfigDefault("false")
public Boolean getRenameFileAfterUpload();
public boolean getRenameFileAfterUpload();

// if `false`, plugin will use remote file as buffer
@Config("local_buffering")
@ConfigDefault("true")
public boolean getLocalBuffering();

@Min(50L * 1024 * 1024) // 50MiB
@Max(10L * 1024 * 1024 * 1024) // 10GiB
@Config("temp_file_threshold")
@ConfigDefault("5368709120") // 5GiB
public long getTempFileThreshold();
}

@Override
Expand Down Expand Up @@ -141,6 +155,9 @@ public void cleanup(TaskSource taskSource,
public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
{
final PluginTask task = taskSource.loadTask(PluginTask.class);
return new SftpFileOutput(task, taskIndex);
if (task.getLocalBuffering()) {
return new SftpLocalFileOutput(task, taskIndex);
}
return new SftpRemoteFileOutput(task, taskIndex);
}
}
Loading