Skip to content

Commit

Permalink
Merge pull request #18651 from Sanne/HR3
Browse files Browse the repository at this point in the history
Panache Reactive to work correctly when invoked from a blocking worker thread
  • Loading branch information
Sanne authored Jul 14, 2021
2 parents edc5c9a + 523b38b commit 087e562
Show file tree
Hide file tree
Showing 8 changed files with 471 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,36 @@
import io.quarkus.panache.hibernate.common.runtime.PanacheJpaUtil;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public abstract class AbstractJpaOperations<PanacheQueryType> {

// FIXME: make it configurable?
static final long TIMEOUT_MS = 5000;

private static void executeInVertxEventLoop(Runnable runnable) {
Vertx vertx = Arc.container().instance(Vertx.class).get();
// this needs to be sync
CompletableFuture<Void> cf = new CompletableFuture<>();
vertx.runOnContext(v -> {
try {
runnable.run();
cf.complete(null);
} catch (Throwable t) {
cf.completeExceptionally(t);
}
});
try {
cf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}

private static Session lookupSessionFromArc() {
return Arc.container().instance(Session.class).get();
}

protected abstract PanacheQueryType createPanacheQuery(Uni<Mutiny.Session> session, String query, String orderBy,
Object paramsArrayOrMap);

Expand All @@ -50,9 +72,9 @@ public Uni<Void> persist(Object entity) {
}

public Uni<Void> persist(Uni<Mutiny.Session> sessionUni, Object entity) {
return sessionUni.flatMap(session -> {
return sessionUni.chain(session -> {
if (!session.contains(entity)) {
return session.persist(entity).map(v -> null);
return session.persist(entity);
}
return Uni.createFrom().nullItem();
});
Expand Down Expand Up @@ -82,13 +104,13 @@ public Uni<Void> persist(Stream<?> entities) {
}

public Uni<Void> delete(Object entity) {
return getSession().flatMap(session -> session.remove(entity)).map(v -> null);
return getSession().chain(session -> session.remove(entity));
}

public boolean isPersistent(Object entity) {
// only attempt to look up the request context session if it's already there: do not
// run the producer method otherwise, before we know which thread we're on
Session requestSession = isInRequestContext(Mutiny.Session.class) ? Arc.container().instance(Mutiny.Session.class).get()
Session requestSession = isInRequestContext(Mutiny.Session.class) ? lookupSessionFromArc()
: null;
if (requestSession != null) {
return requestSession.contains(entity);
Expand All @@ -98,52 +120,21 @@ public boolean isPersistent(Object entity) {
}

public Uni<Void> flush() {
return getSession().flatMap(session -> session.flush()).map(v -> null);
return getSession().chain(Session::flush);
}

//
// Private stuff

public static Uni<Mutiny.Session> getSession() {
// only attempt to look up the request context session if it's already there: do not
// run the producer method otherwise, before we know which thread we're on
Session requestSession = isInRequestContext(Mutiny.Session.class) ? Arc.container().instance(Mutiny.Session.class).get()
: null;
if (requestSession != null) {
return Uni.createFrom().item(requestSession);
}

if (io.vertx.core.Context.isOnVertxThread()) {
return Uni.createFrom().item(Arc.container().instance(Mutiny.Session.class).get());
// Always check if we're running on the event loop: if not,
// we need to delegate the execution of all tasks on it.
if (io.vertx.core.Context.isOnEventLoopThread()) {
return Uni.createFrom().item(lookupSessionFromArc());
} else {
// FIXME: we may need context propagation
Vertx vertx = Arc.container().instance(Vertx.class).get();
Executor executor = runnable -> {
// this will be the context for a VertxThread, or a ThreadLocal context otherwise, but not null
Context context = vertx.getOrCreateContext();
// currentContext() returns null for non-VertxThread
if (Vertx.currentContext() == context) {
runnable.run();
} else {
// this needs to be sync
CompletableFuture<Void> cf = new CompletableFuture<>();
vertx.runOnContext(v -> {
try {
runnable.run();
cf.complete(null);
} catch (Throwable t) {
cf.completeExceptionally(t);
}
});
try {
cf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}

};
return Uni.createFrom().item(() -> Arc.container().instance(Mutiny.Session.class).get())
final Executor executor = AbstractJpaOperations::executeInVertxEventLoop;
return Uni.createFrom().item(AbstractJpaOperations::lookupSessionFromArc)
.runSubscriptionOn(executor);
}
}
Expand Down Expand Up @@ -185,12 +176,12 @@ public int paramCount(Map<String, Object> params) {
// Queries

public Uni<?> findById(Class<?> entityClass, Object id) {
return getSession().flatMap(session -> session.find(entityClass, id));
return getSession().chain(session -> session.find(entityClass, id));
}

public Uni<?> findById(Class<?> entityClass, Object id, LockModeType lockModeType) {
return getSession()
.flatMap(session -> session.find(entityClass, id, LockModeConverter.convertToLockMode(lockModeType)));
.chain(session -> session.find(entityClass, id, LockModeConverter.convertToLockMode(lockModeType)));
}

public PanacheQueryType find(Class<?> entityClass, String query, Object... params) {
Expand Down Expand Up @@ -312,20 +303,20 @@ public Multi<?> streamAll(Class<?> entityClass, Sort sort) {
@SuppressWarnings({ "rawtypes", "unchecked" })
public Uni<Long> count(Class<?> entityClass) {
return (Uni) getSession()
.flatMap(session -> session.createQuery("SELECT COUNT(*) FROM " + PanacheJpaUtil.getEntityName(entityClass))
.chain(session -> session.createQuery("SELECT COUNT(*) FROM " + PanacheJpaUtil.getEntityName(entityClass))
.getSingleResult());
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public Uni<Long> count(Class<?> entityClass, String query, Object... params) {
return (Uni) getSession().flatMap(session -> bindParameters(
return (Uni) getSession().chain(session -> bindParameters(
session.createQuery(PanacheJpaUtil.createCountQuery(entityClass, query, paramCount(params))),
params).getSingleResult());
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Uni<Long> count(Class<?> entityClass, String query, Map<String, Object> params) {
return (Uni) getSession().flatMap(session -> bindParameters(
return (Uni) getSession().chain(session -> bindParameters(
session.createQuery(PanacheJpaUtil.createCountQuery(entityClass, query, paramCount(params))),
params).getSingleResult());
}
Expand All @@ -351,33 +342,33 @@ public Uni<Boolean> exists(Class<?> entityClass, String query, Parameters params
}

public Uni<Long> deleteAll(Class<?> entityClass) {
return getSession().flatMap(
return getSession().chain(
session -> session.createQuery("DELETE FROM " + PanacheJpaUtil.getEntityName(entityClass)).executeUpdate()
.map(i -> i.longValue()));
.map(Integer::longValue));
}

public Uni<Boolean> deleteById(Class<?> entityClass, Object id) {
// Impl note : we load the entity then delete it because it's the only implementation generic enough for any model,
// and correct in all cases (composite key, graph of entities, ...). HQL cannot be directly used for these reasons.
return findById(entityClass, id)
.flatMap(entity -> {
.chain(entity -> {
if (entity == null) {
return Uni.createFrom().item(false);
}
return getSession().flatMap(session -> session.remove(entity).map(v -> true));
return getSession().chain(session -> session.remove(entity).map(v -> true));
});
}

public Uni<Long> delete(Class<?> entityClass, String query, Object... params) {
return getSession().flatMap(session -> bindParameters(
return getSession().chain(session -> bindParameters(
session.createQuery(PanacheJpaUtil.createDeleteQuery(entityClass, query, paramCount(params))), params)
.executeUpdate().map(i -> i.longValue()));
.executeUpdate().map(Integer::longValue));
}

public Uni<Long> delete(Class<?> entityClass, String query, Map<String, Object> params) {
return getSession().flatMap(session -> bindParameters(
return getSession().chain(session -> bindParameters(
session.createQuery(PanacheJpaUtil.createDeleteQuery(entityClass, query, paramCount(params))), params)
.executeUpdate().map(i -> i.longValue()));
.executeUpdate().map(Integer::longValue));
}

public Uni<Long> delete(Class<?> entityClass, String query, Parameters params) {
Expand All @@ -390,15 +381,15 @@ public IllegalStateException implementationInjectionMissing() {
}

public static Uni<Integer> executeUpdate(String query, Object... params) {
return getSession().flatMap(session -> {
return getSession().chain(session -> {
Mutiny.Query<?> jpaQuery = session.createQuery(query);
bindParameters(jpaQuery, params);
return jpaQuery.executeUpdate();
});
}

public static Uni<Integer> executeUpdate(String query, Map<String, Object> params) {
return getSession().flatMap(session -> {
return getSession().chain(session -> {
Mutiny.Query<?> jpaQuery = session.createQuery(query);
bindParameters(jpaQuery, params);
return jpaQuery.executeUpdate();
Expand Down
Loading

0 comments on commit 087e562

Please sign in to comment.