Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/rework task runner wdir #485

Merged
merged 2 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ dependencies {
// AWS libs: versions are managed by the Micronaut BOM
api platform("io.micronaut.platform:micronaut-platform")
api 'software.amazon.awssdk:cloudwatchlogs'
api 'software.amazon.awssdk:batch:2.25.60' // we can remove this after micronaut bump as long as it contains the RegisterJobDefinitionRequest.ecsProperties exists
api 'software.amazon.awssdk:batch:2.26.16' // we can remove this after micronaut bump as long as it contains the RegisterJobDefinitionRequest.ecsProperties exists
api 'software.amazon.awssdk:s3'
api 'software.amazon.awssdk:s3-transfer-manager'
api 'software.amazon.awssdk.crt:aws-crt:0.29.19' //used by s3-transfer-manager
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/io/kestra/plugin/aws/runner/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,22 +226,23 @@ public class Batch extends TaskRunner implements AbstractS3, AbstractConnectionI
private final Duration completionCheckInterval = Duration.ofSeconds(5);

@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception {
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception {
boolean hasS3Bucket = this.bucket != null;

String renderedBucket = runContext.render(bucket);

boolean hasFilesToUpload = !ListUtils.isEmpty(filesToUpload);
Logger logger = runContext.logger();
List<Path> relativeWorkingDirectoryFilesPaths = taskCommands.relativeWorkingDirectoryFilesPaths();
boolean hasFilesToUpload = !ListUtils.isEmpty(relativeWorkingDirectoryFilesPaths);
if (hasFilesToUpload && !hasS3Bucket) {
throw new IllegalArgumentException("You must provide an S3 bucket in order to use `inputFiles` or `namespaceFiles`");
logger.warn("Working directory is not empty but no S3 bucket are specified. You must provide an S3 bucket in order to use `inputFiles` or `namespaceFiles`. Skipping importing files to runner.");
}
boolean hasFilesToDownload = !ListUtils.isEmpty(filesToDownload);
boolean outputDirectoryEnabled = taskCommands.outputDirectoryEnabled();
if ((hasFilesToDownload || outputDirectoryEnabled) && !hasS3Bucket) {
throw new IllegalArgumentException("You must provide an S3 bucket in order to use `outputFiles` or `{{ outputDir }}`");
}

Logger logger = runContext.logger();
AbstractLogConsumer logConsumer = taskCommands.getLogConsumer();

String renderedRegion = runContext.render(this.region);
Expand Down Expand Up @@ -285,7 +286,7 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S

if (hasFilesToUpload) {
try (S3TransferManager transferManager = transferManager(runContext)) {
filesToUpload.stream().map(relativePath ->
relativeWorkingDirectoryFilesPaths.stream().map(relativePath ->
UploadFileRequest.builder()
.putObjectRequest(
PutObjectRequest
Expand All @@ -295,7 +296,7 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S
.key((batchWorkingDirectory + Path.of("/" + relativePath).toString()).substring(1))
.build()
)
.source(runContext.workingDir().resolve(Path.of(relativePath)))
.source(runContext.workingDir().resolve(relativePath))
.build()
).map(transferManager::uploadFile)
.map(FileUpload::completionFuture)
Expand Down Expand Up @@ -343,7 +344,7 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S
.build();

if (hasFilesToUpload || outputDirectoryEnabled) {
Stream<String> commands = ListUtils.emptyOnNull(filesToUpload).stream()
Stream<String> commands = ListUtils.emptyOnNull(relativeWorkingDirectoryFilesPaths).stream()
.map(relativePath -> "aws s3 cp " + s3WorkingDir + Path.of("/" + relativePath) + " " + batchWorkingDirectory + Path.of("/" + relativePath));
if (outputDirectoryEnabled) {
commands = Stream.concat(commands, Stream.of("mkdir " + batchOutputDirectory));
Expand Down
Loading