Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turn off graceful shutdown for connector tests by default #54

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions quarkus-solace-messaging-connector/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
<artifactId>quarkus-solace-messaging-connector</artifactId>
<name>Quarkus Solace Messaging Connector - Runtime</name>
<dependencies>
<dependency>
<groupId>io.smallrye.config</groupId>
<artifactId>smallrye-config</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,19 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
.call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
: m)
.onItem().invoke(() -> alive.set(true))
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke((t) -> {
failures.add(t);
alive.set(false);
});
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(this::reportFailure);
if (!lazyStart) {
receiver.start();
}
}

private void reportFailure(Throwable throwable) {
failures.add(throwable);
private synchronized void reportFailure(Throwable throwable) {
alive.set(false);
// Don't keep all the failures, there are only there for reporting.
if (failures.size() == 10) {
failures.remove(0);
}
failures.add(throwable);
}

private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
Expand Down Expand Up @@ -208,12 +209,16 @@ public void close() {
}
closed.compareAndSet(false, true);
if (this.pollerThread != null) {
this.pollerThread.shutdown();
try {
this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
SolaceLogging.log.shutdownException(e.getMessage());
throw new RuntimeException(e);
if (this.gracefulShutdown) {
this.pollerThread.shutdown();
try {
this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
SolaceLogging.log.shutdownException(e.getMessage());
throw new RuntimeException(e);
}
} else {
this.pollerThread.shutdownNow();
}
}
receiver.terminate(3000);
Expand All @@ -233,10 +238,8 @@ public void isAlive(HealthReport.HealthReportBuilder builder) {
synchronized (this) {
reportedFailures = new ArrayList<>(failures);
}

builder.add(channel, solace.isConnected() && alive.get(),
reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
failures.removeAll(reportedFailures);
} else {
builder.add(channel, solace.isConnected() && alive.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
boolean lazyStart = oc.getClientLazyStart();
this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel));
this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(),
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke((t) -> {
failures.add(t);
alive.set(false);
}));
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke(this::reportFailure));
this.subscriber = MultiUtils.via(processor, multi -> multi.plug(
m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m));
if (!lazyStart) {
Expand All @@ -106,11 +103,18 @@ private Uni<Void> sendMessage(MessagingService solace, Message<?> m, boolean wai
return Uni.createFrom().completionStage(m.getAck());
})
.onFailure().recoverWithUni(t -> {
failures.add(t);
alive.set(false);
reportFailure(t);
return Uni.createFrom().completionStage(m.nack(t));
});
}

private synchronized void reportFailure(Throwable throwable) {
alive.set(false);
// Don't keep all the failures, there are only there for reporting.
if (failures.size() == 10) {
failures.remove(0);
}
failures.add(throwable);
}

