Skip to content

Commit

Permalink
feat(batch-runner): take working dir files instead of a static list t…
Browse files Browse the repository at this point in the history
…o upload
  • Loading branch information
brian-mulier-p committed Jul 10, 2024
1 parent f8c4e71 commit 5fc3c2e
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions src/main/java/io/kestra/plugin/aws/runner/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,22 +226,23 @@ public class Batch extends TaskRunner implements AbstractS3, AbstractConnectionI
private final Duration completionCheckInterval = Duration.ofSeconds(5);

@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception {
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception {
boolean hasS3Bucket = this.bucket != null;

String renderedBucket = runContext.render(bucket);

boolean hasFilesToUpload = !ListUtils.isEmpty(filesToUpload);
Logger logger = runContext.logger();
List<Path> relativeWorkingDirectoryFilesPaths = taskCommands.relativeWorkingDirectoryFilesPaths();
boolean hasFilesToUpload = !ListUtils.isEmpty(relativeWorkingDirectoryFilesPaths);
if (hasFilesToUpload && !hasS3Bucket) {
throw new IllegalArgumentException("You must provide an S3 bucket in order to use `inputFiles` or `namespaceFiles`");
logger.warn("Working directory is not empty but no S3 bucket are specified. You must provide an S3 bucket in order to use `inputFiles` or `namespaceFiles`. Skipping importing files to runner.");
}
boolean hasFilesToDownload = !ListUtils.isEmpty(filesToDownload);
boolean outputDirectoryEnabled = taskCommands.outputDirectoryEnabled();
if ((hasFilesToDownload || outputDirectoryEnabled) && !hasS3Bucket) {
throw new IllegalArgumentException("You must provide an S3 bucket in order to use `outputFiles` or `{{ outputDir }}`");
}

Logger logger = runContext.logger();
AbstractLogConsumer logConsumer = taskCommands.getLogConsumer();

String renderedRegion = runContext.render(this.region);
Expand Down Expand Up @@ -285,7 +286,7 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S

if (hasFilesToUpload) {
try (S3TransferManager transferManager = transferManager(runContext)) {
filesToUpload.stream().map(relativePath ->
relativeWorkingDirectoryFilesPaths.stream().map(relativePath ->
UploadFileRequest.builder()
.putObjectRequest(
PutObjectRequest
Expand All @@ -295,7 +296,7 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S
.key((batchWorkingDirectory + Path.of("/" + relativePath).toString()).substring(1))
.build()
)
.source(runContext.workingDir().resolve(Path.of(relativePath)))
.source(runContext.workingDir().resolve(relativePath))
.build()
).map(transferManager::uploadFile)
.map(FileUpload::completionFuture)
Expand Down Expand Up @@ -343,7 +344,7 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S
.build();

if (hasFilesToUpload || outputDirectoryEnabled) {
Stream<String> commands = ListUtils.emptyOnNull(filesToUpload).stream()
Stream<String> commands = ListUtils.emptyOnNull(relativeWorkingDirectoryFilesPaths).stream()
.map(relativePath -> "aws s3 cp " + s3WorkingDir + Path.of("/" + relativePath) + " " + batchWorkingDirectory + Path.of("/" + relativePath));
if (outputDirectoryEnabled) {
commands = Stream.concat(commands, Stream.of("mkdir " + batchOutputDirectory));
Expand Down

0 comments on commit 5fc3c2e

Please sign in to comment.