diff --git a/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/AbstractJpaOperations.java b/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/AbstractJpaOperations.java index 7003213302635..b0680e8c18463 100644 --- a/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/AbstractJpaOperations.java +++ b/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/AbstractJpaOperations.java @@ -27,7 +27,6 @@ import io.quarkus.panache.hibernate.common.runtime.PanacheJpaUtil; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -import io.vertx.core.Context; import io.vertx.core.Vertx; public abstract class AbstractJpaOperations { @@ -35,6 +34,29 @@ public abstract class AbstractJpaOperations { // FIXME: make it configurable? static final long TIMEOUT_MS = 5000; + private static void executeInVertxEventLoop(Runnable runnable) { + Vertx vertx = Arc.container().instance(Vertx.class).get(); + // this needs to be sync + CompletableFuture cf = new CompletableFuture<>(); + vertx.runOnContext(v -> { + try { + runnable.run(); + cf.complete(null); + } catch (Throwable t) { + cf.completeExceptionally(t); + } + }); + try { + cf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + private static Session lookupSessionFromArc() { + return Arc.container().instance(Session.class).get(); + } + protected abstract PanacheQueryType createPanacheQuery(Uni session, String query, String orderBy, Object paramsArrayOrMap); @@ -50,9 +72,9 @@ public Uni persist(Object entity) { } public Uni persist(Uni sessionUni, Object entity) { - return sessionUni.flatMap(session -> { + return sessionUni.chain(session -> { if (!session.contains(entity)) { - return session.persist(entity).map(v -> null); + return session.persist(entity); } return Uni.createFrom().nullItem(); }); @@ -82,13 +104,13 @@ public Uni persist(Stream entities) { } public Uni delete(Object entity) { - return getSession().flatMap(session -> session.remove(entity)).map(v -> null); + return getSession().chain(session -> session.remove(entity)); } public boolean isPersistent(Object entity) { // only attempt to look up the request context session if it's already there: do not // run the producer method otherwise, before we know which thread we're on - Session requestSession = isInRequestContext(Mutiny.Session.class) ? Arc.container().instance(Mutiny.Session.class).get() + Session requestSession = isInRequestContext(Mutiny.Session.class) ? lookupSessionFromArc() : null; if (requestSession != null) { return requestSession.contains(entity); @@ -98,52 +120,21 @@ public boolean isPersistent(Object entity) { } public Uni flush() { - return getSession().flatMap(session -> session.flush()).map(v -> null); + return getSession().chain(Session::flush); } // // Private stuff public static Uni getSession() { - // only attempt to look up the request context session if it's already there: do not - // run the producer method otherwise, before we know which thread we're on - Session requestSession = isInRequestContext(Mutiny.Session.class) ? Arc.container().instance(Mutiny.Session.class).get() - : null; - if (requestSession != null) { - return Uni.createFrom().item(requestSession); - } - - if (io.vertx.core.Context.isOnVertxThread()) { - return Uni.createFrom().item(Arc.container().instance(Mutiny.Session.class).get()); + // Always check if we're running on the event loop: if not, + // we need to delegate the execution of all tasks on it. + if (io.vertx.core.Context.isOnEventLoopThread()) { + return Uni.createFrom().item(lookupSessionFromArc()); } else { // FIXME: we may need context propagation - Vertx vertx = Arc.container().instance(Vertx.class).get(); - Executor executor = runnable -> { - // this will be the context for a VertxThread, or a ThreadLocal context otherwise, but not null - Context context = vertx.getOrCreateContext(); - // currentContext() returns null for non-VertxThread - if (Vertx.currentContext() == context) { - runnable.run(); - } else { - // this needs to be sync - CompletableFuture cf = new CompletableFuture<>(); - vertx.runOnContext(v -> { - try { - runnable.run(); - cf.complete(null); - } catch (Throwable t) { - cf.completeExceptionally(t); - } - }); - try { - cf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new RuntimeException(e); - } - } - - }; - return Uni.createFrom().item(() -> Arc.container().instance(Mutiny.Session.class).get()) + final Executor executor = AbstractJpaOperations::executeInVertxEventLoop; + return Uni.createFrom().item(AbstractJpaOperations::lookupSessionFromArc) .runSubscriptionOn(executor); } } @@ -185,12 +176,12 @@ public int paramCount(Map params) { // Queries public Uni findById(Class entityClass, Object id) { - return getSession().flatMap(session -> session.find(entityClass, id)); + return getSession().chain(session -> session.find(entityClass, id)); } public Uni findById(Class entityClass, Object id, LockModeType lockModeType) { return getSession() - .flatMap(session -> session.find(entityClass, id, LockModeConverter.convertToLockMode(lockModeType))); + .chain(session -> session.find(entityClass, id, LockModeConverter.convertToLockMode(lockModeType))); } public PanacheQueryType find(Class entityClass, String query, Object... params) { @@ -312,20 +303,20 @@ public Multi streamAll(Class entityClass, Sort sort) { @SuppressWarnings({ "rawtypes", "unchecked" }) public Uni count(Class entityClass) { return (Uni) getSession() - .flatMap(session -> session.createQuery("SELECT COUNT(*) FROM " + PanacheJpaUtil.getEntityName(entityClass)) + .chain(session -> session.createQuery("SELECT COUNT(*) FROM " + PanacheJpaUtil.getEntityName(entityClass)) .getSingleResult()); } @SuppressWarnings({ "unchecked", "rawtypes" }) public Uni count(Class entityClass, String query, Object... params) { - return (Uni) getSession().flatMap(session -> bindParameters( + return (Uni) getSession().chain(session -> bindParameters( session.createQuery(PanacheJpaUtil.createCountQuery(entityClass, query, paramCount(params))), params).getSingleResult()); } @SuppressWarnings({ "rawtypes", "unchecked" }) public Uni count(Class entityClass, String query, Map params) { - return (Uni) getSession().flatMap(session -> bindParameters( + return (Uni) getSession().chain(session -> bindParameters( session.createQuery(PanacheJpaUtil.createCountQuery(entityClass, query, paramCount(params))), params).getSingleResult()); } @@ -351,33 +342,33 @@ public Uni exists(Class entityClass, String query, Parameters params } public Uni deleteAll(Class entityClass) { - return getSession().flatMap( + return getSession().chain( session -> session.createQuery("DELETE FROM " + PanacheJpaUtil.getEntityName(entityClass)).executeUpdate() - .map(i -> i.longValue())); + .map(Integer::longValue)); } public Uni deleteById(Class entityClass, Object id) { // Impl note : we load the entity then delete it because it's the only implementation generic enough for any model, // and correct in all cases (composite key, graph of entities, ...). HQL cannot be directly used for these reasons. return findById(entityClass, id) - .flatMap(entity -> { + .chain(entity -> { if (entity == null) { return Uni.createFrom().item(false); } - return getSession().flatMap(session -> session.remove(entity).map(v -> true)); + return getSession().chain(session -> session.remove(entity).map(v -> true)); }); } public Uni delete(Class entityClass, String query, Object... params) { - return getSession().flatMap(session -> bindParameters( + return getSession().chain(session -> bindParameters( session.createQuery(PanacheJpaUtil.createDeleteQuery(entityClass, query, paramCount(params))), params) - .executeUpdate().map(i -> i.longValue())); + .executeUpdate().map(Integer::longValue)); } public Uni delete(Class entityClass, String query, Map params) { - return getSession().flatMap(session -> bindParameters( + return getSession().chain(session -> bindParameters( session.createQuery(PanacheJpaUtil.createDeleteQuery(entityClass, query, paramCount(params))), params) - .executeUpdate().map(i -> i.longValue())); + .executeUpdate().map(Integer::longValue)); } public Uni delete(Class entityClass, String query, Parameters params) { @@ -390,7 +381,7 @@ public IllegalStateException implementationInjectionMissing() { } public static Uni executeUpdate(String query, Object... params) { - return getSession().flatMap(session -> { + return getSession().chain(session -> { Mutiny.Query jpaQuery = session.createQuery(query); bindParameters(jpaQuery, params); return jpaQuery.executeUpdate(); @@ -398,7 +389,7 @@ public static Uni executeUpdate(String query, Object... params) { } public static Uni executeUpdate(String query, Map params) { - return getSession().flatMap(session -> { + return getSession().chain(session -> { Mutiny.Query jpaQuery = session.createQuery(query); bindParameters(jpaQuery, params); return jpaQuery.executeUpdate(); diff --git a/integration-tests/hibernate-reactive-panache-blocking/pom.xml b/integration-tests/hibernate-reactive-panache-blocking/pom.xml new file mode 100644 index 0000000000000..2c2eb81ed7c0f --- /dev/null +++ b/integration-tests/hibernate-reactive-panache-blocking/pom.xml @@ -0,0 +1,318 @@ + + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-integration-test-hibernate-reactive-panache-blocking + Quarkus - Integration Tests - Hibernate Reactive with Panache in Blocking mode + To test proper error reporting and safeguards when Panache Reactive is being used in combination with blocking technologies + + + postgres:13.1 + vertx-reactive:postgresql://localhost:5432/hibernate_orm_test + + + + + io.quarkus + quarkus-hibernate-reactive-panache + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-resteasy-jsonb + + + io.quarkus + quarkus-core + + + io.quarkus + quarkus-reactive-pg-client + + + org.junit.jupiter + junit-jupiter-api + compile + + + + + io.quarkus + quarkus-junit5 + test + + + io.quarkus + quarkus-junit5-internal + test + + + io.rest-assured + rest-assured + test + + + io.quarkus + quarkus-test-h2 + test + + + io.quarkus + quarkus-panache-mock + test + + + net.bytebuddy + byte-buddy + + + + + org.assertj + assertj-core + test + + + org.awaitility + awaitility + test + + + + io.quarkus + quarkus-core-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-hibernate-reactive-panache-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-reactive-pg-client-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-jsonb-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + src/main/resources + true + + + + + maven-surefire-plugin + + true + + + + maven-failsafe-plugin + + true + + + + io.quarkus + quarkus-maven-plugin + + + + build + + + + + + + + + + test-postgresql + + + test-containers + + + + + + maven-surefire-plugin + + false + + + + + prod-mode + test + + test + + + **/*PMT.java + + + + + + maven-failsafe-plugin + + false + + + + + + + + docker-postgresql + + + start-containers + + + + vertx-reactive:postgresql://localhost:5431/hibernate_orm_test + + + + + io.fabric8 + docker-maven-plugin + + + + ${postgres.image} + postgresql + + + hibernate_orm_test + hibernate_orm_test + hibernate_orm_test + + + 5431:5432 + + + + mapped + + 5432 + + + + + + + + + true + + + + docker-start + compile + + stop + start + + + + docker-stop + post-integration-test + + stop + + + + + + org.codehaus.mojo + exec-maven-plugin + + + docker-prune + generate-resources + + exec + + + ${docker-prune.location} + + + + + + + + + diff --git a/integration-tests/hibernate-reactive-panache-blocking/src/main/java/io/quarkus/it/panache/reactive/Fruit.java b/integration-tests/hibernate-reactive-panache-blocking/src/main/java/io/quarkus/it/panache/reactive/Fruit.java new file mode 100644 index 0000000000000..e9c44f954f041 --- /dev/null +++ b/integration-tests/hibernate-reactive-panache-blocking/src/main/java/io/quarkus/it/panache/reactive/Fruit.java @@ -0,0 +1,21 @@ +package io.quarkus.it.panache.reactive; + +import javax.persistence.Entity; + +import io.quarkus.hibernate.reactive.panache.PanacheEntity; + +@Entity +public class Fruit extends PanacheEntity { + + public String name; + public String color; + + public Fruit(String name, String color) { + this.name = name; + this.color = color; + } + + public Fruit() { + } + +} diff --git a/integration-tests/hibernate-reactive-panache-blocking/src/main/java/io/quarkus/it/panache/reactive/TestEndpoint.java b/integration-tests/hibernate-reactive-panache-blocking/src/main/java/io/quarkus/it/panache/reactive/TestEndpoint.java new file mode 100644 index 0000000000000..7aa0925ae9c23 --- /dev/null +++ b/integration-tests/hibernate-reactive-panache-blocking/src/main/java/io/quarkus/it/panache/reactive/TestEndpoint.java @@ -0,0 +1,43 @@ +package io.quarkus.it.panache.reactive; + +import java.util.List; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; + +import io.quarkus.hibernate.reactive.panache.Panache; +import io.quarkus.hibernate.reactive.panache.PanacheEntityBase; + +/** + * These tests cover for "mixed mode" usage of Panache Reactive from a blocking thread; + * this is known to be tricky as Hibernate Reactive requires running on the event loop, + * while Panache relies on the notion of "current Session" being stored in the current + * CDI context. + */ +@Path("test") +public class TestEndpoint { + + @GET + @Path("store3fruits") + public String testStorage() { + Fruit apple = new Fruit("apple", "red"); + Fruit orange = new Fruit("orange", "orange"); + Fruit banana = new Fruit("banana", "yellow"); + + Panache.withTransaction(() -> Fruit.persist(apple, orange, banana)).subscribeAsCompletionStage().join(); + + //We wants this same request to also perform a read, so to trigger a second lookup of the Mutiny.Session from ArC + return verifyStored(); + } + + @GET + @Path("load3fruits") + public String verifyStored() { + final List fruitsList = Panache.withTransaction(() -> Fruit.find("select name, color from Fruit") + .list()) + .subscribeAsCompletionStage() + .join(); + return fruitsList.size() == 3 ? "OK" : "KO"; + } + +} diff --git a/integration-tests/hibernate-reactive-panache-blocking/src/main/resources/application.properties b/integration-tests/hibernate-reactive-panache-blocking/src/main/resources/application.properties new file mode 100644 index 0000000000000..6d0922b28d2fe --- /dev/null +++ b/integration-tests/hibernate-reactive-panache-blocking/src/main/resources/application.properties @@ -0,0 +1,6 @@ +quarkus.datasource.db-kind=postgresql +quarkus.datasource.username=hibernate_orm_test +quarkus.datasource.password=hibernate_orm_test +quarkus.datasource.reactive.url=${postgres.reactive.url} + +quarkus.hibernate-orm.database.generation=drop-and-create diff --git a/integration-tests/hibernate-reactive-panache-blocking/src/test/java/io/quarkus/it/panache/reactive/PanacheFunctionalityInGraalITCase.java b/integration-tests/hibernate-reactive-panache-blocking/src/test/java/io/quarkus/it/panache/reactive/PanacheFunctionalityInGraalITCase.java new file mode 100644 index 0000000000000..6f88d80421ba3 --- /dev/null +++ b/integration-tests/hibernate-reactive-panache-blocking/src/test/java/io/quarkus/it/panache/reactive/PanacheFunctionalityInGraalITCase.java @@ -0,0 +1,11 @@ +package io.quarkus.it.panache.reactive; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +/** + * Repeat all tests from {@link PanacheFunctionalityTest} in native image mode. + */ +@QuarkusIntegrationTest +public class PanacheFunctionalityInGraalITCase extends PanacheFunctionalityTest { + +} diff --git a/integration-tests/hibernate-reactive-panache-blocking/src/test/java/io/quarkus/it/panache/reactive/PanacheFunctionalityTest.java b/integration-tests/hibernate-reactive-panache-blocking/src/test/java/io/quarkus/it/panache/reactive/PanacheFunctionalityTest.java new file mode 100644 index 0000000000000..fac550debcebd --- /dev/null +++ b/integration-tests/hibernate-reactive-panache-blocking/src/test/java/io/quarkus/it/panache/reactive/PanacheFunctionalityTest.java @@ -0,0 +1,22 @@ +package io.quarkus.it.panache.reactive; + +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; + +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.RestAssured; + +/** + * Test various Panache operations running in Quarkus + */ +@QuarkusTest +public class PanacheFunctionalityTest { + + @Test + public void tests() { + RestAssured.when().get("/test/store3fruits").then().body(is("OK")); + RestAssured.when().get("/test/load3fruits").then().body(is("OK")); + } + +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 00447f238e83c..2f3b6665f5b2c 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -67,6 +67,7 @@ hibernate-reactive-mysql hibernate-reactive-postgresql hibernate-reactive-panache + hibernate-reactive-panache-blocking hibernate-search-orm-elasticsearch hibernate-search-orm-elasticsearch-aws hibernate-tenancy