Mqtt messages are not acknowledged #556

Closed
PaulFrmBrn opened this issue May 21, 2020 · 3 comments · Fixed by #559

@PaulFrmBrn

It seems that messages sent to an MQTT broker with QoS = 2 are not acknowledged, or at least the ack callback is never called. In contrast, AMQP messages are acknowledged as expected.

On calling this endpoint

    @GET
    @Path("/mqtt/message")
    @Produces(MediaType.TEXT_PLAIN)
    public String helloMqttMessage() {
        var message = Message.of("Hello").withAck(() -> {
            log.info("mqtt: message acked");
            return CompletableFuture.completedStage(null);
        });
        log.info("mqtt: message prepared");
        mqttEmitter.send(message);
        log.info("mqtt: message emitted");

        return message.getPayload();
    }

These logs are printed:

2020-05-21 16:07:22,487 INFO  [com.pau.GreetingResource] (executor-thread-1) mqtt: message prepared
2020-05-21 16:07:22,488 INFO  [com.pau.GreetingResource] (executor-thread-1) mqtt: message emitted

While one would expect something like this:

2020-05-21 16:07:22,487 INFO  [com.pau.GreetingResource] (executor-thread-1) mqtt: message prepared
2020-05-21 16:07:22,488 INFO  [com.pau.GreetingResource] (executor-thread-1) mqtt: message emitted
2020-05-21 16:07:22,488 INFO  [com.pau.GreetingResource] ([some Vert.x thread]) mqtt: message acked

When using AMQP

    @GET
    @Path("/amqp/message")
    @Produces(MediaType.TEXT_PLAIN)
    public String helloAmqpMessage() {
        var message = Message.of("Hello").withAck(() -> {
            log.info("amqp: message acked");
            return CompletableFuture.completedStage(null);
        });
        log.info("amqp: message prepared");
        amqpEmitter.send(message);
        log.info("amqp: message emitted");
        return message.getPayload();
    }

These logs are printed:

2020-05-21 16:08:52,352 INFO  [com.pau.GreetingResource] (executor-thread-1) amqp: message prepared
2020-05-21 16:08:52,360 INFO  [com.pau.GreetingResource] (executor-thread-1) amqp: message emitted
2020-05-21 16:08:52,477 INFO  [com.pau.GreetingResource] (vert.x-eventloop-thread-5) amqp: message acked

Debug logs confirm that QoS 2 is used:

2020-05-21 16:14:03,358 FINE  [io.ver.mqt.imp.MqttClientImpl] (vert.x-eventloop-thread-1) Sending packet MqttPublishMessage[fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=EXACTLY_ONCE, isRetain=false, remainingLength=0], variableHeader=MqttPublishVariableHeader[topicName=/test-topic, packetId=1], payload=UnpooledHeapByteBuf(ridx: 0, widx: 5, cap: 5/5)]

As a consequence, calling .join() on the completable future returned by emitter.send(Payload) blocks the executor thread indefinitely.
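Until the ack is delivered, a caller can at least avoid the indefinite block by bounding the wait. The following is a minimal sketch using only JDK classes; `BoundedAckWait` and `awaitAck` are hypothetical names, and the never-completed `CompletableFuture` merely stands in for the stage returned by `emitter.send(Payload)` when no acknowledgement arrives.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of SmallRye Reactive Messaging.
public final class BoundedAckWait {

    /**
     * Waits for the acknowledgement stage, but gives up after the timeout.
     * Returns true if the stage completed normally in time, false on timeout.
     */
    public static boolean awaitAck(CompletionStage<Void> ack, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        try {
            ack.toCompletableFuture().get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a send whose ack never arrives (the behaviour reported here).
        CompletionStage<Void> neverAcked = new CompletableFuture<>();
        System.out.println("acked in time: " + awaitAck(neverAcked, 200, TimeUnit.MILLISECONDS));

        // Stand-in for a send that the connector has already acknowledged.
        CompletionStage<Void> acked = CompletableFuture.completedFuture(null);
        System.out.println("acked in time: " + awaitAck(acked, 200, TimeUnit.MILLISECONDS));
    }
}
```

This only keeps the request thread from hanging; it does not make the missing acknowledgement appear.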

The message really is sent to the MQTT broker; this was checked with VerneMQ, ActiveMQ Artemis and InMemoryConnector. Still, the acknowledgement callback is never called.

Reactive messaging properties:

mp.messaging.outgoing.test-topic-mqtt.connector=smallrye-mqtt
mp.messaging.outgoing.test-topic-mqtt.topic=/test-topic
mp.messaging.outgoing.test-topic-mqtt.host=127.0.0.1
mp.messaging.outgoing.test-topic-mqtt.port=1883
mp.messaging.outgoing.test-topic-mqtt.qos=2
mp.messaging.outgoing.test-topic-mqtt.username=artemis
mp.messaging.outgoing.test-topic-mqtt.password=simetraehcapa

mp.messaging.outgoing.test-topic-amqp.connector=smallrye-amqp
mp.messaging.outgoing.test-topic-amqp.topic=/test-topic
mp.messaging.outgoing.test-topic-amqp.host=localhost
mp.messaging.outgoing.test-topic-amqp.port=5672
mp.messaging.outgoing.test-topic-amqp.username=artemis
mp.messaging.outgoing.test-topic-amqp.password=simetraehcapa

I've made a demo app that uses both the MQTT and AMQP connectors via the emitter's send(Payload) and send(Message) methods, with tests for each endpoint.

Just call:

GET http://localhost:8080/hello/amqp/message (logs are fine, message sent)
GET http://localhost:8080/hello/amqp/payload (logs are fine, message sent)
GET http://localhost:8080/hello/mqtt/message (no ack log entry, message sent)
GET http://localhost:8080/hello/mqtt/payload (blocks on `send.toCompletableFuture().join();`, message sent)

All the code can be found here: https://github.com/PaulFrmBrn/mqtt-ack-test

Environment (built & run by ./gradlew quarkusDev)
Quarkus: 1.4.2
SmallRye Reactive Messaging: 2.0.2
OS: macOS Catalina 10.15.3
Java info

java --version
openjdk 11.0.6 2020-01-14
OpenJDK Runtime Environment GraalVM CE 20.0.0 (build 11.0.6+9-jvmci-20.0-b02)
OpenJDK 64-Bit Server VM GraalVM CE 20.0.0 (build 11.0.6+9-jvmci-20.0-b02, mixed mode, sharing)

Gradle: 6.1
MQTT brokers used (containers)

docker run -p 5672:5672 -p 1883:1883 -p 8161:8161 -d vromero/activemq-artemis
docker run -p 1883:1883 -e "DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on" -d erlio/docker-vernemq

Thanks in advance.

@cescoffier cescoffier added the on-roadmap The issue is part of the roadmap label May 23, 2020
@cescoffier cescoffier self-assigned this May 25, 2020
@cescoffier cescoffier added this to the 2.1.0 milestone May 25, 2020
cescoffier added a commit that referenced this issue May 25, 2020
Acknowledge messages when the broker send the receipt.
@cescoffier cescoffier linked a pull request May 25, 2020 that will close this issue
@cescoffier
Contributor

Thanks for the reproducer! It made the fix so much easier to implement and verify. I've opened a PR with the actual fix. I've tested with QoS 0, 1 and 2.
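To illustrate the idea named in the commit title (acknowledge once the broker's receipt arrives), here is a rough sketch in plain Java. It is not the code from the PR: `PendingAcks`, `published` and `receiptReceived` are hypothetical names, and the only protocol facts assumed are that QoS 0 has no receipt, QoS 1 ends with PUBACK and QoS 2 ends with PUBCOMP.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration; the real connector code may look quite different.
public final class PendingAcks {

    // Packet id -> future completed when the broker's receipt arrives.
    private final Map<Integer, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    /** Registers a published packet and returns the stage the ack callback should be chained to. */
    public CompletionStage<Void> published(int packetId, int qos) {
        if (qos == 0) {
            // QoS 0 has no broker receipt, so there is nothing to wait for.
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> receipt = new CompletableFuture<>();
        pending.put(packetId, receipt);
        return receipt;
    }

    /** Call when the broker's receipt arrives (PUBACK for QoS 1, PUBCOMP for QoS 2). */
    public boolean receiptReceived(int packetId) {
        CompletableFuture<Void> receipt = pending.remove(packetId);
        return receipt != null && receipt.complete(null);
    }

    public static void main(String[] args) {
        PendingAcks acks = new PendingAcks();
        // The user's ack callback is chained to the receipt, not to the send itself.
        acks.published(1, 2).thenRun(() -> System.out.println("mqtt: message acked"));
        System.out.println("mqtt: message emitted");
        acks.receiptReceived(1); // simulates the broker's PUBCOMP for packet 1
    }
}
```

In this sketch the callback chained with `thenRun` fires only after `receiptReceived` is called for the matching packet id, which mirrors the log order the reporter expected.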

@PaulFrmBrn
Author

Great! Thank you for the fast response ) Looking forward to trying the fix )

@PaulFrmBrn
Author

Great! Thanks! Is there any hint as to when 2.1.0 will arrive?
