Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JMS: WIP: Initial commit for fault strategies #2599

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.smallrye.config.SmallRyeConfigProviderResolver;
import io.smallrye.config.inject.ConfigExtension;
import io.smallrye.reactive.messaging.jms.JmsConnector;
import io.smallrye.reactive.messaging.jms.fault.JmsFailStop;
import io.smallrye.reactive.messaging.jms.fault.JmsIgnoreFailure;
import io.smallrye.reactive.messaging.json.jackson.JacksonMapping;
import io.smallrye.reactive.messaging.json.jackson.ObjectMapperProvider;
import io.smallrye.reactive.messaging.providers.MediatorFactory;
Expand Down Expand Up @@ -96,7 +98,8 @@ protected Weld initWithoutConnectionFactory() {
weld.addBeanClass(EmitterFactoryImpl.class);
weld.addBeanClass(MutinyEmitterFactoryImpl.class);
weld.addBeanClass(LegacyEmitterFactoryImpl.class);

weld.addBeanClass(JmsFailStop.Factory.class);
weld.addBeanClass(JmsIgnoreFailure.Factory.class);
weld.addBeanClass(JmsConnector.class);
weld.addBeanClass(ObjectMapperProvider.class);
weld.addBeanClass(JacksonMapping.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.reactive.messaging.jms;

import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex;
import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
Expand All @@ -12,20 +13,24 @@
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.messaging.Message<T> {
public class IncomingJmsMessage<T> implements ContextAwareMessage<T> {
private final Message delegate;
private final Executor executor;
private final Class<T> clazz;
private final JsonMapping jsonMapping;
private final IncomingJmsMessageMetadata jmsMetadata;
private final Metadata metadata;
private final JmsFailureHandler failureHandler;

IncomingJmsMessage(Message message, Executor executor, JsonMapping jsonMapping) {
IncomingJmsMessage(Message message, Executor executor, JsonMapping jsonMapping, JmsFailureHandler failureHandler) {
this.delegate = message;
this.jsonMapping = jsonMapping;
this.executor = executor;
this.failureHandler = failureHandler;
String cn = null;
try {
cn = message.getStringProperty("_classname");
Expand All @@ -42,7 +47,7 @@ public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.
}

this.jmsMetadata = new IncomingJmsMessageMetadata(message);
this.metadata = Metadata.of(this.jmsMetadata);
this.metadata = captureContextMetadata(this.jmsMetadata);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -119,6 +124,7 @@ public CompletionStage<Void> ack(Metadata metadata) {
}
})
.runSubscriptionOn(executor)
.emitOn(this::runOnMessageContext)
.subscribeAsCompletionStage();
}

Expand All @@ -127,6 +133,11 @@ public Metadata getMetadata() {
return metadata;
}

@Override
public CompletionStage<Void> nack(Throwable reason, Metadata metadata) {
return failureHandler.handle(this, reason, metadata).subscribeAsCompletionStage();
}

@SuppressWarnings({ "unchecked" })
@Override
public <C> C unwrap(Class<C> unwrapType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;

@ApplicationScoped
Expand Down Expand Up @@ -66,6 +68,9 @@
@ConnectorAttribute(name = "retry.initial-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The initial delay for the retry.", type = "string", defaultValue = "PT1S")
@ConnectorAttribute(name = "retry.max-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The maximum delay", type = "string", defaultValue = "PT10S")
@ConnectorAttribute(name = "retry.jitter", direction = Direction.INCOMING_AND_OUTGOING, description = "How much the delay jitters as a multiplier between 0 and 1. The formula is current delay * jitter. For example, with a current delay of 2H, a jitter of 0.5 will result in an actual delay somewhere between 1H and 3H.", type = "double", defaultValue = "0.5")
@ConnectorAttribute(name = "failure-strategy", type = "string", direction = Direction.INCOMING, description = "Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be `fail` (default), `ignore`, or `dead-letter-queue`", defaultValue = "fail")
@ConnectorAttribute(name = "dead-letter-queue.destination", type = "string", direction = Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates on which queue the message is sent. Defaults is `dead-letter-topic-$channel`")
@ConnectorAttribute(name = "dead-letter-queue.producer-client-id", type = "string", direction = Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates what client id the generated producer should use. Defaults is `jms-dead-letter-topic-producer-$client-id`")
public class JmsConnector implements InboundConnector, OutboundConnector {

/**
Expand Down Expand Up @@ -100,6 +105,13 @@ public class JmsConnector implements InboundConnector, OutboundConnector {
@ConfigProperty(name = "smallrye.jms.threads.ttl", defaultValue = DEFAULT_THREAD_TTL)
int ttl;

@Inject
@Any
Instance<JmsFailureHandler.Factory> failureHandlerFactories;

@Inject
ExecutionHolder executionHolder;

private ExecutorService executor;
private JsonMapping jsonMapping;
private final List<JmsSource> sources = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -134,7 +146,8 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config);
JmsResourceHolder<JMSConsumer> holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic));
contexts.add(holder);
JmsSource source = new JmsSource(holder, ic, jsonMapping, executor);
JmsSource source = new JmsSource(this, executionHolder.vertx(), holder, ic, jsonMapping, executor,
failureHandlerFactories);
sources.add(source);
return source.getSource();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ public JmsPropertiesBuilder with(String key, String value) {
return this;
}

public JmsPropertiesBuilder with(String key, Object value) {
validate(key, value);
properties.put(key, new Property<>(key,
value,
JmsTask.wrap(m -> m.setObjectProperty(key, value))));
return this;
}

public JmsPropertiesBuilder without(String key) {
properties.remove(key);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,53 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.inject.Instance;
import jakarta.jms.*;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

class JmsSource {

private final Multi<IncomingJmsMessage<?>> source;
private final JmsResourceHolder<JMSConsumer> resourceHolder;
private final List<Throwable> failures = new ArrayList<>();

private final JmsPublisher publisher;

JmsSource(JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping,
Executor executor) {
private final Instance<JmsFailureHandler.Factory> failureHandlerFactories;
private final JmsConnectorIncomingConfiguration config;
private final JmsFailureHandler failureHandler;
private final Context context;
private final JmsConnector jmsConnector;

JmsSource(JmsConnector jmsConnector, Vertx vertx, JmsResourceHolder<JMSConsumer> resourceHolder,
JmsConnectorIncomingConfiguration config,
JsonMapping jsonMapping,
Executor executor, Instance<JmsFailureHandler.Factory> failureHandlerFactories) {
this.jmsConnector = jmsConnector;
String channel = config.getChannel();
final String destinationName = config.getDestination().orElseGet(config::getChannel);
String selector = config.getSelector().orElse(null);
boolean nolocal = config.getNoLocal();
boolean durable = config.getDurable();
String type = config.getDestinationType();
boolean retry = config.getRetry();
this.config = config;
this.resourceHolder = resourceHolder.configure(r -> getDestination(r.getContext(), destinationName, type),
r -> {
if (durable) {
Expand All @@ -47,8 +65,12 @@ class JmsSource {
});
resourceHolder.getClient();
this.publisher = new JmsPublisher(resourceHolder);
this.failureHandlerFactories = failureHandlerFactories;
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.failureHandler = createFailureHandler();
source = Multi.createFrom().publisher(publisher)
.<IncomingJmsMessage<?>> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping))
.emitOn(r -> context.runOnContext(r))
.<IncomingJmsMessage<?>> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping, failureHandler))
.onFailure(t -> {
log.terminalErrorOnChannel(channel);
this.resourceHolder.close();
Expand Down Expand Up @@ -92,6 +114,30 @@ Multi<IncomingJmsMessage<?>> getSource() {
return source;
}

private JmsFailureHandler createFailureHandler() {
String strategy = config.getFailureStrategy();
Instance<JmsFailureHandler.Factory> failureHandlerFactory = failureHandlerFactories
.select(Identifier.Literal.of(strategy));
if (failureHandlerFactory.isResolvable()) {
return failureHandlerFactory.get().create(jmsConnector, config, this::reportFailure);
} else {
throw ex.illegalArgumentInvalidFailureStrategy(strategy);
}
}

public synchronized void reportFailure(Throwable failure, boolean fatal) {
//log.failureReported(topics, failure);
// Don't keep all the failures, there are only there for reporting.
if (failures.size() == 10) {
failures.remove(0);
}
failures.add(failure);

if (fatal) {
close();
}
}

@SuppressWarnings("PublisherImplementation")
private static class JmsPublisher implements Flow.Publisher<Message>, Flow.Subscription {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package io.smallrye.reactive.messaging.jms.fault;

import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;
import static io.smallrye.reactive.messaging.providers.wiring.Wiring.wireOutgoingConnectorToUpstream;
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX;

import java.util.Enumeration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.jms.*;
import io.smallrye.reactive.messaging.jms.impl.ConfigHelper;
import io.smallrye.reactive.messaging.jms.impl.ImmutableJmsProperties;
import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;

public class JmsDlqFailure implements JmsFailureHandler {
public static final String CHANNEL_DLQ_SUFFIX = "dead-letter-queue";
public static final String DEAD_LETTER_EXCEPTION_CLASS_NAME = "dead-letter-exception-class-name";
public static final String DEAD_LETTER_CAUSE_CLASS_NAME = "dead-letter-cause-class-name";
public static final String DEAD_LETTER_REASON = "dead-letter-reason";
public static final String DEAD_LETTER_CAUSE = "dead-letter-cause";

private final JmsConnectorIncomingConfiguration config;
private final String dlqDestination;
private final UnicastProcessor<Message<?>> dlqSource;

@ApplicationScoped
@Identifier(Strategy.DEAD_LETTER_QUEUE)
public static class Factory implements JmsFailureHandler.Factory {
@Inject
Instance<SubscriberDecorator> subscriberDecorators;
@Inject
Instance<Config> rootConfig;
@Inject
@Any
Instance<Map<String, Object>> configurations;

@Override
public JmsFailureHandler create(JmsConnector connector, JmsConnectorIncomingConfiguration config,
BiConsumer<Throwable, Boolean> reportFailure) {

Optional<String> deadLetterQueueDestination = config.getDeadLetterQueueDestination();
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
JmsConnector.CONNECTOR_NAME, config.getChannel(), null,
Map.of("destination", c -> deadLetterQueueDestination.orElse("dead-letter-queue-" + config.getChannel())));

Config jmsConfig = ConfigHelper.retrieveChannelConfiguration(configurations, connectorConfig);

JmsConnectorOutgoingConfiguration producerConfig = new JmsConnectorOutgoingConfiguration(jmsConfig);
String destination = producerConfig.getDestination().get();

String deadQueueDestination = config.getDeadLetterQueueDestination()
.orElse("dead-letter-queue-" + config.getChannel());

UnicastProcessor<Message<?>> processor = UnicastProcessor.create();
Flow.Subscriber<? extends Message<?>> subscriber = connector.getSubscriber(jmsConfig);
wireOutgoingConnectorToUpstream(processor, subscriber, subscriberDecorators,
producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX);

return new JmsDlqFailure(config, deadQueueDestination, processor);
}
}

public JmsDlqFailure(JmsConnectorIncomingConfiguration config, String dlqDestination,
UnicastProcessor<Message<?>> dlqSource) {
this.config = config;
this.dlqDestination = dlqDestination;
this.dlqSource = dlqSource;
}

@Override
public <T> Uni<Void> handle(IncomingJmsMessage<T> incomingMessage, Throwable reason, Metadata metadata) {
OutgoingJmsMessageMetadata outgoingJmsMessageMetadata = getOutgoingJmsMessageMetadata(incomingMessage, reason);

Message<T> dead = Message.of(incomingMessage.getPayload());
dead.addMetadata(outgoingJmsMessageMetadata);

log.messageNackedDeadLetter(config.getChannel(), dlqDestination);

CompletableFuture<Void> future = new CompletableFuture<>();
dlqSource.onNext(dead
.withAck(() -> {
return dead.ack().thenAccept(__ -> future.complete(null));
})
.withNack(throwable -> {
future.completeExceptionally(throwable);
return future;
}));
return Uni.createFrom().completionStage(future)
.emitOn(incomingMessage::runOnMessageContext);
}

private <T> OutgoingJmsMessageMetadata getOutgoingJmsMessageMetadata(IncomingJmsMessage<T> message, Throwable reason) {
Optional<JmsProperties> optionalJmsProperties = message.getMetadata(IncomingJmsMessageMetadata.class).map(a -> {
return a.getProperties();
});
JmsProperties jmsProperties = optionalJmsProperties
.orElse(new ImmutableJmsProperties(message.unwrap(jakarta.jms.Message.class)));
Enumeration<String> propertyNames = jmsProperties.getPropertyNames();
JmsPropertiesBuilder jmsPropertiesBuilder = new JmsPropertiesBuilder();
while (propertyNames.hasMoreElements()) {
String propertyName = propertyNames.nextElement();
Object propertyValue = jmsProperties.getObjectProperty(propertyName);
jmsPropertiesBuilder.with(propertyName, propertyValue);
}

jmsPropertiesBuilder.with(DEAD_LETTER_EXCEPTION_CLASS_NAME, reason.getClass().getName());
jmsPropertiesBuilder.with(DEAD_LETTER_REASON, getThrowableMessage(reason));
if (reason.getCause() != null) {
jmsPropertiesBuilder.with(DEAD_LETTER_CAUSE_CLASS_NAME, reason.getCause().getClass().getName());
jmsPropertiesBuilder.with(DEAD_LETTER_CAUSE, getThrowableMessage(reason.getCause()));
}

JmsProperties outgoingJmsProperties = jmsPropertiesBuilder.build();
OutgoingJmsMessageMetadata outgoingJmsMessageMetadata = new OutgoingJmsMessageMetadata.OutgoingJmsMessageMetadataBuilder()
.withProperties(outgoingJmsProperties).build();
return outgoingJmsMessageMetadata;
}

@Override
public void terminate() {
dlqSource.cancel();
//dlqSink.closeQuietly();
}

private String getThrowableMessage(Throwable throwable) {
String text = throwable.getMessage();
if (text == null) {
text = throwable.toString();
}
return text;
}
}
Loading
Loading