From 37a54a7ab3cc871f6297ceec07cda37e30a8e900 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Fri, 5 Jan 2024 10:13:30 +0100 Subject: [PATCH] feat(*): Migrate from RxJava2 to Reactor --- build.gradle | 2 +- .../io/kestra/plugin/jdbc/AbstractJdbcBatch.java | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index 64c7e28c..2f3db685 100644 --- a/build.gradle +++ b/build.gradle @@ -56,7 +56,7 @@ subprojects { compileOnly "io.micronaut:micronaut-inject" compileOnly "io.micronaut.validation:micronaut-validation" compileOnly "io.micronaut:micronaut-jackson-databind" - compileOnly "io.micronaut.rxjava2:micronaut-rxjava2" + compileOnly "io.micronaut.reactor:micronaut-reactor" // kestra compileOnly group: "io.kestra", name: "core", version: kestraVersion diff --git a/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java b/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java index 1a668a16..cdae194d 100644 --- a/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java +++ b/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java @@ -5,8 +5,6 @@ import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; -import io.reactivex.BackpressureStrategy; -import io.reactivex.Flowable; import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import lombok.experimental.SuperBuilder; @@ -23,6 +21,10 @@ import java.util.*; import java.util.concurrent.atomic.AtomicLong; import jakarta.validation.constraints.NotNull; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +import static io.kestra.core.utils.Rethrow.throwFunction; @SuperBuilder @@ -91,12 +93,12 @@ public Output run(RunContext runContext) throws Exception { ) { connection.setAutoCommit(false); - Flowable flowable = Flowable.create(FileSerde.reader(bufferedReader), BackpressureStrategy.BUFFER) + Flux flowable = Flux.create(FileSerde.reader(bufferedReader), FluxSink.OverflowStrategy.BUFFER) .doOnNext(docWriteRequest -> { count.incrementAndGet(); }) .buffer(this.chunk, this.chunk) - .map(o -> { + .map(throwFunction(o -> { PreparedStatement ps = connection.prepareStatement(sql); ParameterType parameterMetaData = ParameterType.of(ps.getParameterMetaData()); @@ -111,11 +113,11 @@ public Output run(RunContext runContext) throws Exception { connection.commit(); return Arrays.stream(updatedRows).sum(); - }); + })); Integer updated = flowable .reduce(Integer::sum) - .blockingGet(); + .block(); runContext.metric(Counter.of("records", count.get())); runContext.metric(Counter.of("updated", updated == null ? 0 : updated));