Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add local_buffering & temp_file_threshold configs #50

Merged
merged 40 commits into from
Aug 27, 2018

Conversation

tvhung83
Copy link
Contributor

@tvhung83 tvhung83 commented Aug 14, 2018

CHANGELOG

This PR introduces 2 new configs:

  • local_buffering: whether to use local temp file as buffer. If false (new behavior), plugin will use remote file as buffering, ie. writing directly to remote OutputStream. This new behavior can help to reduce processing time, and not to use local file, which will reduce usage of disk space.
  • temp_file_threshold: long, maximum file size of local temp file, before plugin will flush to server.

Note

To implement local_buffering, SftpFileOutput is now SftpLocalFileOutput and SftpRemoteFileOutput:

  • SftpLocalFileOutput keeps buffering records into local temp file, and only flush to remote file when its size reaches temp_file_threshold.
  • SftpRemoteFileOutput will use remote file as buffer, hence there is no threshold. The progress will show number of uploaded records, instead of file size.

I also added retry into some operations of SftpUtils.

…`deleteFile()` method, `withRetry()` utility function
…5 minutes, add `resolve` and `openStream` (with retry)
@tvhung83 tvhung83 changed the title [WIP] Add local_buffering & temp_file_threshold configs Add local_buffering & temp_file_threshold configs Aug 23, 2018
@sakama sakama self-requested a review August 24, 2018 02:32
Copy link
Contributor

@sakama sakama left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, LGTM
Put some minor comments.

build.gradle Outdated
compile "org.embulk:embulk-core:0.8.6"
provided "org.embulk:embulk-core:0.8.6"
compile "org.embulk:embulk-core:0.8.+"
provided "org.embulk:embulk-core:0.8.+"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed version might better and embulk v0.9.x is ok for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though I'm not very confident with 0.9.x, but I can try that.

// if `false`, plugin will use remote file as buffer
@Config("local_buffering")
@ConfigDefault("true")
public Boolean getLocalBuffering();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boolean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ will update.

@@ -74,6 +77,17 @@
@Config("rename_file_after_upload")
@ConfigDefault("false")
public Boolean getRenameFileAfterUpload();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boolean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

public Void call() throws Exception
{
final FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath));
final OutputStream outputStream = openStream(remoteFile);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: BufferedOutputStream is correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The instance is already BufferedOutputStream, and here I only need to use basic operations of OutputStream, I'd like to keep it simple, but I can change it to make it explicit.

long startTime = System.nanoTime();

// start uploading
try (InputStream inputStream = new FileInputStream(localTempFile)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although original implementation was wrong, Use try (BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(localTempFile)) {} is better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, will fix it, thanks.

* @param outputStream
* @throws IOException
*/
void appendFile(final File localTempFile, final FileObject remoteFile, final OutputStream outputStream) throws IOException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: BurreredOutputStream outputStream is correct?
I think you already passed BurreredOutputStream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will update.

}

@VisibleForTesting
OutputStream getRemoteOutput()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: BufferedOutputStream is correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just for testing, can I keep it that way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Up to you!

@tvhung83
Copy link
Contributor Author

Checking unit tests...

Copy link
Contributor

@sakama sakama left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@tvhung83
Copy link
Contributor Author

tvhung83 commented Aug 24, 2018

@sakama -san Please help take another look: tvhung83/embulk-output-sftp@bc94c9f...8b8ca77
After more tests, I found out that it's not safe to ignore TimeoutException in TimeoutCloser. When the connection is stalling (close output stream → flush → stalled), the remote file would be broken. Below are what changed:

  • Revert input stream back to non-buffered stream, because we already read by buffer, 2 level of buffers is unpredictable.
  • Abort when closing resource got timed out.
  • Increase default time-out duration to 5 minutes (instead of 2 minutes).
  • Remove obsolete method callNonInterruptible()

Copy link
Contributor

@sakama sakama left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@tvhung83
Copy link
Contributor Author

Thanks, going to merge now.

@tvhung83 tvhung83 merged commit 6b61c6e into embulk:master Aug 27, 2018
@tvhung83 tvhung83 deleted the add_local_buffering_config branch August 27, 2018 07:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

2 participants