private Uni<PublishReceipt> publishMessage(PersistentMessagePublisher publisher, Message<?> m,
Expand Down Expand Up @@ -249,9 +253,9 @@ public void isAlive(HealthReport.HealthReportBuilder builder) {
synchronized (this) {
reportedFailures = new ArrayList<>(failures);
}
System.out.println(reportedFailures);
builder.add(channel, solace.isConnected() && alive.get(),
reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
failures.removeAll(reportedFailures);
} else {
builder.add(channel, solace.isConnected() && alive.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private SolaceConsumerTest() {
@Test
@Order(1)
void consumer() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
Expand Down Expand Up @@ -74,7 +74,7 @@ void consumer() {
@Test
@Order(2)
void consumerReplay() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand All @@ -94,7 +94,7 @@ void consumerReplay() {
@Test
@Order(3)
void consumerWithSelectorQuery() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
Expand Down Expand Up @@ -124,7 +124,7 @@ void consumerWithSelectorQuery() {
@Test
@Order(4)
void consumerFailedProcessingPublishToErrorTopic() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand Down Expand Up @@ -158,7 +158,7 @@ void consumerFailedProcessingPublishToErrorTopic() {
@Test
@Order(5)
void consumerFailedProcessingMoveToDMQ() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand Down Expand Up @@ -193,7 +193,7 @@ void consumerFailedProcessingMoveToDMQ() {
@Test
@Order(6)
void partitionedQueue() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.consumer-1.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-1.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
Expand Down Expand Up @@ -262,7 +262,7 @@ void partitionedQueue() {
@Test
@Order(7)
void consumerPublishToErrorTopicPermissionException() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand Down Expand Up @@ -340,7 +340,7 @@ void consumerGracefulCloseTest() {
@Test
@Order(9)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SolaceProcessorTest extends WeldTestBase {
@Test
void consumer() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SolacePublisherTest extends WeldTestBase {

@Test
void publisher() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic);

Expand All @@ -53,7 +53,7 @@ void publisher() {

@Test
void publisherWithDynamicDestination() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic);

Expand All @@ -80,7 +80,7 @@ void publisherWithDynamicDestination() {

@Test
void publisherWithBackPressureReject() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.back-pressure.buffer-capacity", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static SolaceContainer createSolaceContainer() {
public void startSolaceBroker() {
solace = createSolaceContainer()
.withCredentials("user", "pass")
.withExposedPorts(SolaceContainer.Service.SMF.getPort())
.withExposedPorts(SolaceContainer.Service.SMF.getPort(), 8080)
.withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/provisioned/>", SolaceContainer.Service.SMF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,9 @@ public boolean isAlive() {
return getHealth().getLiveness().isOk();
}

public MapBasedConfig commonConfig() {
return new MapBasedConfig()
.with("mp.messaging.connector.quarkus-solace.client.graceful-shutdown", false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class SolacePublisherHealthCheck extends WeldTestBase {
public class SolacePublisherHealthTest extends WeldTestBase {
@Test
void publisherHealthCheck() {
MapBasedConfig config = new MapBasedConfig()
Expand Down Expand Up @@ -73,7 +73,9 @@ void publisherLivenessCheck() {
// Run app that publish messages
MyApp app = runApplication(config, MyApp.class);

await().until(() -> isStarted() && isReady());
await().until(() -> isStarted() && isReady() && !isAlive());

await().until(() -> !isAlive());

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
Expand All @@ -94,7 +96,6 @@ static class MyApp {

@Outgoing("out")
Multi<Message<String>> out() {

return Multi.createFrom().items("1", "2", "3", "4", "5")
.map(payload -> Message.of(payload).withAck(() -> {
acked.add(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class LocalPropagationAckTest extends WeldTestBase {

private MapBasedConfig dataconfig() {
return new MapBasedConfig()
return commonConfig()
.with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic)
.with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class LocalPropagationTest extends WeldTestBase {

private MapBasedConfig dataconfig() {
return new MapBasedConfig()
return commonConfig()
.with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic)
.with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class EndToEndPerformanceTest extends WeldTestBase {
@Test
public void endToEndPerformanceTesttWithBackPressureWaitAndWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ public void solaceConsumerPerformanceTest() {
.build()
.start();

MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);
// .with("mp.messaging.incoming.in.client.graceful-shutdown", false);

// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SolacePublisherPerformanceTest extends WeldTestBase {

@Test
void publisherPerformanceTestWithBackPressureWaitAndWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic);

Expand Down Expand Up @@ -65,7 +65,7 @@ void publisherPerformanceTestWithBackPressureWaitAndWaitForPublishReceipt() {

@Test
void publisherPerformanceTestWithBackPressureWaitAndNoWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false);
Expand Down Expand Up @@ -103,7 +103,7 @@ void publisherPerformanceTestWithBackPressureWaitAndNoWaitForPublishReceipt() {

@Test
void publisherPerformanceTestWithBackPressureElasticAndWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic");
Expand Down Expand Up @@ -141,7 +141,7 @@ void publisherPerformanceTestWithBackPressureElasticAndWaitForPublishReceipt() {

@Test
void publisherPerformanceTestWithBackPressureElasticAndNoWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic")
Expand Down
Loading