diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index 83b28a473d5c05..09353bfc7b2a55 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -53,7 +53,7 @@
1.0.13
2.7.0
2.29.0
- 3.22.0
+ 3.22.1
1.3.3
1.2.1
1.3.5
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
index 19c664f2c09be2..19dc9c4b664eb9 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
@@ -21,6 +21,8 @@
import io.smallrye.reactive.messaging.annotations.Incomings;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.annotations.OnOverflow;
+import io.smallrye.reactive.messaging.connector.InboundConnector;
+import io.smallrye.reactive.messaging.connector.OutboundConnector;
public final class ReactiveMessagingDotNames {
@@ -55,6 +57,9 @@ public final class ReactiveMessagingDotNames {
static final DotName INCOMING_CONNECTOR_FACTORY = DotName.createSimple(IncomingConnectorFactory.class.getName());
static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName());
+ static final DotName INBOUND_CONNECTOR = DotName.createSimple(InboundConnector.class.getName());
+ static final DotName OUTBOUND_CONNECTOR = DotName.createSimple(OutboundConnector.class.getName());
+
static final DotName SMALLRYE_BLOCKING = DotName.createSimple(io.smallrye.common.annotation.Blocking.class.getName());
// Do not directly reference the MetricDecorator (due to its direct references to MP Metrics, which may not be present)
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
index 67495210a2f63a..c329c7f8f78910 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
@@ -428,7 +428,9 @@ public boolean appliesTo(AnnotationTarget.Kind kind) {
public void transform(TransformationContext ctx) {
ClassInfo clazz = ctx.getTarget().asClass();
if (doesImplement(clazz, ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY, index.getIndex())
- || doesImplement(clazz, ReactiveMessagingDotNames.OUTGOING_CONNECTOR_FACTORY, index.getIndex())) {
+ || doesImplement(clazz, ReactiveMessagingDotNames.INBOUND_CONNECTOR, index.getIndex())
+ || doesImplement(clazz, ReactiveMessagingDotNames.OUTGOING_CONNECTOR_FACTORY, index.getIndex())
+ || doesImplement(clazz, ReactiveMessagingDotNames.OUTBOUND_CONNECTOR, index.getIndex())) {
ctx.transform().add(DevModeSupportConnectorFactory.class).done();
}
}
@@ -505,7 +507,8 @@ public boolean appliesTo(AnnotationTarget.Kind kind) {
@Override
public void transform(TransformationContext ctx) {
ClassInfo clazz = ctx.getTarget().asClass();
- if (doesImplement(clazz, ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY, index.getIndex())) {
+ if (doesImplement(clazz, ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY, index.getIndex())
+ || doesImplement(clazz, ReactiveMessagingDotNames.INBOUND_CONNECTOR, index.getIndex())) {
ctx.transform().add(DuplicatedContextConnectorFactory.class).done();
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringHelper.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringHelper.java
index 47e744385dd410..c09bffb28ef621 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringHelper.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringHelper.java
@@ -110,8 +110,8 @@ static boolean isChannelEnabled(ChannelDirection direction, String channel) {
* @return {@code true} if the class implements the inbound connector interface
*/
static boolean isInboundConnector(ClassInfo ci) {
- // TODO Add the internal interface support
- return ci.interfaceNames().contains(ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY);
+ return ci.interfaceNames().contains(ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY)
+ || ci.interfaceNames().contains(ReactiveMessagingDotNames.INBOUND_CONNECTOR);
}
/**
@@ -121,8 +121,8 @@ static boolean isInboundConnector(ClassInfo ci) {
* @return {@code true} if the class implements the outbound connector interface
*/
static boolean isOutboundConnector(ClassInfo ci) {
- // TODO Add the internal interface support
- return ci.interfaceNames().contains(ReactiveMessagingDotNames.OUTGOING_CONNECTOR_FACTORY);
+ return ci.interfaceNames().contains(ReactiveMessagingDotNames.OUTGOING_CONNECTOR_FACTORY)
+ || ci.interfaceNames().contains(ReactiveMessagingDotNames.OUTBOUND_CONNECTOR);
}
/**
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java
index daa1c4e4467ed7..a0659ecad6333b 100644
--- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java
@@ -9,9 +9,11 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
+import org.reactivestreams.Publisher;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
+import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;
import io.vertx.core.Context;
@@ -24,18 +26,25 @@ public class DuplicatedContextConnectorFactoryInterceptor {
public Object intercept(InvocationContext ctx) throws Exception {
if (ctx.getMethod().getName().equals("getPublisherBuilder")) {
PublisherBuilder> result = (PublisherBuilder>) ctx.proceed();
- return result.map(message -> {
- Optional metadata = message.getMetadata(LocalContextMetadata.class);
- if (metadata.isPresent()) {
- Context context = metadata.get().context();
- if (context != null && VertxContext.isDuplicatedContext(context)) {
- VertxContextSafetyToggle.setContextSafe(context, true);
- }
- }
- return message;
- });
+ return result.map(DuplicatedContextConnectorFactoryInterceptor::setMessageContextSafe);
+ }
+ if (ctx.getMethod().getName().equals("getPublisher")) {
+ Publisher> result = (Publisher>) ctx.proceed();
+ return Multi.createFrom().publisher(result)
+ .map(DuplicatedContextConnectorFactoryInterceptor::setMessageContextSafe);
}
return ctx.proceed();
}
+
+ private static Message> setMessageContextSafe(Message> message) {
+ Optional metadata = message.getMetadata(LocalContextMetadata.class);
+ if (metadata.isPresent()) {
+ Context context = metadata.get().context();
+ if (context != null && VertxContext.isDuplicatedContext(context)) {
+ VertxContextSafetyToggle.setContextSafe(context, true);
+ }
+ }
+ return message;
+ }
}
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java
index 0d5478919ee28b..297ced7a23ef45 100644
--- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java
@@ -12,9 +12,13 @@
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
+import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
@Interceptor
@DevModeSupportConnectorFactory
@Priority(Interceptor.Priority.PLATFORM_BEFORE + 10)
@@ -45,6 +49,19 @@ public Object intercept(InvocationContext ctx) throws Exception {
return future;
});
}
+ if (ctx.getMethod().getName().equals("getPublisher")) {
+ Publisher> result = (Publisher>) ctx.proceed();
+ return Multi.createFrom().publisher(result)
+ .onItem().transformToUniAndConcatenate(msg -> Uni.createFrom().emitter(e -> {
+ onMessage.get().whenComplete((restarted, error) -> {
+ if (!restarted) {
+ // if restarted, a new stream is already running,
+ // no point in emitting an event to the old stream
+ e.complete(msg);
+ }
+ });
+ }));
+ }
if (ctx.getMethod().getName().equals("getSubscriberBuilder")) {
SubscriberBuilder, Void> result = (SubscriberBuilder, Void>) ctx.proceed();
@@ -76,6 +93,36 @@ public void onComplete() {
}
});
}
+ if (ctx.getMethod().getName().equals("getSubscriberBuilder")) {
+ Subscriber> result = (Subscriber>) ctx.proceed();
+ return new Subscriber>() {
+ private Subscriber> subscriber;
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ subscriber = result;
+ subscriber.onSubscribe(s);
+ }
+
+ @Override
+ public void onNext(Message> o) {
+ subscriber.onNext(o);
+ onMessage.get();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ subscriber.onError(t);
+ onMessage.get();
+ }
+
+ @Override
+ public void onComplete() {
+ subscriber.onComplete();
+ onMessage.get();
+ }
+ };
+ }
return ctx.proceed();
}
diff --git a/jakarta/rewrite.yml b/jakarta/rewrite.yml
index 3809006b4d55e6..69a1e3798b8822 100644
--- a/jakarta/rewrite.yml
+++ b/jakarta/rewrite.yml
@@ -623,7 +623,7 @@ recipeList:
newValue: '3.0.0'
- org.openrewrite.maven.ChangePropertyValue:
key: smallrye-reactive-messaging.version
- newValue: '4.1.0'
+ newValue: '4.1.1'
- org.openrewrite.maven.ChangePropertyValue:
key: microprofile-rest-client.version
newValue: '3.0'