Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issues #14

Merged
merged 5 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pubsub-plus-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,19 @@
</annotationProcessors>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<java.util.logging.manager>java.util.logging.LogManager</java.util.logging.manager>
</systemPropertyVariables>
<classpathDependencyExcludes>
<classpathDependencyExclude>org.jboss.slf4j:slf4j-jboss-logmanager</classpathDependencyExclude>
<classpathDependencyExclude>org.jboss.logmanager:jboss-logmanager-embedded</classpathDependencyExclude>
</classpathDependencyExcludes>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@

import com.solace.messaging.MessagingService;
import com.solace.messaging.config.MessageAcknowledgementConfiguration;
import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.receiver.AcknowledgementSupport;

import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.mutiny.Uni;

public class SolaceErrorTopic implements SolaceFailureHandler {
private final String channel;
Expand Down Expand Up @@ -50,24 +48,18 @@ public void setTimeToLive(Long timeToLive) {

@Override
public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata) {
PersistentMessagePublisher.PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler
.handle(msg, errorTopic, dmqEligible, timeToLive)
return solaceErrorTopicPublisherHandler.handle(msg, errorTopic, dmqEligible, timeToLive)
.onFailure().retry().withBackOff(Duration.ofSeconds(1))
.atMost(maxDeliveryAttempts)
.subscribeAsCompletionStage().exceptionally((t) -> {
SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel,
t.getMessage());
return null;
}).join();

if (publishReceipt != null) {
return Uni.createFrom().voidItem()
.invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED))
.runSubscriptionOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}

return Uni.createFrom().<Void> failure(reason)
.emitOn(msg::runOnMessageContext).subscribeAsCompletionStage();
.onItem().invoke(() -> {
SolaceLogging.log.messageSettled(channel,
MessageAcknowledgementConfiguration.Outcome.ACCEPTED.toString().toLowerCase(),
"Message is published to error topic and acknowledged on queue.");
ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED);
})
.replaceWithVoid()
.onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t))
.emitOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,33 @@
class SolaceErrorTopicPublisherHandler implements PersistentMessagePublisher.MessagePublishReceiptListener {

private final MessagingService solace;
private String errorTopic;
private final PersistentMessagePublisher publisher;
private final OutboundErrorMessageMapper outboundErrorMessageMapper;

public SolaceErrorTopicPublisherHandler(MessagingService solace) {
this.solace = solace;

publisher = solace.createPersistentMessagePublisherBuilder().build();
publisher.setMessagePublishReceiptListener(this);
publisher.start();
outboundErrorMessageMapper = new OutboundErrorMessageMapper();
}

public Uni<PublishReceipt> handle(SolaceInboundMessage<?> message,
String errorTopic,
boolean dmqEligible, Long timeToLive) {
this.errorTopic = errorTopic;
OutboundMessage outboundMessage = outboundErrorMessageMapper.mapError(this.solace.messageBuilder(),
message.getMessage(),
dmqEligible, timeToLive);
publisher.setMessagePublishReceiptListener(this);
// }
return Uni.createFrom().<PublishReceipt> emitter(e -> {
try {
// always wait for error message publish receipt to ensure it is successfully spooled on broker.
publisher.publish(outboundMessage, Topic.of(errorTopic), e);
} catch (Throwable t) {
SolaceLogging.log.publishException(this.errorTopic);
e.fail(t);
}
}).invoke(() -> System.out.println(""));
}).onFailure().invoke(t -> SolaceLogging.log.publishException(errorTopic, t));
}

