From 7a1aa2089d51a0633618813f9874315a424cf298 Mon Sep 17 00:00:00 2001 From: Dan Kristensen <1816596+dankristensen@users.noreply.github.com> Date: Tue, 30 Apr 2024 09:18:39 +0200 Subject: [PATCH 1/4] WIP: Initial commit for fault strategies --- .../messaging/jms/IncomingJmsMessage.java | 10 ++- .../reactive/messaging/jms/JmsConnector.java | 10 ++- .../reactive/messaging/jms/JmsSource.java | 41 ++++++++- .../messaging/jms/fault/JmsFailStop.java | 49 +++++++++++ .../jms/fault/JmsFailureHandler.java | 60 ++++++++++++++ .../messaging/jms/fault/JmsIgnoreFailure.java | 45 ++++++++++ .../messaging/jms/i18n/JmsExceptions.java | 3 + .../messaging/jms/i18n/JmsLogging.java | 12 +++ .../reactive/messaging/jms/JmsSourceTest.java | 4 +- .../messaging/support/JmsTestBase.java | 11 ++- .../messaging/support/MultipleInstance.java | 83 +++++++++++++++++++ .../messaging/support/SingletonInstance.java | 81 ++++++++++++++++++ .../support/UnsatisfiedInstance.java | 73 ++++++++++++++++ 13 files changed, 475 insertions(+), 7 deletions(-) create mode 100644 smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java create mode 100644 smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java create mode 100644 smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java create mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/MultipleInstance.java create mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/SingletonInstance.java create mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/UnsatisfiedInstance.java diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java index d2c2c71971..7768567659 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java @@ -12,6 +12,7 @@ import org.eclipse.microprofile.reactive.messaging.Metadata; import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler; import io.smallrye.reactive.messaging.json.JsonMapping; public class IncomingJmsMessage implements org.eclipse.microprofile.reactive.messaging.Message { @@ -21,11 +22,13 @@ public class IncomingJmsMessage implements org.eclipse.microprofile.reactive. private final JsonMapping jsonMapping; private final IncomingJmsMessageMetadata jmsMetadata; private final Metadata metadata; + private final JmsFailureHandler failureHandler; - IncomingJmsMessage(Message message, Executor executor, JsonMapping jsonMapping) { + IncomingJmsMessage(Message message, Executor executor, JsonMapping jsonMapping, JmsFailureHandler failureHandler) { this.delegate = message; this.jsonMapping = jsonMapping; this.executor = executor; + this.failureHandler = failureHandler; String cn = null; try { cn = message.getStringProperty("_classname"); @@ -127,6 +130,11 @@ public Metadata getMetadata() { return metadata; } + @Override + public CompletionStage nack(Throwable reason, Metadata metadata) { + return failureHandler.handle(this, reason, metadata).subscribeAsCompletionStage(); + } + @SuppressWarnings({ "unchecked" }) @Override public C unwrap(Class unwrapType) { diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java index b54e0a9b91..7d52b624ea 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java @@ -33,6 +33,7 @@ import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction; import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; +import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler; import io.smallrye.reactive.messaging.json.JsonMapping; import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; @@ -66,6 +67,9 @@ @ConnectorAttribute(name = "retry.initial-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The initial delay for the retry.", type = "string", defaultValue = "PT1S") @ConnectorAttribute(name = "retry.max-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The maximum delay", type = "string", defaultValue = "PT10S") @ConnectorAttribute(name = "retry.jitter", direction = Direction.INCOMING_AND_OUTGOING, description = "How much the delay jitters as a multiplier between 0 and 1. The formula is current delay * jitter. For example, with a current delay of 2H, a jitter of 0.5 will result in an actual delay somewhere between 1H and 3H.", type = "double", defaultValue = "0.5") +@ConnectorAttribute(name = "failure-strategy", type = "string", direction = Direction.INCOMING, description = "Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be `fail` (default), `ignore`, or `dead-letter-queue`", defaultValue = "fail") +@ConnectorAttribute(name = "dead-letter-queue.topic", type = "string", direction = Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates on which queue the message is sent. Defaults is `dead-letter-topic-$channel`") +@ConnectorAttribute(name = "dead-letter-queue.producer-client-id", type = "string", direction = Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates what client id the generated producer should use. Defaults is `jms-dead-letter-topic-producer-$client-id`") public class JmsConnector implements InboundConnector, OutboundConnector { /** @@ -100,6 +104,10 @@ public class JmsConnector implements InboundConnector, OutboundConnector { @ConfigProperty(name = "smallrye.jms.threads.ttl", defaultValue = DEFAULT_THREAD_TTL) int ttl; + @Inject + @Any + Instance failureHandlerFactories; + private ExecutorService executor; private JsonMapping jsonMapping; private final List sources = new CopyOnWriteArrayList<>(); @@ -134,7 +142,7 @@ public Flow.Publisher> getPublisher(Config config) { JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config); JmsResourceHolder holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic)); contexts.add(holder); - JmsSource source = new JmsSource(holder, ic, jsonMapping, executor); + JmsSource source = new JmsSource(holder, ic, jsonMapping, executor, failureHandlerFactories); sources.add(source); return source.getSource(); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java index 4d7dbc7493..dd8d293a49 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java @@ -4,6 +4,8 @@ import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -11,21 +13,29 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import jakarta.enterprise.inject.Instance; import jakarta.jms.*; +import io.smallrye.common.annotation.Identifier; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler; import io.smallrye.reactive.messaging.json.JsonMapping; +import io.vertx.mutiny.core.Vertx; class JmsSource { private final Multi> source; private final JmsResourceHolder resourceHolder; + private final List failures = new ArrayList<>(); private final JmsPublisher publisher; + private final Instance failureHandlerFactories; + private final JmsConnectorIncomingConfiguration config; + private final JmsFailureHandler failureHandler; JmsSource(JmsResourceHolder resourceHolder, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping, - Executor executor) { + Executor executor, Instance failureHandlerFactories) { String channel = config.getChannel(); final String destinationName = config.getDestination().orElseGet(config::getChannel); String selector = config.getSelector().orElse(null); @@ -33,6 +43,7 @@ class JmsSource { boolean durable = config.getDurable(); String type = config.getDestinationType(); boolean retry = config.getRetry(); + this.config = config; this.resourceHolder = resourceHolder.configure(r -> getDestination(r.getContext(), destinationName, type), r -> { if (durable) { @@ -47,8 +58,10 @@ class JmsSource { }); resourceHolder.getClient(); this.publisher = new JmsPublisher(resourceHolder); + this.failureHandlerFactories = failureHandlerFactories; + this.failureHandler = createFailureHandler(Vertx.vertx()); //FIXME Find correct vertx instance source = Multi.createFrom().publisher(publisher) - .> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping)) + .> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping, failureHandler)) .onFailure(t -> { log.terminalErrorOnChannel(channel); this.resourceHolder.close(); @@ -92,6 +105,30 @@ Multi> getSource() { return source; } + private JmsFailureHandler createFailureHandler(Vertx vertx) { + String strategy = config.getFailureStrategy(); + Instance failureHandlerFactory = failureHandlerFactories + .select(Identifier.Literal.of(strategy)); + if (failureHandlerFactory.isResolvable()) { + return failureHandlerFactory.get().create(config, vertx, resourceHolder.getClient(), this::reportFailure); + } else { + throw ex.illegalArgumentInvalidFailureStrategy(strategy); + } + } + + public synchronized void reportFailure(Throwable failure, boolean fatal) { + //log.failureReported(topics, failure); + // Don't keep all the failures, there are only there for reporting. + if (failures.size() == 10) { + failures.remove(0); + } + failures.add(failure); + + if (fatal) { + close(); + } + } + @SuppressWarnings("PublisherImplementation") private static class JmsPublisher implements Flow.Publisher, Flow.Subscription { diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java new file mode 100644 index 0000000000..00b6d3639d --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java @@ -0,0 +1,49 @@ +package io.smallrye.reactive.messaging.jms.fault; + +import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log; + +import java.util.function.BiConsumer; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.jms.JMSConsumer; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.jms.IncomingJmsMessage; +import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration; +import io.vertx.mutiny.core.Vertx; + +public class JmsFailStop implements JmsFailureHandler { + + private final String channel; + private final BiConsumer reportFailure; + + @ApplicationScoped + @Identifier(Strategy.FAIL) + public static class Factory implements JmsFailureHandler.Factory { + + @Override + public JmsFailureHandler create(JmsConnectorIncomingConfiguration config, Vertx vertx, JMSConsumer consumer, + BiConsumer reportFailure) { + return new JmsFailStop(config.getChannel(), reportFailure); + } + } + + public JmsFailStop(String channel, BiConsumer reportFailure) { + this.channel = channel; + this.reportFailure = reportFailure; + } + + @Override + public Uni handle( + IncomingJmsMessage message, Throwable reason, Metadata metadata) { + // We don't commit, we just fail and stop the client. + log.messageNackedFailStop(channel); + // report failure to the connector for health check + reportFailure.accept(reason, true); + return Uni.createFrom(). failure(reason); + //.emitOn(message::runOnMessageContext); + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java new file mode 100644 index 0000000000..7ed097f9d7 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java @@ -0,0 +1,60 @@ +package io.smallrye.reactive.messaging.jms.fault; + +import java.util.function.BiConsumer; + +import jakarta.jms.JMSConsumer; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.smallrye.common.annotation.Experimental; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.jms.IncomingJmsMessage; +import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration; +import io.vertx.mutiny.core.Vertx; + +/** + * Jms Failure handling strategy + */ +@Experimental("Experimental API") +public interface JmsFailureHandler { + + /** + * Identifiers of default failure strategies + */ + interface Strategy { + String FAIL = "fail"; + String IGNORE = "ignore"; + String DEAD_LETTER_QUEUE = "dead-letter-queue"; + + } + + /** + * Factory interface for {@link JmsFailureHandler} + */ + interface Factory { + JmsFailureHandler create( + JmsConnectorIncomingConfiguration config, + Vertx vertx, + JMSConsumer consumer, + BiConsumer reportFailure); + } + + /** + * Handle message negative-acknowledgment + * + * @param record incoming jms message + * @param reason nack reason + * @param metadata associated metadata with negative-acknowledgment + * @param type of payload + * @return a completion stage completed when the message is negative-acknowledgement has completed. + */ + Uni handle(IncomingJmsMessage record, Throwable reason, Metadata metadata); + + /** + * Called on channel shutdown + */ + default void terminate() { + // do nothing by default + } + +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java new file mode 100644 index 0000000000..900bf4c658 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java @@ -0,0 +1,45 @@ +package io.smallrye.reactive.messaging.jms.fault; + +import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log; + +import java.util.function.BiConsumer; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.jms.JMSConsumer; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.jms.IncomingJmsMessage; +import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration; +import io.vertx.mutiny.core.Vertx; + +public class JmsIgnoreFailure implements JmsFailureHandler { + + private final String channel; + + @ApplicationScoped + @Identifier(Strategy.IGNORE) + public static class Factory implements JmsFailureHandler.Factory { + + @Override + public JmsFailureHandler create(JmsConnectorIncomingConfiguration config, Vertx vertx, + JMSConsumer consumer, BiConsumer reportFailure) { + return new JmsIgnoreFailure(config.getChannel()); + } + } + + public JmsIgnoreFailure(String channel) { + this.channel = channel; + } + + @Override + public Uni handle( + IncomingJmsMessage message, Throwable reason, Metadata metadata) { + // We commit the message, log and continue + log.messageNackedIgnore(channel, reason.getMessage()); + log.messageNackedFullIgnored(reason); + return Uni.createFrom().completionStage(message.ack()); + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsExceptions.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsExceptions.java index 82cbd3bca8..b1f74820d9 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsExceptions.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsExceptions.java @@ -56,4 +56,7 @@ public interface JmsExceptions { @Message(id = 15613, value = "There is already a subscriber") IllegalStateException illegalStateAlreadySubscriber(); + @Message(id = 18614, value = "Invalid failure strategy: %s") + IllegalArgumentException illegalArgumentInvalidFailureStrategy(String strategy); + } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsLogging.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsLogging.java index f84ac3064e..13ffd5089f 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsLogging.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsLogging.java @@ -43,4 +43,16 @@ public interface JmsLogging extends BasicLogger { @LogMessage(level = Logger.Level.WARN) @Message(id = 15806, value = "JMS Exception occurred. Closing the JMS context %s") void jmsException(String channelName, @Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 15807, value = "A message sent to channel `%s` has been nacked, ignored failure is: %s.") + void messageNackedIgnore(String channel, String message); + + @LogMessage(level = Logger.Level.DEBUG) + @Message(id = 15808, value = "The full ignored failure is") + void messageNackedFullIgnored(@Cause Throwable reason); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 15809, value = "A message sent to channel `%s` has been nacked, fail-stop") + void messageNackedFailStop(String channel); } diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java index ef5021320d..bf48b5b6ca 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java @@ -187,7 +187,7 @@ public void testReceptionOfMultipleMessages() { public void testMultipleRequests() { JmsSource source = new JmsSource(getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig().put("channel-name", "queue")), - null, null); + null, null, failureHandlerFactories); Publisher> publisher = source.getSource(); new Thread(() -> { @@ -241,7 +241,7 @@ public void testBroadcast() { JmsSource source = new JmsSource(getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig() .with("channel-name", "queue").with("broadcast", true)), - null, null); + null, null, failureHandlerFactories); Flow.Publisher> publisher = source.getSource(); List> list1 = new ArrayList<>(); diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java index f666494424..8065d0f785 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java @@ -1,5 +1,7 @@ package io.smallrye.reactive.messaging.support; +import jakarta.enterprise.inject.Instance; + import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.weld.environment.se.Weld; import org.jboss.weld.environment.se.WeldContainer; @@ -10,6 +12,9 @@ import io.smallrye.config.inject.ConfigExtension; import io.smallrye.reactive.messaging.jms.JmsConnector; import io.smallrye.reactive.messaging.jms.TestMapping; +import io.smallrye.reactive.messaging.jms.fault.JmsFailStop; +import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler; +import io.smallrye.reactive.messaging.jms.fault.JmsIgnoreFailure; import io.smallrye.reactive.messaging.providers.MediatorFactory; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry; @@ -30,6 +35,9 @@ public class JmsTestBase { private static final ArtemisHolder holder = new ArtemisHolder(); private Weld weld; + public static Instance failureHandlerFactories = new MultipleInstance<>( + new JmsFailStop.Factory(), + new JmsIgnoreFailure.Factory()); @BeforeEach public void startArtemis() { @@ -85,6 +93,8 @@ protected Weld initWithoutConnectionFactory() { weld.addBeanClass(ConfiguredChannelFactory.class); weld.addBeanClass(ChannelProducer.class); weld.addBeanClass(ExecutionHolder.class); + weld.addBeanClass(JmsFailStop.Factory.class); + weld.addBeanClass(JmsIgnoreFailure.Factory.class); weld.addBeanClass(WorkerPoolRegistry.class); weld.addBeanClass(HealthCenter.class); weld.addBeanClass(Wiring.class); @@ -106,5 +116,4 @@ protected void addConfig(MapBasedConfig config) { MapBasedConfig.cleanup(); } } - } diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/MultipleInstance.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/MultipleInstance.java new file mode 100644 index 0000000000..21f4f151be --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/MultipleInstance.java @@ -0,0 +1,83 @@ +package io.smallrye.reactive.messaging.support; + +import java.lang.annotation.Annotation; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.util.TypeLiteral; + +import io.smallrye.common.annotation.Identifier; + +public class MultipleInstance implements Instance { + + private final Map instances; + + public MultipleInstance(T... instances) { + this.instances = new HashMap<>(); + for (T instance : instances) { + Identifier identifier = instance.getClass().getAnnotation(Identifier.class); + this.instances.put(identifier.value(), instance); + } + } + + @Override + public Instance select(Annotation... qualifiers) { + if (qualifiers.length == 0) { + return this; + } + if (qualifiers.length == 1 && qualifiers[0] instanceof Identifier) { + String name = ((Identifier) qualifiers[0]).value(); + if (instances.containsKey(name)) { + return new SingletonInstance<>(name, instances.get(name)); + } + } + return UnsatisfiedInstance.instance(); + } + + @Override + public Instance select(Class subtype, Annotation... qualifiers) { + throw new UnsupportedOperationException(); + } + + @Override + public Instance select(TypeLiteral subtype, Annotation... qualifiers) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isUnsatisfied() { + return false; + } + + @Override + public boolean isAmbiguous() { + return false; + } + + @Override + public void destroy(T instance) { + throw new UnsupportedOperationException(); + } + + @Override + public Handle getHandle() { + return null; + } + + @Override + public Iterable> handles() { + return null; + } + + @Override + public Iterator iterator() { + return instances.values().iterator(); + } + + @Override + public T get() { + return instances.values().stream().findFirst().orElse(null); + } +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/SingletonInstance.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/SingletonInstance.java new file mode 100644 index 0000000000..4ad1a9b975 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/SingletonInstance.java @@ -0,0 +1,81 @@ +package io.smallrye.reactive.messaging.support; + +import java.lang.annotation.Annotation; +import java.util.Collections; +import java.util.Iterator; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.util.TypeLiteral; + +import io.smallrye.common.annotation.Identifier; + +public class SingletonInstance implements Instance { + + private final T instance; + private final String name; + + public SingletonInstance(String name, T instance) { + this.name = name; + this.instance = instance; + } + + @Override + public Instance select(Annotation... qualifiers) { + if (qualifiers.length == 0) { + return this; + } + if (qualifiers.length == 1 && qualifiers[0] instanceof Identifier) { + if (((Identifier) qualifiers[0]).value().equalsIgnoreCase(name)) { + return this; + } + } + return UnsatisfiedInstance.instance(); + + } + + @Override + public Instance select(Class subtype, Annotation... qualifiers) { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public Instance select(TypeLiteral subtype, + Annotation... qualifiers) { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public boolean isUnsatisfied() { + return false; + } + + @Override + public boolean isAmbiguous() { + return false; + } + + @Override + public void destroy(T instance) { + throw new UnsupportedOperationException(); + } + + @Override + public Handle getHandle() { + return null; + } + + @Override + public Iterable> handles() { + return null; + } + + @Override + public Iterator iterator() { + return Collections.singleton(instance).iterator(); + } + + @Override + public T get() { + return instance; + } +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/UnsatisfiedInstance.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/UnsatisfiedInstance.java new file mode 100644 index 0000000000..05956f9c3e --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/UnsatisfiedInstance.java @@ -0,0 +1,73 @@ +package io.smallrye.reactive.messaging.support; + +import java.lang.annotation.Annotation; +import java.util.Collections; +import java.util.Iterator; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.util.TypeLiteral; + +public class UnsatisfiedInstance implements Instance { + + private static final UnsatisfiedInstance INSTANCE = new UnsatisfiedInstance<>(); + + @SuppressWarnings("unchecked") + public static Instance instance() { + return (Instance) INSTANCE; + } + + private UnsatisfiedInstance() { + // avoid direct instantiation + } + + @Override + public Instance select(Annotation... qualifiers) { + return instance(); + } + + @Override + public Instance select(Class subtype, Annotation... qualifiers) { + return instance(); + } + + @Override + public Instance select(TypeLiteral subtype, + Annotation... qualifiers) { + return instance(); + } + + @Override + public boolean isUnsatisfied() { + return true; + } + + @Override + public boolean isAmbiguous() { + return false; + } + + @Override + public void destroy(T instance) { + throw new UnsupportedOperationException(); + } + + @Override + public Handle getHandle() { + return null; + } + + @Override + public Iterable> handles() { + return null; + } + + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + + @Override + public T get() { + throw new UnsupportedOperationException(); + } +} From 2c1568a9f1d67054d6b367dedad44c3f88752821 Mon Sep 17 00:00:00 2001 From: Dan Kristensen <1816596+dankristensen@users.noreply.github.com> Date: Thu, 2 May 2024 05:59:22 +0200 Subject: [PATCH 2/4] Ensure fail strategies are defined in Weld container --- .../io/smallrye/reactive/messaging/support/JmsTestBase.java | 5 ++++- .../io/smallrye/reactive/messaging/support/JmsTestBase.java | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/smallrye-reactive-messaging-jackson/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java b/smallrye-reactive-messaging-jackson/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java index 908ef03d4a..1f7e4d25e1 100644 --- a/smallrye-reactive-messaging-jackson/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java +++ b/smallrye-reactive-messaging-jackson/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java @@ -9,6 +9,8 @@ import io.smallrye.config.SmallRyeConfigProviderResolver; import io.smallrye.config.inject.ConfigExtension; import io.smallrye.reactive.messaging.jms.JmsConnector; +import io.smallrye.reactive.messaging.jms.fault.JmsFailStop; +import io.smallrye.reactive.messaging.jms.fault.JmsIgnoreFailure; import io.smallrye.reactive.messaging.json.jackson.JacksonMapping; import io.smallrye.reactive.messaging.json.jackson.ObjectMapperProvider; import io.smallrye.reactive.messaging.providers.MediatorFactory; @@ -96,7 +98,8 @@ protected Weld initWithoutConnectionFactory() { weld.addBeanClass(EmitterFactoryImpl.class); weld.addBeanClass(MutinyEmitterFactoryImpl.class); weld.addBeanClass(LegacyEmitterFactoryImpl.class); - + weld.addBeanClass(JmsFailStop.Factory.class); + weld.addBeanClass(JmsIgnoreFailure.Factory.class); weld.addBeanClass(JmsConnector.class); weld.addBeanClass(ObjectMapperProvider.class); weld.addBeanClass(JacksonMapping.class); diff --git a/smallrye-reactive-messaging-jsonb/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java b/smallrye-reactive-messaging-jsonb/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java index 2bc22720f4..85d1d63687 100644 --- a/smallrye-reactive-messaging-jsonb/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java +++ b/smallrye-reactive-messaging-jsonb/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java @@ -9,6 +9,8 @@ import io.smallrye.config.SmallRyeConfigProviderResolver; import io.smallrye.config.inject.ConfigExtension; import io.smallrye.reactive.messaging.jms.JmsConnector; +import io.smallrye.reactive.messaging.jms.fault.JmsFailStop; +import io.smallrye.reactive.messaging.jms.fault.JmsIgnoreFailure; import io.smallrye.reactive.messaging.json.jsonb.JsonBMapping; import io.smallrye.reactive.messaging.json.jsonb.JsonBProvider; import io.smallrye.reactive.messaging.providers.MediatorFactory; @@ -96,6 +98,8 @@ protected Weld initWithoutConnectionFactory() { weld.addBeanClass(EmitterFactoryImpl.class); weld.addBeanClass(MutinyEmitterFactoryImpl.class); weld.addBeanClass(LegacyEmitterFactoryImpl.class); + weld.addBeanClass(JmsFailStop.Factory.class); + weld.addBeanClass(JmsIgnoreFailure.Factory.class); weld.addBeanClass(JmsConnector.class); weld.addBeanClass(JsonBProvider.class); From 549f06e96b20d820bbdff90fc44ea3e1dac0a495 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 7 May 2024 14:19:36 +0200 Subject: [PATCH 3/4] Added Vert.x context dispatching to the JMS connector --- .../messaging/jms/IncomingJmsMessage.java | 7 +- .../reactive/messaging/jms/JmsConnector.java | 6 +- .../reactive/messaging/jms/JmsSource.java | 14 +- .../messaging/jms/fault/JmsFailStop.java | 11 +- .../jms/fault/JmsFailureHandler.java | 5 - .../messaging/jms/fault/JmsIgnoreFailure.java | 12 +- .../reactive/messaging/jms/JmsSourceTest.java | 5 +- .../jms/LocalPropagationAckTest.java | 138 ++++ .../messaging/jms/LocalPropagationTest.java | 637 ++++++++++++++++++ .../messaging/support/JmsTestBase.java | 17 + 10 files changed, 824 insertions(+), 28 deletions(-) create mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationAckTest.java create mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java index 7768567659..49bc076987 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java @@ -1,6 +1,7 @@ package io.smallrye.reactive.messaging.jms; import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex; +import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; @@ -14,8 +15,9 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler; import io.smallrye.reactive.messaging.json.JsonMapping; +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; -public class IncomingJmsMessage implements org.eclipse.microprofile.reactive.messaging.Message { +public class IncomingJmsMessage implements ContextAwareMessage { private final Message delegate; private final Executor executor; private final Class clazz; @@ -45,7 +47,7 @@ public class IncomingJmsMessage implements org.eclipse.microprofile.reactive. } this.jmsMetadata = new IncomingJmsMessageMetadata(message); - this.metadata = Metadata.of(this.jmsMetadata); + this.metadata = captureContextMetadata(this.jmsMetadata); } @SuppressWarnings("unchecked") @@ -122,6 +124,7 @@ public CompletionStage ack(Metadata metadata) { } }) .runSubscriptionOn(executor) + .emitOn(this::runOnMessageContext) .subscribeAsCompletionStage(); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java index 7d52b624ea..53ce4670eb 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java @@ -35,6 +35,7 @@ import io.smallrye.reactive.messaging.connector.OutboundConnector; import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler; import io.smallrye.reactive.messaging.json.JsonMapping; +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; @ApplicationScoped @@ -108,6 +109,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector { @Any Instance failureHandlerFactories; + @Inject + ExecutionHolder executionHolder; + private ExecutorService executor; private JsonMapping jsonMapping; private final List sources = new CopyOnWriteArrayList<>(); @@ -142,7 +146,7 @@ public Flow.Publisher> getPublisher(Config config) { JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config); JmsResourceHolder holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic)); contexts.add(holder); - JmsSource source = new JmsSource(holder, ic, jsonMapping, executor, failureHandlerFactories); + JmsSource source = new JmsSource(executionHolder.vertx(), holder, ic, jsonMapping, executor, failureHandlerFactories); sources.add(source); return source.getSource(); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java index dd8d293a49..3b047df0f8 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java @@ -21,6 +21,8 @@ import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler; import io.smallrye.reactive.messaging.json.JsonMapping; +import io.vertx.core.impl.VertxInternal; +import io.vertx.mutiny.core.Context; import io.vertx.mutiny.core.Vertx; class JmsSource { @@ -33,8 +35,10 @@ class JmsSource { private final Instance failureHandlerFactories; private final JmsConnectorIncomingConfiguration config; private final JmsFailureHandler failureHandler; + private final Context context; - JmsSource(JmsResourceHolder resourceHolder, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping, + JmsSource(Vertx vertx, JmsResourceHolder resourceHolder, JmsConnectorIncomingConfiguration config, + JsonMapping jsonMapping, Executor executor, Instance failureHandlerFactories) { String channel = config.getChannel(); final String destinationName = config.getDestination().orElseGet(config::getChannel); @@ -59,8 +63,10 @@ class JmsSource { resourceHolder.getClient(); this.publisher = new JmsPublisher(resourceHolder); this.failureHandlerFactories = failureHandlerFactories; - this.failureHandler = createFailureHandler(Vertx.vertx()); //FIXME Find correct vertx instance + this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); + this.failureHandler = createFailureHandler(); source = Multi.createFrom().publisher(publisher) + .emitOn(r -> context.runOnContext(r)) .> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping, failureHandler)) .onFailure(t -> { log.terminalErrorOnChannel(channel); @@ -105,12 +111,12 @@ Multi> getSource() { return source; } - private JmsFailureHandler createFailureHandler(Vertx vertx) { + private JmsFailureHandler createFailureHandler() { String strategy = config.getFailureStrategy(); Instance failureHandlerFactory = failureHandlerFactories .select(Identifier.Literal.of(strategy)); if (failureHandlerFactory.isResolvable()) { - return failureHandlerFactory.get().create(config, vertx, resourceHolder.getClient(), this::reportFailure); + return failureHandlerFactory.get().create(config, this::reportFailure); } else { throw ex.illegalArgumentInvalidFailureStrategy(strategy); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java index 00b6d3639d..da61f19d54 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java @@ -5,7 +5,6 @@ import java.util.function.BiConsumer; import jakarta.enterprise.context.ApplicationScoped; -import jakarta.jms.JMSConsumer; import org.eclipse.microprofile.reactive.messaging.Metadata; @@ -13,7 +12,6 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.jms.IncomingJmsMessage; import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration; -import io.vertx.mutiny.core.Vertx; public class JmsFailStop implements JmsFailureHandler { @@ -25,7 +23,7 @@ public class JmsFailStop implements JmsFailureHandler { public static class Factory implements JmsFailureHandler.Factory { @Override - public JmsFailureHandler create(JmsConnectorIncomingConfiguration config, Vertx vertx, JMSConsumer consumer, + public JmsFailureHandler create(JmsConnectorIncomingConfiguration config, BiConsumer reportFailure) { return new JmsFailStop(config.getChannel(), reportFailure); } @@ -37,13 +35,12 @@ public JmsFailStop(String channel, BiConsumer reportF } @Override - public Uni handle( - IncomingJmsMessage message, Throwable reason, Metadata metadata) { + public Uni handle(IncomingJmsMessage message, Throwable reason, Metadata metadata) { // We don't commit, we just fail and stop the client. log.messageNackedFailStop(channel); // report failure to the connector for health check reportFailure.accept(reason, true); - return Uni.createFrom(). failure(reason); - //.emitOn(message::runOnMessageContext); + return Uni.createFrom(). failure(reason) + .emitOn(message::runOnMessageContext); } } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java index 7ed097f9d7..e76aa8d4df 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java @@ -2,15 +2,12 @@ import java.util.function.BiConsumer; -import jakarta.jms.JMSConsumer; - import org.eclipse.microprofile.reactive.messaging.Metadata; import io.smallrye.common.annotation.Experimental; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.jms.IncomingJmsMessage; import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration; -import io.vertx.mutiny.core.Vertx; /** * Jms Failure handling strategy @@ -34,8 +31,6 @@ interface Strategy { interface Factory { JmsFailureHandler create( JmsConnectorIncomingConfiguration config, - Vertx vertx, - JMSConsumer consumer, BiConsumer reportFailure); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java index 900bf4c658..e6ab89ad36 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java @@ -5,7 +5,6 @@ import java.util.function.BiConsumer; import jakarta.enterprise.context.ApplicationScoped; -import jakarta.jms.JMSConsumer; import org.eclipse.microprofile.reactive.messaging.Metadata; @@ -13,7 +12,6 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.jms.IncomingJmsMessage; import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration; -import io.vertx.mutiny.core.Vertx; public class JmsIgnoreFailure implements JmsFailureHandler { @@ -24,8 +22,8 @@ public class JmsIgnoreFailure implements JmsFailureHandler { public static class Factory implements JmsFailureHandler.Factory { @Override - public JmsFailureHandler create(JmsConnectorIncomingConfiguration config, Vertx vertx, - JMSConsumer consumer, BiConsumer reportFailure) { + public JmsFailureHandler create(JmsConnectorIncomingConfiguration config, + BiConsumer reportFailure) { return new JmsIgnoreFailure(config.getChannel()); } } @@ -35,11 +33,11 @@ public JmsIgnoreFailure(String channel) { } @Override - public Uni handle( - IncomingJmsMessage message, Throwable reason, Metadata metadata) { + public Uni handle(IncomingJmsMessage message, Throwable reason, Metadata metadata) { // We commit the message, log and continue log.messageNackedIgnore(channel, reason.getMessage()); log.messageNackedFullIgnored(reason); - return Uni.createFrom().completionStage(message.ack()); + return Uni.createFrom().completionStage(message.ack()) + .emitOn(message::runOnMessageContext); } } diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java index bf48b5b6ca..4b5a0e7f7a 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java @@ -27,6 +27,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.support.JmsTestBase; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; @SuppressWarnings("ReactiveStreamsSubscriberImplementation") public class JmsSourceTest extends JmsTestBase { @@ -185,7 +186,7 @@ public void testReceptionOfMultipleMessages() { @Test public void testMultipleRequests() { - JmsSource source = new JmsSource(getResourceHolder("queue"), + JmsSource source = new JmsSource(Vertx.vertx(), getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig().put("channel-name", "queue")), null, null, failureHandlerFactories); Publisher> publisher = source.getSource(); @@ -238,7 +239,7 @@ public void onComplete() { @Test public void testBroadcast() { - JmsSource source = new JmsSource(getResourceHolder("queue"), + JmsSource source = new JmsSource(Vertx.vertx(), getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig() .with("channel-name", "queue").with("broadcast", true)), null, null, failureHandlerFactories); diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationAckTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationAckTest.java new file mode 100644 index 0000000000..38d965d394 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationAckTest.java @@ -0,0 +1,138 @@ +package io.smallrye.reactive.messaging.jms; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.lang.reflect.Method; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.jms.ConnectionFactory; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.smallrye.reactive.messaging.support.JmsTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; + +public class LocalPropagationAckTest extends JmsTestBase { + + private WeldContainer container; + + private String destination; + + @BeforeEach + public void initTopic(TestInfo testInfo) { + String cn = testInfo.getTestClass().map(Class::getSimpleName).orElse(UUID.randomUUID().toString()); + String mn = testInfo.getTestMethod().map(Method::getName).orElse(UUID.randomUUID().toString()); + destination = cn + "-" + mn + "-" + UUID.randomUUID().getMostSignificantBits(); + } + + private MapBasedConfig dataconfig() { + return new MapBasedConfig() + .with("mp.messaging.incoming.data.connector", JmsConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.data.destination", destination) + .with("mp.messaging.incoming.data.durable", false) + .with("mp.messaging.incoming.data.tracing.enabled", false); + } + + private void produceIntegers() { + ConnectionFactory cf = container.getBeanManager().createInstance().select(ConnectionFactory.class).get(); + AtomicInteger counter = new AtomicInteger(1); + produceIntegers(cf, destination, 5, counter::getAndIncrement); + } + + private T runApplication(MapBasedConfig config, Class beanClass) { + config.write(); + container = deploy(beanClass); + + return container.getBeanManager().createInstance().select(beanClass).get(); + } + + @Test + public void testChannelWithAckOnMessageContext() { + IncomingChannelWithAckOnMessageContext bean = runApplication(dataconfig(), + IncomingChannelWithAckOnMessageContext.class); + bean.process(i -> i + 1); + + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> bean.getResults().size() >= 5); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + } + + @Test + public void testChannelWithNackOnMessageContextFail() { + IncomingChannelWithAckOnMessageContext bean = runApplication(dataconfig() + .with("mp.messaging.incoming.data.failure-strategy", "fail"), + IncomingChannelWithAckOnMessageContext.class); + bean.process(i -> { + throw new RuntimeException("boom"); + }); + + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> bean.getResults().size() >= 1); + assertThat(bean.getResults()).contains(1); + } + + @Test + public void testChannelWithNackOnMessageContextIgnore() { + IncomingChannelWithAckOnMessageContext bean = runApplication(dataconfig() + .with("mp.messaging.incoming.data.failure-strategy", "ignore"), + IncomingChannelWithAckOnMessageContext.class); + bean.process(i -> { + throw new RuntimeException("boom"); + }); + + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> bean.getResults().size() >= 5); + assertThat(bean.getResults()).containsExactly(1, 2, 3, 4, 5); + } + + @ApplicationScoped + public static class IncomingChannelWithAckOnMessageContext { + + private final List list = new CopyOnWriteArrayList<>(); + + @Inject + @Channel("data") + Multi> incoming; + + void process(Function mapper) { + incoming.onItem() + .transformToUniAndConcatenate(msg -> Uni.createFrom() + .item(() -> msg.withPayload(mapper.apply(msg.getPayload()))) + .chain(m -> Uni.createFrom().completionStage(m.ack()).replaceWith(m)) + .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(msg.nack(t)) + .onItemOrFailure().transform((unused, throwable) -> msg))) + .subscribe().with(m -> { + m.getMetadata(LocalContextMetadata.class).map(LocalContextMetadata::context).ifPresent(context -> { + if (Vertx.currentContext().getDelegate() == context) { + list.add(m.getPayload()); + } + }); + }); + } + + public List getResults() { + return list; + } + } + +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java new file mode 100644 index 0000000000..e1ecb5c7b0 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java @@ -0,0 +1,637 @@ +package io.smallrye.reactive.messaging.jms; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.lang.reflect.Method; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.jms.ConnectionFactory; + +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import io.smallrye.common.vertx.ContextLocals; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.reactive.messaging.annotations.Blocking; +import io.smallrye.reactive.messaging.annotations.Broadcast; +import io.smallrye.reactive.messaging.annotations.Merge; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.smallrye.reactive.messaging.support.JmsTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.core.impl.ConcurrentHashSet; +import io.vertx.mutiny.core.Vertx; + +public class LocalPropagationTest extends JmsTestBase { + + private WeldContainer container; + + private String destination; + + @BeforeEach + public void initTopic(TestInfo testInfo) { + String cn = testInfo.getTestClass().map(Class::getSimpleName).orElse(UUID.randomUUID().toString()); + String mn = testInfo.getTestMethod().map(Method::getName).orElse(UUID.randomUUID().toString()); + destination = cn + "-" + mn + "-" + UUID.randomUUID().getMostSignificantBits(); + } + + private MapBasedConfig dataconfig() { + return new MapBasedConfig() + .with("mp.messaging.incoming.data.connector", JmsConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.data.destination", destination) + .with("mp.messaging.incoming.data.durable", false) + .with("mp.messaging.incoming.data.tracing.enabled", false); + } + + private void produceIntegers() { + ConnectionFactory cf = container.getBeanManager().createInstance().select(ConnectionFactory.class).get(); + AtomicInteger counter = new AtomicInteger(1); + produceIntegers(cf, destination, 5, counter::getAndIncrement); + } + + private T runApplication(MapBasedConfig config, Class beanClass) { + config.write(); + container = deploy(beanClass); + + return container.getBeanManager().createInstance().select(beanClass).get(); + } + + @Test + public void testLinearPipeline() { + LinearPipeline bean = runApplication(dataconfig(), LinearPipeline.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + } + + @Test + public void testPipelineWithABlockingStage() { + PipelineWithABlockingStage bean = runApplication(dataconfig(), PipelineWithABlockingStage.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + + } + + @Test + public void testPipelineWithAnUnorderedBlockingStage() { + PipelineWithAnUnorderedBlockingStage bean = runApplication(dataconfig(), PipelineWithAnUnorderedBlockingStage.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactlyInAnyOrder(2, 3, 4, 5, 6); + + } + + @Test + public void testPipelineWithMultipleBlockingStages() { + PipelineWithMultipleBlockingStages bean = runApplication(dataconfig(), PipelineWithMultipleBlockingStages.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactlyInAnyOrder(2, 3, 4, 5, 6); + } + + @Test + public void testPipelineWithBroadcastAndMerge() { + PipelineWithBroadcastAndMerge bean = runApplication(dataconfig(), PipelineWithBroadcastAndMerge.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 10; + }); + assertThat(bean.getResults()).hasSize(10).contains(2, 3, 4, 5, 6); + } + + @Test + public void testLinearPipelineWithAckOnCustomThread() { + LinearPipelineWithAckOnCustomThread bean = runApplication(dataconfig(), LinearPipelineWithAckOnCustomThread.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + + } + + @Test + public void testPipelineWithAnAsyncStage() { + PipelineWithAnAsyncStage bean = runApplication(dataconfig(), PipelineWithAnAsyncStage.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + + } + + @ApplicationScoped + public static class LinearPipeline { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + return input.withPayload(input.getPayload() + 1); + } + + @Incoming("process") + @Outgoing("after-process") + public Integer handle(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class LinearPipelineWithAckOnCustomThread { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + private final Executor executor = Executors.newFixedThreadPool(4); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + return input.withPayload(input.getPayload() + 1) + .withAck(() -> { + CompletableFuture cf = new CompletableFuture<>(); + executor.execute(() -> { + cf.complete(null); + }); + return cf; + }); + } + + @Incoming("process") + @Outgoing("after-process") + public Integer handle(int payload) { + try { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + } catch (Exception e) { + e.printStackTrace(); + } + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + try { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + } catch (Exception e) { + e.printStackTrace(); + } + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithABlockingStage { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + @Incoming("process") + @Outgoing("after-process") + @Blocking + public Integer handle(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithAnUnorderedBlockingStage { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + private final Random random = new Random(); + + @Incoming("process") + @Outgoing("after-process") + @Blocking(ordered = false) + public Integer handle(int payload) throws InterruptedException { + Thread.sleep(random.nextInt(10)); + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithMultipleBlockingStages { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + private final Random random = new Random(); + + @Incoming("process") + @Outgoing("second-blocking") + @Blocking(ordered = false) + public Integer handle(int payload) throws InterruptedException { + Thread.sleep(random.nextInt(10)); + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("second-blocking") + @Outgoing("after-process") + @Blocking + public Integer handle2(int payload) throws InterruptedException { + Thread.sleep(random.nextInt(10)); + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithBroadcastAndMerge { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set branch1 = new ConcurrentHashSet<>(); + private final Set branch2 = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + @Broadcast(2) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + private final Random random = new Random(); + + @Incoming("process") + @Outgoing("after-process") + public Integer branch1(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(branch1.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("process") + @Outgoing("after-process") + public Integer branch2(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(branch2.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + @Merge + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithAnAsyncStage { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + assertThat((String) ContextLocals.get("uuid", null)).isNull(); + ContextLocals.put("uuid", value); + ContextLocals.put("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + @Incoming("process") + @Outgoing("after-process") + public Uni handle(int payload) { + String uuid = ContextLocals.get("uuid", null); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = ContextLocals.get("input", null); + assertThat(p + 1).isEqualTo(payload); + return Uni.createFrom().item(() -> payload) + .runSubscriptionOn(Infrastructure.getDefaultExecutor()); + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = ContextLocals.get("uuid", null); + assertThat(uuid).isNotNull(); + + int p = ContextLocals.get("input", null); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = ContextLocals.get("uuid", null); + assertThat(uuid).isNotNull(); + + int p = ContextLocals.get("input", null); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java index 8065d0f785..eb8979d974 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java @@ -1,6 +1,12 @@ package io.smallrye.reactive.messaging.support; +import java.util.function.Supplier; + import jakarta.enterprise.inject.Instance; +import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSProducer; +import jakarta.jms.Queue; import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.weld.environment.se.Weld; @@ -116,4 +122,15 @@ protected void addConfig(MapBasedConfig config) { MapBasedConfig.cleanup(); } } + + public void produceIntegers(ConnectionFactory cf, String destination, int numberOfMessages, + Supplier messageSupplier) { + try (JMSContext context = cf.createContext()) { + JMSProducer producer = context.createProducer(); + Queue q = context.createQueue(destination); + for (int i = 0; i < numberOfMessages; i++) { + producer.send(q, messageSupplier.get()); + } + } + } } From 3d55060d12953a94466ba0ae8a04a5f0b4a934de Mon Sep 17 00:00:00 2001 From: Dan Kristensen <1816596+dankristensen@users.noreply.github.com> Date: Fri, 9 Aug 2024 14:11:31 +0200 Subject: [PATCH 4/4] Added dlq failure handler --- .../reactive/messaging/jms/JmsConnector.java | 5 +- .../messaging/jms/JmsPropertiesBuilder.java | 8 + .../reactive/messaging/jms/JmsSource.java | 7 +- .../messaging/jms/fault/JmsDlqFailure.java | 150 +++++++++ .../messaging/jms/fault/JmsFailStop.java | 3 +- .../jms/fault/JmsFailureHandler.java | 2 + .../messaging/jms/fault/JmsIgnoreFailure.java | 3 +- .../messaging/jms/i18n/JmsExceptions.java | 10 +- .../messaging/jms/i18n/JmsLogging.java | 5 + .../messaging/jms/impl/ConfigHelper.java | 309 ++++++++++++++++++ .../reactive/messaging/jms/JmsSourceTest.java | 4 +- .../jms/LocalPropagationAckTest.java | 24 +- .../messaging/support/JmsTestBase.java | 2 + 13 files changed, 521 insertions(+), 11 deletions(-) create mode 100644 smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsDlqFailure.java create mode 100644 smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/impl/ConfigHelper.java diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java index 53ce4670eb..136effb414 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java @@ -69,7 +69,7 @@ @ConnectorAttribute(name = "retry.max-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The maximum delay", type = "string", defaultValue = "PT10S") @ConnectorAttribute(name = "retry.jitter", direction = Direction.INCOMING_AND_OUTGOING, description = "How much the delay jitters as a multiplier between 0 and 1. The formula is current delay * jitter. For example, with a current delay of 2H, a jitter of 0.5 will result in an actual delay somewhere between 1H and 3H.", type = "double", defaultValue = "0.5") @ConnectorAttribute(name = "failure-strategy", type = "string", direction = Direction.INCOMING, description = "Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be `fail` (default), `ignore`, or `dead-letter-queue`", defaultValue = "fail") -@ConnectorAttribute(name = "dead-letter-queue.topic", type = "string", direction = Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates on which queue the message is sent. Defaults is `dead-letter-topic-$channel`") +@ConnectorAttribute(name = "dead-letter-queue.destination", type = "string", direction = Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates on which queue the message is sent. Defaults is `dead-letter-topic-$channel`") @ConnectorAttribute(name = "dead-letter-queue.producer-client-id", type = "string", direction = Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates what client id the generated producer should use. Defaults is `jms-dead-letter-topic-producer-$client-id`") public class JmsConnector implements InboundConnector, OutboundConnector { @@ -146,7 +146,8 @@ public Flow.Publisher> getPublisher(Config config) { JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config); JmsResourceHolder holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic)); contexts.add(holder); - JmsSource source = new JmsSource(executionHolder.vertx(), holder, ic, jsonMapping, executor, failureHandlerFactories); + JmsSource source = new JmsSource(this, executionHolder.vertx(), holder, ic, jsonMapping, executor, + failureHandlerFactories); sources.add(source); return source.getSource(); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsPropertiesBuilder.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsPropertiesBuilder.java index 87b63b1c76..1faf31287d 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsPropertiesBuilder.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsPropertiesBuilder.java @@ -82,6 +82,14 @@ public JmsPropertiesBuilder with(String key, String value) { return this; } + public JmsPropertiesBuilder with(String key, Object value) { + validate(key, value); + properties.put(key, new Property<>(key, + value, + JmsTask.wrap(m -> m.setObjectProperty(key, value)))); + return this; + } + public JmsPropertiesBuilder without(String key) { properties.remove(key); return this; diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java index 3b047df0f8..881b3d9e8f 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java @@ -36,10 +36,13 @@ class JmsSource { private final JmsConnectorIncomingConfiguration config; private final JmsFailureHandler failureHandler; private final Context context; + private final JmsConnector jmsConnector; - JmsSource(Vertx vertx, JmsResourceHolder resourceHolder, JmsConnectorIncomingConfiguration config, + JmsSource(JmsConnector jmsConnector, Vertx vertx, JmsResourceHolder resourceHolder, + JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping, Executor executor, Instance failureHandlerFactories) { + this.jmsConnector = jmsConnector; String channel = config.getChannel(); final String destinationName = config.getDestination().orElseGet(config::getChannel); String selector = config.getSelector().orElse(null); @@ -116,7 +119,7 @@ private JmsFailureHandler createFailureHandler() { Instance failureHandlerFactory = failureHandlerFactories .select(Identifier.Literal.of(strategy)); if (failureHandlerFactory.isResolvable()) { - return failureHandlerFactory.get().create(config, this::reportFailure); + return failureHandlerFactory.get().create(jmsConnector, config, this::reportFailure); } else { throw ex.illegalArgumentInvalidFailureStrategy(strategy); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsDlqFailure.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsDlqFailure.java new file mode 100644 index 0000000000..6fda0f7813 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsDlqFailure.java @@ -0,0 +1,150 @@ +package io.smallrye.reactive.messaging.jms.fault; + +import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log; +import static io.smallrye.reactive.messaging.providers.wiring.Wiring.wireOutgoingConnectorToUpstream; +import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX; + +import java.util.Enumeration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; +import java.util.function.BiConsumer; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor; +import io.smallrye.reactive.messaging.SubscriberDecorator; +import io.smallrye.reactive.messaging.jms.*; +import io.smallrye.reactive.messaging.jms.impl.ConfigHelper; +import io.smallrye.reactive.messaging.jms.impl.ImmutableJmsProperties; +import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig; +import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig; + +public class JmsDlqFailure implements JmsFailureHandler { + public static final String CHANNEL_DLQ_SUFFIX = "dead-letter-queue"; + public static final String DEAD_LETTER_EXCEPTION_CLASS_NAME = "dead-letter-exception-class-name"; + public static final String DEAD_LETTER_CAUSE_CLASS_NAME = "dead-letter-cause-class-name"; + public static final String DEAD_LETTER_REASON = "dead-letter-reason"; + public static final String DEAD_LETTER_CAUSE = "dead-letter-cause"; + + private final JmsConnectorIncomingConfiguration config; + private final String dlqDestination; + private final UnicastProcessor> dlqSource; + + @ApplicationScoped + @Identifier(Strategy.DEAD_LETTER_QUEUE) + public static class Factory implements JmsFailureHandler.Factory { + @Inject + Instance subscriberDecorators; + @Inject + Instance rootConfig; + @Inject + @Any + Instance> configurations; + + @Override + public JmsFailureHandler create(JmsConnector connector, JmsConnectorIncomingConfiguration config, + BiConsumer reportFailure) { + + Optional deadLetterQueueDestination = config.getDeadLetterQueueDestination(); + ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(), + JmsConnector.CONNECTOR_NAME, config.getChannel(), null, + Map.of("destination", c -> deadLetterQueueDestination.orElse("dead-letter-queue-" + config.getChannel()))); + + Config jmsConfig = ConfigHelper.retrieveChannelConfiguration(configurations, connectorConfig); + + JmsConnectorOutgoingConfiguration producerConfig = new JmsConnectorOutgoingConfiguration(jmsConfig); + String destination = producerConfig.getDestination().get(); + + String deadQueueDestination = config.getDeadLetterQueueDestination() + .orElse("dead-letter-queue-" + config.getChannel()); + + UnicastProcessor> processor = UnicastProcessor.create(); + Flow.Subscriber> subscriber = connector.getSubscriber(jmsConfig); + wireOutgoingConnectorToUpstream(processor, subscriber, subscriberDecorators, + producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX); + + return new JmsDlqFailure(config, deadQueueDestination, processor); + } + } + + public JmsDlqFailure(JmsConnectorIncomingConfiguration config, String dlqDestination, + UnicastProcessor> dlqSource) { + this.config = config; + this.dlqDestination = dlqDestination; + this.dlqSource = dlqSource; + } + + @Override + public Uni handle(IncomingJmsMessage incomingMessage, Throwable reason, Metadata metadata) { + OutgoingJmsMessageMetadata outgoingJmsMessageMetadata = getOutgoingJmsMessageMetadata(incomingMessage, reason); + + Message dead = Message.of(incomingMessage.getPayload()); + dead.addMetadata(outgoingJmsMessageMetadata); + + log.messageNackedDeadLetter(config.getChannel(), dlqDestination); + + CompletableFuture future = new CompletableFuture<>(); + dlqSource.onNext(dead + .withAck(() -> { + return dead.ack().thenAccept(__ -> future.complete(null)); + }) + .withNack(throwable -> { + future.completeExceptionally(throwable); + return future; + })); + return Uni.createFrom().completionStage(future) + .emitOn(incomingMessage::runOnMessageContext); + } + + private OutgoingJmsMessageMetadata getOutgoingJmsMessageMetadata(IncomingJmsMessage message, Throwable reason) { + Optional optionalJmsProperties = message.getMetadata(IncomingJmsMessageMetadata.class).map(a -> { + return a.getProperties(); + }); + JmsProperties jmsProperties = optionalJmsProperties + .orElse(new ImmutableJmsProperties(message.unwrap(jakarta.jms.Message.class))); + Enumeration propertyNames = jmsProperties.getPropertyNames(); + JmsPropertiesBuilder jmsPropertiesBuilder = new JmsPropertiesBuilder(); + while (propertyNames.hasMoreElements()) { + String propertyName = propertyNames.nextElement(); + Object propertyValue = jmsProperties.getObjectProperty(propertyName); + jmsPropertiesBuilder.with(propertyName, propertyValue); + } + + jmsPropertiesBuilder.with(DEAD_LETTER_EXCEPTION_CLASS_NAME, reason.getClass().getName()); + jmsPropertiesBuilder.with(DEAD_LETTER_REASON, getThrowableMessage(reason)); + if (reason.getCause() != null) { + jmsPropertiesBuilder.with(DEAD_LETTER_CAUSE_CLASS_NAME, reason.getCause().getClass().getName()); + jmsPropertiesBuilder.with(DEAD_LETTER_CAUSE, getThrowableMessage(reason.getCause())); + } + + JmsProperties outgoingJmsProperties = jmsPropertiesBuilder.build(); + OutgoingJmsMessageMetadata outgoingJmsMessageMetadata = new OutgoingJmsMessageMetadata.OutgoingJmsMessageMetadataBuilder() + .withProperties(outgoingJmsProperties).build(); + return outgoingJmsMessageMetadata; + } + + @Override + public void terminate() { + dlqSource.cancel(); + //dlqSink.closeQuietly(); + } + + private String getThrowableMessage(Throwable throwable) { + String text = throwable.getMessage(); + if (text == null) { + text = throwable.toString(); + } + return text; + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java index da61f19d54..3d7f498e2c 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailStop.java @@ -11,6 +11,7 @@ import io.smallrye.common.annotation.Identifier; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.jms.IncomingJmsMessage; +import io.smallrye.reactive.messaging.jms.JmsConnector; import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration; public class JmsFailStop implements JmsFailureHandler { @@ -23,7 +24,7 @@ public class JmsFailStop implements JmsFailureHandler { public static class Factory implements JmsFailureHandler.Factory { @Override - public JmsFailureHandler create(JmsConnectorIncomingConfiguration config, + public JmsFailureHandler create(JmsConnector jmsConnector, JmsConnectorIncomingConfiguration config, BiConsumer reportFailure) { return new JmsFailStop(config.getChannel(), reportFailure); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java index e76aa8d4df..56522706e4 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsFailureHandler.java @@ -7,6 +7,7 @@ import io.smallrye.common.annotation.Experimental; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.jms.IncomingJmsMessage; +import io.smallrye.reactive.messaging.jms.JmsConnector; import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration; /** @@ -30,6 +31,7 @@ interface Strategy { */ interface Factory { JmsFailureHandler create( + JmsConnector connector, JmsConnectorIncomingConfiguration config, BiConsumer reportFailure); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java index e6ab89ad36..645697b18f 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsIgnoreFailure.java @@ -11,6 +11,7 @@ import io.smallrye.common.annotation.Identifier; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.jms.IncomingJmsMessage; +import io.smallrye.reactive.messaging.jms.JmsConnector; import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration; public class JmsIgnoreFailure implements JmsFailureHandler { @@ -22,7 +23,7 @@ public class JmsIgnoreFailure implements JmsFailureHandler { public static class Factory implements JmsFailureHandler.Factory { @Override - public JmsFailureHandler create(JmsConnectorIncomingConfiguration config, + public JmsFailureHandler create(JmsConnector jmsConnector, JmsConnectorIncomingConfiguration config, BiConsumer reportFailure) { return new JmsIgnoreFailure(config.getChannel()); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsExceptions.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsExceptions.java index b1f74820d9..b900f7dd14 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsExceptions.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsExceptions.java @@ -1,5 +1,7 @@ package io.smallrye.reactive.messaging.jms.i18n; +import java.util.NoSuchElementException; + import org.jboss.logging.Messages; import org.jboss.logging.annotations.Cause; import org.jboss.logging.annotations.Message; @@ -56,7 +58,13 @@ public interface JmsExceptions { @Message(id = 15613, value = "There is already a subscriber") IllegalStateException illegalStateAlreadySubscriber(); - @Message(id = 18614, value = "Invalid failure strategy: %s") + @Message(id = 15614, value = "Invalid failure strategy: %s") IllegalArgumentException illegalArgumentInvalidFailureStrategy(String strategy); + @Message(id = 15615, value = "The config property '%s' is required but it could not be found in any config source") + NoSuchElementException missingProperty(String propertyName); + + @Message(id = 15616, value = "Cannot convert property '%s' of type %s to %s") + NoSuchElementException cannotConvertProperty(String propertyName, Class type, Class targetType); + } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsLogging.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsLogging.java index 13ffd5089f..cc9ea8bcd0 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsLogging.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/i18n/JmsLogging.java @@ -55,4 +55,9 @@ public interface JmsLogging extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 15809, value = "A message sent to channel `%s` has been nacked, fail-stop") void messageNackedFailStop(String channel); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 15810, value = "A message sent to channel `%s` has been nacked, sending the message to a dead letter queue %s") + void messageNackedDeadLetter(String channel, String dlq); + } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/impl/ConfigHelper.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/impl/ConfigHelper.java new file mode 100644 index 0000000000..dfdea72d73 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/impl/ConfigHelper.java @@ -0,0 +1,309 @@ +package io.smallrye.reactive.messaging.jms.impl; + +import java.util.*; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.UnsatisfiedResolutionException; +import jakarta.enterprise.inject.literal.NamedLiteral; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigValue; +import org.eclipse.microprofile.config.spi.ConfigSource; +import org.eclipse.microprofile.config.spi.Converter; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.reactive.messaging.jms.i18n.JmsExceptions; +import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; + +public class ConfigHelper { + + public static final String JMS_CONFIGURATION_NAME_ATTRIBUTE = "jms-configuration"; + public static final String DEFAULT_JMS_BROKER = "default-jms-broker"; + + private ConfigHelper() { + // Avoid direct instantiation. + } + + /** + * Computes the channel configuration + * The channel configuration is extracted from 3 places (from the most important to the less important): + *
    + *
  1. The channel configuration in the application configuration (the + * mp.messaging.[incoming|outgoing].channel.attr=value
  2. + *
  3. From a bean exposing a Map%lt;String, Object> exposed with @Identifier(channel). The content + * of this map is generated by the runtime. The name can be configured using the + * mp.messaging.[incoming|outgoing].channel.jms-configuration attribute
  4. + *
  5. The default jms configuration (generally the jms.attr=value properties), also exposed as an identified + * Map%lt;String, Object>.
  6. + *
+ * + * @param config the received config + * @return the computed configuration + */ + public static Config retrieveChannelConfiguration(Instance> instances, Config config) { + // Retrieve the default jms configuration (3) + Map defaultJmsConfig = ConfigHelper.retrieveDefaultJmsConfig(instances); + // Retrieve the channel jms configuration (2) + Map channelSpecificConfig = ConfigHelper.getChannelSpecificConfig(instances, config); + + return ConfigHelper.merge(config, channelSpecificConfig, defaultJmsConfig); + } + + public static Config merge(Config passedCfg, Map defaultJmsCfg) { + return new Config() { + @SuppressWarnings("unchecked") + @Override + public T getValue(String propertyName, Class propertyType) { + T passedCgfValue = passedCfg.getOptionalValue(propertyName, propertyType).orElse(null); + if (passedCgfValue == null) { + Object o = defaultJmsCfg.get(propertyName); + if (o == null) { + throw JmsExceptions.ex.missingProperty(propertyName); + } + if (propertyType.isInstance(o)) { + return (T) o; + } + if (o instanceof String) { + Optional> converter = passedCfg.getConverter(propertyType); + return converter.map(conv -> conv.convert(o.toString())) + .orElseThrow(() -> new NoSuchElementException(propertyName)); + } + throw JmsExceptions.ex.cannotConvertProperty(propertyName, o.getClass(), propertyType); + } else { + return passedCgfValue; + } + } + + @Override + public ConfigValue getConfigValue(String propertyName) { + return passedCfg.getConfigValue(propertyName); + } + + @SuppressWarnings("unchecked") + @Override + public Optional getOptionalValue(String propertyName, Class propertyType) { + Optional passedCfgValue = passedCfg.getOptionalValue(propertyName, propertyType); + if (!passedCfgValue.isPresent()) { + Object o = defaultJmsCfg.get(propertyName); + if (o == null) { + return Optional.empty(); + } + if (propertyType.isInstance(o)) { + return Optional.of((T) o); + } + if (o instanceof String) { + Optional> converter = passedCfg.getConverter(propertyType); + return converter.map(conv -> conv.convert(o.toString())); + } + return Optional.empty(); + } else { + return passedCfgValue; + } + } + + @Override + public Iterable getPropertyNames() { + Iterable names = passedCfg.getPropertyNames(); + Set result = new HashSet<>(); + names.forEach(result::add); + result.addAll(defaultJmsCfg.keySet()); + return result; + } + + @Override + public Iterable getConfigSources() { + return passedCfg.getConfigSources(); + } + + @Override + public Optional> getConverter(Class forType) { + return passedCfg.getConverter(forType); + } + + @Override + public T unwrap(Class type) { + return passedCfg.unwrap(type); + } + }; + } + + /** + * Returns a {@code Config} instance merging the values from the 3 sources (the channel configuration, + * a map specific to the channel and the global jms configuration). + * + * @param passedCfg the channel configuration (high priority) + * @param namedConfig the channel specific configuration (medium priority) + * @param defaultJmsCfg the default jms configuration (low priority) + * @return the computed config. + */ + public static Config merge(Config passedCfg, + Map namedConfig, + Map defaultJmsCfg) { + + if (namedConfig.isEmpty() && defaultJmsCfg.isEmpty()) { + return passedCfg; + } + + return new Config() { + + private T extractValue(String name, Class clazz, boolean failIfMissing) { + Object value = passedCfg.getOptionalValue(name, clazz).orElse(null); + if (value != null) { + //noinspection unchecked + return (T) value; + } + + value = namedConfig.getOrDefault(name, defaultJmsCfg.get(name)); + if (value == null) { + if (failIfMissing) { + throw JmsExceptions.ex.missingProperty(name); + } + return null; + } + + if (clazz.isInstance(value)) { + //noinspection unchecked + return (T) value; + } + + // Attempt a conversion + if (value instanceof String) { + String v = (String) value; + Optional> converter = passedCfg.getConverter(clazz); + if (converter.isPresent()) { + return converter.get().convert(v); + } + if (failIfMissing) { + throw JmsExceptions.ex.missingProperty(name); + } + return null; + } + + if (failIfMissing) { + throw JmsExceptions.ex.cannotConvertProperty(name, value.getClass(), clazz); + } else { + return null; + } + } + + @Override + public T getValue(String propertyName, Class propertyType) { + return extractValue(propertyName, propertyType, true); + } + + @Override + public ConfigValue getConfigValue(String propertyName) { + // We only compute ConfigValue for the original config. + return passedCfg.getConfigValue(propertyName); + } + + @Override + public Optional getOptionalValue(String propertyName, Class propertyType) { + T value = extractValue(propertyName, propertyType, false); + return Optional.ofNullable(value); + } + + @Override + public Iterable getPropertyNames() { + Set result = new HashSet<>(); + + // First global + result.addAll(defaultJmsCfg.keySet()); + + // Configured name + result.addAll(namedConfig.keySet()); + + // Channel + Iterable names = passedCfg.getPropertyNames(); + names.forEach(result::add); + + return result; + } + + @Override + public Iterable getConfigSources() { + return passedCfg.getConfigSources(); + } + + @Override + public Optional> getConverter(Class forType) { + return passedCfg.getConverter(forType); + } + + @Override + public T unwrap(Class type) { + return passedCfg.unwrap(type); + } + }; + } + + /** + * Retrieves the default jms configuration if any. It looks for a {@code Map%lt;String, Object>} identified by + * {@link #DEFAULT_JMS_BROKER}. + * + * @param instances the instances of map exposed as bean + * @return the map, empty if the lookup fails + */ + public static Map retrieveDefaultJmsConfig(Instance> instances) { + Instance> defaultJmsConfigurationInstance = instances + .select(Identifier.Literal.of(DEFAULT_JMS_BROKER)); + if (defaultJmsConfigurationInstance.isUnsatisfied()) { + // Try with @Named, this will be removed when @Named support will be removed. + defaultJmsConfigurationInstance = instances.select(NamedLiteral.of(DEFAULT_JMS_BROKER)); + if (!defaultJmsConfigurationInstance.isUnsatisfied()) { + ProviderLogging.log.deprecatedNamed(); + } + } + + Map defaultJmsConfig = Collections.emptyMap(); + if (!defaultJmsConfigurationInstance.isUnsatisfied()) { + defaultJmsConfig = defaultJmsConfigurationInstance.get(); + } + return defaultJmsConfig; + } + + /** + * Looks for a {@code Map%lt;String, Object>} for the given channel. The map is identified using {@link Identifier}. + * The identifier value is either configured in the channel configuration, or is the channel name. + * + * @param instances the instances of map exposed as bean + * @param config the channel configuration + * @return the map, empty if the lookup fails. + */ + public static Map getChannelSpecificConfig(Instance> instances, Config config) { + Optional name = config.getOptionalValue(JMS_CONFIGURATION_NAME_ATTRIBUTE, String.class); + Optional channel = config.getOptionalValue(ConnectorFactory.CHANNEL_NAME_ATTRIBUTE, String.class); + String channelName = channel.orElse(null); + Map channelSpecificConfig = Collections.emptyMap(); + if (name.isPresent()) { + channelSpecificConfig = lookupForIdentifiedConfiguration(instances, name.get(), false); + } else if (channelName != null) { + channelSpecificConfig = lookupForIdentifiedConfiguration(instances, channelName, true); + } + return channelSpecificConfig; + } + + /** + * Looks for a CDI bean of type {@code Map} identified using the given named. + * If the lookup fails and {@code optional} is {@code true}, an {@link UnsatisfiedResolutionException} is thrown. + * Otherwise, an empty {@code Map} is returned. + * + * @param identifier the identifier + * @param optional whether the lookup is optional + * @return the result + */ + public static Map lookupForIdentifiedConfiguration(Instance> instances, + String identifier, boolean optional) { + Instance> instance = instances.select(Identifier.Literal.of(identifier)); + if (instance.isUnsatisfied()) { + if (!optional) { + throw new UnsatisfiedResolutionException("Cannot find the Jms configuration: " + identifier); + } else { + return Collections.emptyMap(); + } + } else { + return instance.get(); + } + } +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java index 4b5a0e7f7a..5a4260aaa9 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java @@ -186,7 +186,7 @@ public void testReceptionOfMultipleMessages() { @Test public void testMultipleRequests() { - JmsSource source = new JmsSource(Vertx.vertx(), getResourceHolder("queue"), + JmsSource source = new JmsSource(null, Vertx.vertx(), getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig().put("channel-name", "queue")), null, null, failureHandlerFactories); Publisher> publisher = source.getSource(); @@ -239,7 +239,7 @@ public void onComplete() { @Test public void testBroadcast() { - JmsSource source = new JmsSource(Vertx.vertx(), getResourceHolder("queue"), + JmsSource source = new JmsSource(null, Vertx.vertx(), getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig() .with("channel-name", "queue").with("broadcast", true)), null, null, failureHandlerFactories); diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationAckTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationAckTest.java index 38d965d394..901fc87d64 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationAckTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationAckTest.java @@ -24,6 +24,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler; import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; import io.smallrye.reactive.messaging.support.JmsTestBase; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -78,7 +79,7 @@ public void testChannelWithAckOnMessageContext() { @Test public void testChannelWithNackOnMessageContextFail() { IncomingChannelWithAckOnMessageContext bean = runApplication(dataconfig() - .with("mp.messaging.incoming.data.failure-strategy", "fail"), + .with("mp.messaging.incoming.data.failure-strategy", JmsFailureHandler.Strategy.FAIL), IncomingChannelWithAckOnMessageContext.class); bean.process(i -> { throw new RuntimeException("boom"); @@ -93,7 +94,7 @@ public void testChannelWithNackOnMessageContextFail() { @Test public void testChannelWithNackOnMessageContextIgnore() { IncomingChannelWithAckOnMessageContext bean = runApplication(dataconfig() - .with("mp.messaging.incoming.data.failure-strategy", "ignore"), + .with("mp.messaging.incoming.data.failure-strategy", JmsFailureHandler.Strategy.IGNORE), IncomingChannelWithAckOnMessageContext.class); bean.process(i -> { throw new RuntimeException("boom"); @@ -105,6 +106,25 @@ public void testChannelWithNackOnMessageContextIgnore() { assertThat(bean.getResults()).containsExactly(1, 2, 3, 4, 5); } + @Test + public void testChannelWithNackOnMessageContextDlq() { + IncomingChannelWithAckOnMessageContext bean = runApplication(dataconfig() + .with("mp.messaging.incoming.data.dead-letter-queue.destination", "dlqDestination") + .with("mp.messaging.incoming.data.failure-strategy", JmsFailureHandler.Strategy.DEAD_LETTER_QUEUE), + IncomingChannelWithAckOnMessageContext.class); + bean.process(i -> { + if (i != 3) { + return i + 1; + } + throw new RuntimeException("boom"); + }); + + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> bean.getResults().size() >= 4); + assertThat(bean.getResults()).containsExactly(2, 3, 5, 6); + } + @ApplicationScoped public static class IncomingChannelWithAckOnMessageContext { diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java index eb8979d974..e930ef77e1 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/support/JmsTestBase.java @@ -18,6 +18,7 @@ import io.smallrye.config.inject.ConfigExtension; import io.smallrye.reactive.messaging.jms.JmsConnector; import io.smallrye.reactive.messaging.jms.TestMapping; +import io.smallrye.reactive.messaging.jms.fault.JmsDlqFailure; import io.smallrye.reactive.messaging.jms.fault.JmsFailStop; import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler; import io.smallrye.reactive.messaging.jms.fault.JmsIgnoreFailure; @@ -101,6 +102,7 @@ protected Weld initWithoutConnectionFactory() { weld.addBeanClass(ExecutionHolder.class); weld.addBeanClass(JmsFailStop.Factory.class); weld.addBeanClass(JmsIgnoreFailure.Factory.class); + weld.addBeanClass(JmsDlqFailure.Factory.class); weld.addBeanClass(WorkerPoolRegistry.class); weld.addBeanClass(HealthCenter.class); weld.addBeanClass(Wiring.class);