Add `local_buffering` & `temp_file_threshold` configs #50
…terruptible) and callNonInterruptible
…`deleteFile()` method, `withRetry()` utility function
…5 minutes, add `resolve` and `openStream` (with retry)
…en no exception is thrown
Basically LGTM. I left some minor comments.
build.gradle
Outdated
compile "org.embulk:embulk-core:0.8.6"
provided "org.embulk:embulk-core:0.8.6"
compile "org.embulk:embulk-core:0.8.+"
provided "org.embulk:embulk-core:0.8.+"
A fixed version might be better. Is embulk v0.9.x OK for now?
I'm not very confident about 0.9.x, but I can try that.
// if `false`, plugin will use remote file as buffer
@Config("local_buffering")
@ConfigDefault("true")
public Boolean getLocalBuffering();
boolean?
✅ will update.
@@ -74,6 +77,17 @@
@Config("rename_file_after_upload")
@ConfigDefault("false")
public Boolean getRenameFileAfterUpload();
boolean?
ditto
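For context on the `boolean?` comments above, here is a minimal sketch of the usual argument for a primitive getter: a boxed `Boolean` can be `null`, and auto-unboxing a `null` throws `NullPointerException`. `BoxedTask` and `PrimitiveTask` are hypothetical stand-ins, not the plugin's real task interface, and with a `@ConfigDefault` in place the value is presumably never `null` in practice, so this only illustrates the type-level difference.

```java
// Hypothetical stand-ins for a config interface; not the plugin's actual PluginTask.
final class BooleanGetterSketch {
    interface BoxedTask {
        Boolean getLocalBuffering();   // boxed: the caller may receive null
    }

    interface PrimitiveTask {
        boolean getLocalBuffering();   // primitive: null is impossible by construction
    }

    // Returns true when reading the boxed value as a primitive fails.
    static boolean unboxFails(BoxedTask task) {
        try {
            boolean value = task.getLocalBuffering(); // auto-unboxing happens here
            return false;
        } catch (NullPointerException e) {
            return true; // the boxed value was null
        }
    }
}
```

With the primitive signature, the compiler rejects any implementation that could return `null`, so callers do not need a null check.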
public Void call() throws Exception
{
final FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath));
final OutputStream outputStream = openStream(remoteFile);
Minor: should the declared type be `BufferedOutputStream`?
The instance is already a `BufferedOutputStream`, and here I only need the basic operations of `OutputStream`. I'd like to keep it simple, but I can change it to make the type explicit.
long startTime = System.nanoTime();

// start uploading
try (InputStream inputStream = new FileInputStream(localTempFile)) {
Although the original implementation had the same problem, would `try (BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(localTempFile))) {}` be better?
Good catch, will fix it, thanks.
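The suggestion above can be sketched as follows. This is an illustration under assumptions, not the code that was merged: the class name `BufferedUploadSketch`, the method `copy`, and the 8 KiB chunk size are hypothetical, and a plain `OutputStream` stands in for the remote SFTP stream.

```java
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper; names are illustrative only.
final class BufferedUploadSketch {
    // Copies the local temp file to the given stream and returns the number of bytes written.
    static long copy(File localTempFile, OutputStream outputStream) throws IOException {
        long total = 0;
        // try-with-resources closes the buffered stream, which in turn closes the file stream.
        try (InputStream inputStream = new BufferedInputStream(new FileInputStream(localTempFile))) {
            byte[] chunk = new byte[8192]; // assumed chunk size
            int len;
            while ((len = inputStream.read(chunk)) != -1) {
                outputStream.write(chunk, 0, len);
                total += len;
            }
        }
        // The output stream is owned by the caller, so it is flushed but not closed here.
        outputStream.flush();
        return total;
    }
}
```

Wrapping the `FileInputStream` mainly helps when reads are small; with a large explicit chunk array, as here, the gain is likely modest, but the wrapper keeps the read side consistent with the buffered write side.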
* @param outputStream
* @throws IOException
*/
void appendFile(final File localTempFile, final FileObject remoteFile, final OutputStream outputStream) throws IOException
Minor: should this be `BufferedOutputStream outputStream`?
I think you are already passing a `BufferedOutputStream`.
Will update.
}

@VisibleForTesting
OutputStream getRemoteOutput()
Minor: should the return type be `BufferedOutputStream`?
It's just for testing, can I keep it that way?
Up to you!
Checking unit tests...
LGTM
@sakama -san Please help take another look: tvhung83/embulk-output-sftp@bc94c9f...8b8ca77
LGTM
Thanks, going to merge now.
CHANGELOG
This PR introduces 2 new configs:
- `local_buffering`: whether to use a local temp file as a buffer. If `false` (new behavior), the plugin uses the remote file as the buffer, i.e. it writes directly to the remote `OutputStream`. This can reduce processing time and, because no local file is used, disk usage.
- `temp_file_threshold`: `long`, the maximum size of the local temp file before the plugin flushes to the server.

Note
To implement `local_buffering`, `SftpFileOutput` is now `SftpLocalFileOutput` and `SftpRemoteFileOutput`:
- `SftpLocalFileOutput` keeps buffering records into a local temp file and flushes to the remote file only when its size reaches `temp_file_threshold`.
- `SftpRemoteFileOutput` uses the remote file as the buffer, hence there is no threshold. The progress shows the number of uploaded records instead of the file size.

I also added retry to some operations of `SftpUtils`.
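The threshold behavior described for `SftpLocalFileOutput` can be illustrated with a small sketch. It is a simplified model under stated assumptions, not the plugin's implementation: `ThresholdBufferSketch` and its methods are hypothetical names, an in-memory `ByteArrayOutputStream` stands in for the local temp file, and any `OutputStream` stands in for the remote SFTP file.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical model of "buffer locally, flush when the threshold is reached".
final class ThresholdBufferSketch {
    private final OutputStream remote;     // stand-in for the remote file stream
    private final long tempFileThreshold;  // plays the role of temp_file_threshold, in bytes
    private final ByteArrayOutputStream local = new ByteArrayOutputStream(); // stand-in for the temp file
    private int flushCount = 0;

    ThresholdBufferSketch(OutputStream remote, long tempFileThreshold) {
        this.remote = remote;
        this.tempFileThreshold = tempFileThreshold;
    }

    // Buffers one record locally and flushes once the buffered size reaches the threshold.
    void add(byte[] record) throws IOException {
        local.write(record, 0, record.length);
        if (local.size() >= tempFileThreshold) {
            flush();
        }
    }

    // Appends everything buffered so far to the remote stream and empties the local buffer.
    void flush() throws IOException {
        if (local.size() == 0) {
            return; // nothing to upload
        }
        local.writeTo(remote);
        remote.flush();
        local.reset();
        flushCount++;
    }

    int getFlushCount() {
        return flushCount;
    }
}
```

In this model, setting `local_buffering` to `false` would correspond to skipping the `local` buffer and writing each record straight to `remote`, which is why that mode has no threshold. A final `flush()` is still needed at the end of a task to upload whatever remains below the threshold.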