Skip to content

Commit

Permalink
Fix #556
Browse files Browse the repository at this point in the history
Acknowledge messages when the broker send the receipt.
  • Loading branch information
cescoffier committed May 25, 2020
1 parent aae925e commit 2de8502
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private CompletionStage<?> send(AtomicReference<MqttClient> reference, Message<?
}

return client.publish(actualTopicToBeUsed, convert(msg.getPayload()), actualQoS, false, isRetain)
.onItem().produceCompletionStage(i -> msg.ack().thenApply(x -> msg))
.subscribeAsCompletionStage();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package io.smallrye.reactive.messaging.mqtt;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.weld.environment.se.Weld;
import org.jboss.weld.environment.se.WeldContainer;
import org.junit.After;
import org.junit.Test;

public class MqttAcknowledgementTest extends MqttTestBase {

private WeldContainer container;

@After
public void cleanup() {
if (container != null) {
container.close();
}
Clients.clear();
}

@Test(timeout = 10000)
public void testAcknowledgmentWithQoS1AndPayloads() throws InterruptedException {
Weld weld = baseWeld(getConfig(1));
weld.addBeanClass(EmitterBean.class);

CountDownLatch latch = new CountDownLatch(2);
usage.consumeStrings("test-topic-mqtt", 2, 10, TimeUnit.SECONDS,
null,
v -> latch.countDown());

container = weld.initialize();

EmitterBean bean = container.getBeanManager().createInstance().select(EmitterBean.class).get();
bean.sendAndAwait();
bean.sendMessageAndAwait();
assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
}

@Test(timeout = 10000)
public void testAcknowledgmentWithQoS0AndPayloads() throws InterruptedException {
Weld weld = baseWeld(getConfig(0));
weld.addBeanClass(EmitterBean.class);

CountDownLatch latch = new CountDownLatch(2);
usage.consumeStrings("test-topic-mqtt", 2, 10, TimeUnit.SECONDS,
null,
v -> latch.countDown());

container = weld.initialize();

EmitterBean bean = container.getBeanManager().createInstance().select(EmitterBean.class).get();
bean.sendAndAwait();
bean.sendMessageAndAwait();
assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
}

@Test(timeout = 10000)
public void testAcknowledgmentWithQoS2AndPayloads() throws InterruptedException {
Weld weld = baseWeld(getConfig(0));
weld.addBeanClass(EmitterBean.class);

CountDownLatch latch = new CountDownLatch(2);
usage.consumeStrings("test-topic-mqtt", 2, 10, TimeUnit.SECONDS,
null,
v -> latch.countDown());

container = weld.initialize();

EmitterBean bean = container.getBeanManager().createInstance().select(EmitterBean.class).get();
bean.sendAndAwait();
bean.sendMessageAndAwait();
assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
}

@ApplicationScoped
public static class EmitterBean {
@Inject
@Channel("test-topic-mqtt")
Emitter<String> mqttEmitter;

private int counter = 0;

public void sendAndAwait() {
mqttEmitter.send("hello-" + counter++).toCompletableFuture().join();
}

public void sendMessageAndAwait() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
mqttEmitter.send(Message.of("hello-message-" + counter++, () -> {
latch.countDown();
return CompletableFuture.completedFuture(null);
}));

assertThat(latch.await(3, TimeUnit.SECONDS)).isTrue();
}

}

private MapBasedConfig getConfig(int qos) {
String prefix = "mp.messaging.outgoing.test-topic-mqtt.";
Map<String, Object> config = new HashMap<>();
config.put(prefix + "topic", "test-topic-mqtt");
config.put(prefix + "connector", MqttConnector.CONNECTOR_NAME);
config.put(prefix + "host", System.getProperty("mqtt-host"));
config.put(prefix + "port", Integer.valueOf(System.getProperty("mqtt-port")));
config.put(prefix + "qos", qos);
if (System.getProperty("mqtt-user") != null) {
config.put(prefix + "username", System.getProperty("mqtt-user"));
config.put(prefix + "password", System.getProperty("mqtt-pwd"));
}
return new MapBasedConfig(config);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.smallrye.reactive.messaging.MediatorFactory;
import io.smallrye.reactive.messaging.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.extension.EmitterImpl;
import io.smallrye.reactive.messaging.extension.MediatorManager;
import io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension;
import io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory;
Expand Down Expand Up @@ -74,6 +75,7 @@ static Weld baseWeld(MapBasedConfig config) {
weld.addBeanClass(ConfiguredChannelFactory.class);
weld.addBeanClass(WorkerPoolRegistry.class);
weld.addBeanClass(ExecutionHolder.class);
weld.addPackages(EmitterImpl.class.getPackage());
weld.addExtension(new ReactiveMessagingExtension());
weld.addBeanClass(MqttConnector.class);

Expand Down

0 comments on commit 2de8502

Please sign in to comment.