Skip to content

Commit

Permalink
Merge pull request #11 from embulk/support-incremental-flag
Browse files Browse the repository at this point in the history
Support incremental flag
  • Loading branch information
sakama authored Aug 16, 2016
2 parents d358f72 + dad1f69 commit a81564c
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Reads files stored on remote server using SFTP
- **user_directory_is_root**: (boolean, default: `true`)
- **timeout**: sftp connection timeout seconds (integer, default: `600`)
- **path_prefix**: Prefix of output paths (string, required)
- **incremental**: enables incremental loading (boolean, optional, default: `true`). If incremental loading is enabled, the config diff for the next execution will include the `last_path` parameter so that the next execution skips files before that path. Otherwise, `last_path` will not be included.
- **path_match_pattern**: regexp to match file paths. If a file path doesn't match with this pattern, the file will be skipped (regexp string, optional)
- **total_file_count_limit**: maximum number of files to read (integer, optional)
- **min_task_size (experimental)**: minimum size of a task. If this is larger than 0, one task includes multiple input files. This is useful if too many number of tasks impacts performance of output or executor plugins badly. (integer, optional)
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/embulk/input/sftp/PluginTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public interface PluginTask
/**
 * Returns the value of the {@code path_prefix} option.
 * No {@code @ConfigDefault} is declared for it, so the option carries no default value.
 */
@Config("path_prefix")
String getPathPrefix();

/**
 * Returns the value of the {@code incremental} option (declared default: {@code true}).
 * When this is {@code true}, {@code SftpFileInputPlugin.resume} stores {@code last_path}
 * in the returned config diff; when {@code false}, it leaves {@code last_path} out.
 */
@Config("incremental")
@ConfigDefault("true")
boolean getIncremental();

/**
 * Returns the value of the {@code last_path} option (declared default: {@code null}).
 * NOTE(review): presumably an absent {@code Optional} when the option is not set -
 * confirm against the config loader.
 */
@Config("last_path")
@ConfigDefault("null")
Optional<String> getLastPath();
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/embulk/input/sftp/SftpFileInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public ConfigDiff resume(TaskSource taskSource,
control.run(taskSource, taskCount);

ConfigDiff configDiff = Exec.newConfigDiff();
configDiff.set("last_path", SftpFileInput.getRelativePath(task.getFiles().getLastPath(task.getLastPath())));
if (task.getIncremental()) {
configDiff.set("last_path", SftpFileInput.getRelativePath(task.getFiles().getLastPath(task.getLastPath())));
}

return configDiff;
}
Expand Down
18 changes: 18 additions & 0 deletions src/test/java/org/embulk/input/sftp/TestSftpFileInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public void checkDefaultValues()

PluginTask task = config.loadConfig(PluginTask.class);
assertEquals(22, task.getPort());
assertEquals(true, task.getIncremental());
assertEquals(true, task.getUserDirIsRoot());
assertEquals(600, task.getSftpConnectionTimeout());
assertEquals(5, task.getMaxConnectionRetry());
Expand Down Expand Up @@ -173,6 +174,23 @@ public List<TaskReport> run(TaskSource taskSource, int taskCount)
assertEquals("in/aa/a", configDiff.get(String.class, "last_path"));
}

/**
 * Checks that {@code resume} produces a config diff that renders as <code>{}</code>
 * when the {@code incremental} option is set to {@code false}.
 */
@Test
public void testResumeIncrementalFalse()
{
    // Start from a copy of the shared config and switch incremental loading off.
    ConfigSource nonIncrementalConfig = config.deepCopy().set("incremental", false);
    PluginTask nonIncrementalTask = nonIncrementalConfig.loadConfig(PluginTask.class);
    nonIncrementalTask.setFiles(createFileList(Arrays.asList("in/aa/a"), nonIncrementalTask));

    TaskSource dumpedTask = nonIncrementalTask.dump();

    // Control stub: hands back whatever emptyTaskReports builds for the given task count.
    FileInputPlugin.Control stubControl = new FileInputPlugin.Control()
    {
        @Override
        public List<TaskReport> run(TaskSource taskSource, int taskCount)
        {
            return emptyTaskReports(taskCount);
        }
    };

    ConfigDiff resumedDiff = plugin.resume(dumpedTask, 0, stubControl);

    // With incremental disabled, the diff must not carry any entries.
    assertEquals("{}", resumedDiff.toString());
}

@Test
public void testCleanup()
{
Expand Down

0 comments on commit a81564c

Please sign in to comment.