From ddc4879b6abc0ca1d8677000d2d27159320d15d3 Mon Sep 17 00:00:00 2001 From: ozangunalp Date: Wed, 14 Dec 2022 18:41:39 +0000 Subject: [PATCH] Bump Smallrye Reactive Messaging version to 3.22.1 --- bom/application/pom.xml | 2 +- .../deployment/ReactiveMessagingDotNames.java | 5 ++ .../SmallRyeReactiveMessagingProcessor.java | 7 ++- .../deployment/WiringHelper.java | 8 ++-- ...tedContextConnectorFactoryInterceptor.java | 29 ++++++++---- ...odeSupportConnectorFactoryInterceptor.java | 47 +++++++++++++++++++ jakarta/rewrite.yml | 2 +- 7 files changed, 82 insertions(+), 18 deletions(-) diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 83b28a473d5c0..09353bfc7b2a5 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -53,7 +53,7 @@ 1.0.13 2.7.0 2.29.0 - 3.22.0 + 3.22.1 1.3.3 1.2.1 1.3.5 diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java index 19c664f2c09be..19dc9c4b664eb 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java @@ -21,6 +21,8 @@ import io.smallrye.reactive.messaging.annotations.Incomings; import io.smallrye.reactive.messaging.annotations.Merge; import io.smallrye.reactive.messaging.annotations.OnOverflow; +import io.smallrye.reactive.messaging.connector.InboundConnector; +import io.smallrye.reactive.messaging.connector.OutboundConnector; public final class ReactiveMessagingDotNames { @@ -55,6 +57,9 @@ public final class ReactiveMessagingDotNames { static final DotName INCOMING_CONNECTOR_FACTORY = DotName.createSimple(IncomingConnectorFactory.class.getName()); static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName()); + static final DotName INBOUND_CONNECTOR = DotName.createSimple(InboundConnector.class.getName()); + static final DotName OUTBOUND_CONNECTOR = DotName.createSimple(OutboundConnector.class.getName()); + static final DotName SMALLRYE_BLOCKING = DotName.createSimple(io.smallrye.common.annotation.Blocking.class.getName()); // Do not directly reference the MetricDecorator (due to its direct references to MP Metrics, which may not be present) diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index 67495210a2f63..c329c7f8f7891 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -428,7 +428,9 @@ public boolean appliesTo(AnnotationTarget.Kind kind) { public void transform(TransformationContext ctx) { ClassInfo clazz = ctx.getTarget().asClass(); if (doesImplement(clazz, ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY, index.getIndex()) - || doesImplement(clazz, ReactiveMessagingDotNames.OUTGOING_CONNECTOR_FACTORY, index.getIndex())) { + || doesImplement(clazz, ReactiveMessagingDotNames.INBOUND_CONNECTOR, index.getIndex()) + || doesImplement(clazz, ReactiveMessagingDotNames.OUTGOING_CONNECTOR_FACTORY, index.getIndex()) + || doesImplement(clazz, ReactiveMessagingDotNames.OUTBOUND_CONNECTOR, index.getIndex())) { ctx.transform().add(DevModeSupportConnectorFactory.class).done(); } } @@ -505,7 +507,8 @@ public boolean appliesTo(AnnotationTarget.Kind kind) { @Override public void transform(TransformationContext ctx) { ClassInfo clazz = ctx.getTarget().asClass(); - if (doesImplement(clazz, ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY, index.getIndex())) { + if (doesImplement(clazz, ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY, index.getIndex()) + || doesImplement(clazz, ReactiveMessagingDotNames.INBOUND_CONNECTOR, index.getIndex())) { ctx.transform().add(DuplicatedContextConnectorFactory.class).done(); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringHelper.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringHelper.java index 47e744385dd41..c09bffb28ef62 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringHelper.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringHelper.java @@ -110,8 +110,8 @@ static boolean isChannelEnabled(ChannelDirection direction, String channel) { * @return {@code true} if the class implements the inbound connector interface */ static boolean isInboundConnector(ClassInfo ci) { - // TODO Add the internal interface support - return ci.interfaceNames().contains(ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY); + return ci.interfaceNames().contains(ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY) + || ci.interfaceNames().contains(ReactiveMessagingDotNames.INBOUND_CONNECTOR); } /** @@ -121,8 +121,8 @@ static boolean isInboundConnector(ClassInfo ci) { * @return {@code true} if the class implements the outbound connector interface */ static boolean isOutboundConnector(ClassInfo ci) { - // TODO Add the internal interface support - return ci.interfaceNames().contains(ReactiveMessagingDotNames.OUTGOING_CONNECTOR_FACTORY); + return ci.interfaceNames().contains(ReactiveMessagingDotNames.OUTGOING_CONNECTOR_FACTORY) + || ci.interfaceNames().contains(ReactiveMessagingDotNames.OUTBOUND_CONNECTOR); } /** diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java index daa1c4e4467ed..a0659ecad6333 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java @@ -9,9 +9,11 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import org.reactivestreams.Publisher; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; import io.vertx.core.Context; @@ -24,18 +26,25 @@ public class DuplicatedContextConnectorFactoryInterceptor { public Object intercept(InvocationContext ctx) throws Exception { if (ctx.getMethod().getName().equals("getPublisherBuilder")) { PublisherBuilder> result = (PublisherBuilder>) ctx.proceed(); - return result.map(message -> { - Optional metadata = message.getMetadata(LocalContextMetadata.class); - if (metadata.isPresent()) { - Context context = metadata.get().context(); - if (context != null && VertxContext.isDuplicatedContext(context)) { - VertxContextSafetyToggle.setContextSafe(context, true); - } - } - return message; - }); + return result.map(DuplicatedContextConnectorFactoryInterceptor::setMessageContextSafe); + } + if (ctx.getMethod().getName().equals("getPublisher")) { + Publisher> result = (Publisher>) ctx.proceed(); + return Multi.createFrom().publisher(result) + .map(DuplicatedContextConnectorFactoryInterceptor::setMessageContextSafe); } return ctx.proceed(); } + + private static Message setMessageContextSafe(Message message) { + Optional metadata = message.getMetadata(LocalContextMetadata.class); + if (metadata.isPresent()) { + Context context = metadata.get().context(); + if (context != null && VertxContext.isDuplicatedContext(context)) { + VertxContextSafetyToggle.setContextSafe(context, true); + } + } + return message; + } } diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java index 0d5478919ee28..297ced7a23ef4 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java @@ -12,9 +12,13 @@ import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + @Interceptor @DevModeSupportConnectorFactory @Priority(Interceptor.Priority.PLATFORM_BEFORE + 10) @@ -45,6 +49,19 @@ public Object intercept(InvocationContext ctx) throws Exception { return future; }); } + if (ctx.getMethod().getName().equals("getPublisher")) { + Publisher> result = (Publisher>) ctx.proceed(); + return Multi.createFrom().publisher(result) + .onItem().transformToUniAndConcatenate(msg -> Uni.createFrom().emitter(e -> { + onMessage.get().whenComplete((restarted, error) -> { + if (!restarted) { + // if restarted, a new stream is already running, + // no point in emitting an event to the old stream + e.complete(msg); + } + }); + })); + } if (ctx.getMethod().getName().equals("getSubscriberBuilder")) { SubscriberBuilder, Void> result = (SubscriberBuilder, Void>) ctx.proceed(); @@ -76,6 +93,36 @@ public void onComplete() { } }); } + if (ctx.getMethod().getName().equals("getSubscriberBuilder")) { + Subscriber> result = (Subscriber>) ctx.proceed(); + return new Subscriber>() { + private Subscriber> subscriber; + + @Override + public void onSubscribe(Subscription s) { + subscriber = result; + subscriber.onSubscribe(s); + } + + @Override + public void onNext(Message o) { + subscriber.onNext(o); + onMessage.get(); + } + + @Override + public void onError(Throwable t) { + subscriber.onError(t); + onMessage.get(); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + onMessage.get(); + } + }; + } return ctx.proceed(); } diff --git a/jakarta/rewrite.yml b/jakarta/rewrite.yml index 3809006b4d55e..69a1e3798b882 100644 --- a/jakarta/rewrite.yml +++ b/jakarta/rewrite.yml @@ -623,7 +623,7 @@ recipeList: newValue: '3.0.0' - org.openrewrite.maven.ChangePropertyValue: key: smallrye-reactive-messaging.version - newValue: '4.1.0' + newValue: '4.1.1' - org.openrewrite.maven.ChangePropertyValue: key: microprofile-rest-client.version newValue: '3.0'