Skip to content

Commit

Permalink
chore: update runContext deprecated methods
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed May 13, 2024
1 parent 1aa96ac commit 6fe82c7
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private static List<PutEvents.OutputEntry> getOutputEntries(PutEvents put, RunCo
if (!from.getScheme().equals("kestra")) {
throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entry.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
outputEntries = Flux.create(FileSerde.reader(inputStream, PutEvents.OutputEntry.class), FluxSink.OverflowStrategy.BUFFER).collectList().block();
}
return outputEntries;
Expand Down Expand Up @@ -151,7 +151,7 @@ void runStorage() throws Exception {
.region(localstack.getRegion())
.accessKeyId(localstack.getAccessKey())
.secretKeyId(localstack.getSecretKey())
.entries(runContext.putTempFile(tempFile).toString())
.entries(runContext.storage().putFile(tempFile).toString())
.build();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private static List<PutRecords.OutputEntry> getOutputEntries(PutRecords put, Run
if (!from.getScheme().equals("kestra")) {
throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entry.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
outputEntries = Flux.create(FileSerde.reader(inputStream, PutRecords.OutputEntry.class), FluxSink.OverflowStrategy.BUFFER).collectList().block();
}
return outputEntries;
Expand Down Expand Up @@ -177,7 +177,7 @@ void runStorage() throws Exception {
.region(localstack.getRegion())
.accessKeyId(localstack.getAccessKey())
.secretKeyId(localstack.getSecretKey())
.records(runContext.putTempFile(tempFile).toString())
.records(runContext.storage().putFile(tempFile).toString())
.streamName("streamName")
.build();

Expand Down Expand Up @@ -230,7 +230,7 @@ void runStorageUpperCase() throws Exception {
.region(localstack.getRegion())
.accessKeyId(localstack.getAccessKey())
.secretKeyId(localstack.getSecretKey())
.records(runContext.putTempFile(tempFile).toString())
.records(runContext.storage().putFile(tempFile).toString())
.streamName("streamName")
.build();

Expand Down

0 comments on commit 6fe82c7

Please sign in to comment.