@Override
Expand All @@ -53,7 +50,6 @@ public void onPublishReceipt(PublishReceipt publishReceipt) {
.getUserContext();
PubSubPlusClientException exception = publishReceipt.getException();
if (exception != null) {
SolaceLogging.log.publishException(this.errorTopic);
uniEmitter.fail(exception);
} else {
uniEmitter.complete(publishReceipt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;
import org.jboss.logging.annotations.Once;
import org.jboss.logging.annotations.*;

/**
* Logging for Solace PubSub Connector
Expand All @@ -30,12 +27,12 @@ public interface SolaceLogging extends BasicLogger {
void messageSettled(String channel, String outcome, String reason);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful, reason: %s")
void unsuccessfulToTopic(String topic, String channel, String reason);
@Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful")
void unsuccessfulToTopic(String topic, String channel, @Cause Throwable cause);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55204, value = "A exception occurred when publishing to topic %s")
void publishException(String topic);
void publishException(String topic, @Cause Throwable cause);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55205, value = "A exception occurred during shutdown %s")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import io.quarkiverse.solace.i18n.SolaceLogging;

class IncomingMessagesUnsignedCounterBarrier {
private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1)
private final Lock awaitLock = new ReentrantLock();
private final Condition isZero = awaitLock.newCondition();

private static final Log logger = LogFactory.getLog(IncomingMessagesUnsignedCounterBarrier.class);

public IncomingMessagesUnsignedCounterBarrier(long initialValue) {
counter = new AtomicLong(initialValue);
}
Expand Down Expand Up @@ -64,7 +61,8 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
awaitLock.lock();
try {
if (timeout > 0) {
logger.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
SolaceLogging.log
.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
final long expiry = unit.toMillis(timeout) + System.currentTimeMillis();
while (isGreaterThanZero()) {
long realTimeout = expiry - System.currentTimeMillis();
Expand All @@ -76,7 +74,7 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
logger.info(String.format("Waiting for %s items", counter.get()));
SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import io.quarkiverse.solace.i18n.SolaceLogging;

class OutgoingMessagesUnsignedCounterBarrier {
private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1)
private final Lock awaitLock = new ReentrantLock();
private final Condition isZero = awaitLock.newCondition();

private static final Log logger = LogFactory.getLog(OutgoingMessagesUnsignedCounterBarrier.class);

public OutgoingMessagesUnsignedCounterBarrier(long initialValue) {
counter = new AtomicLong(initialValue);
}
Expand Down Expand Up @@ -64,7 +61,8 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
awaitLock.lock();
try {
if (timeout > 0) {
logger.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
SolaceLogging.log
.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
final long expiry = unit.toMillis(timeout) + System.currentTimeMillis();
while (isGreaterThanZero()) {
long realTimeout = expiry - System.currentTimeMillis();
Expand All @@ -76,7 +74,7 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
logger.info(String.format("Waiting for %s items", counter.get()));
SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@

import io.quarkiverse.solace.base.SolaceContainer;
import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.logging.SolaceTestAppender;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SolaceConsumerTest extends WeldTestBase {
private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("io.quarkiverse.solace");
private SolaceTestAppender solaceTestAppender = new SolaceTestAppender();

private SolaceConsumerTest() {
rootLogger.addAppender(solaceTestAppender);
}

@Test
@Order(1)
Expand Down Expand Up @@ -180,28 +187,6 @@ void consumerFailedProcessingMoveToDMQ() {

@Test
@Order(6)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);

Exception exception = assertThrows(Exception.class, () -> {
// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);
});

// Assert on published messages
await().untilAsserted(() -> assertThat(exception.getMessage())
.contains("com.solacesystems.jcsmp.AccessDeniedException: Permission Not Allowed - Queue '"
+ SolaceContainer.INTEGRATION_TEST_QUEUE_NAME + "' - Topic '" + topic));
}

@Test
@Order(7)
void consumerPublishToErrorTopicPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
Expand All @@ -210,6 +195,7 @@ void consumerPublishToErrorTopicPermissionException() {
.with("mp.messaging.incoming.in.consumer.queue.failure-strategy", "error_topic")
.with("mp.messaging.incoming.in.consumer.queue.error.topic",
"publish/deny")
.with("mp.messaging.incoming.in.consumer.queue.error.message.max-delivery-attempts", 0)
.with("mp.messaging.incoming.error-in.connector", "quarkus-solace")
.with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME)
.with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive");
Expand All @@ -226,6 +212,32 @@ void consumerPublishToErrorTopicPermissionException() {
publisher.publish(outboundMessage, tp);

await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(0));
// await().untilAsserted(() -> assertThat(inMemoryLogHandler.getRecords().stream().filter(record -> record.getMessage().contains("A exception occurred when publishing to topic")).count()).isEqualTo(4));
await().untilAsserted(() -> assertThat(solaceTestAppender.getLog().stream()
.anyMatch(record -> record.getMessage().toString().contains("Publishing error message to topic")))
.isEqualTo(true));
}

@Test
@Order(7)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);

Exception exception = assertThrows(Exception.class, () -> {
// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);
});

// Assert on published messages
await().untilAsserted(() -> assertThat(exception.getMessage())
.contains("com.solacesystems.jcsmp.AccessDeniedException: Permission Not Allowed - Queue '"
+ SolaceContainer.INTEGRATION_TEST_QUEUE_NAME + "' - Topic '" + topic));
}

@ApplicationScoped
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.quarkiverse.solace.logging;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;

public class SolaceTestAppender extends AppenderSkeleton {
private List<LoggingEvent> log = new ArrayList<>();

@Override
protected void append(LoggingEvent loggingEvent) {
log.add(loggingEvent);
}

@Override
public void close() {

}

@Override
public boolean requiresLayout() {
return false;
}

public List<LoggingEvent> getLog() {
return new ArrayList<LoggingEvent>(log);
}
}