Skip to content

Commit

Permalink
GH-1006: Fix listener ack for Mono<Void>
Browse files Browse the repository at this point in the history
Fixes #1006

When listener method returns `Mono<Void>`, the `success` callback is
not called from the Reactor because there is no value to handle.

* Move `basicAck()` to the `completeConsumer` which is called when `Mono`
is completed successfully with value or without

**Cherry-pick to 2.1.x**
  • Loading branch information
artembilan authored and garyrussell committed May 31, 2019
1 parent 14d421d commit e220c2f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
* @author Stephane Nicoll
* @author Gary Russell
* @author Artem Bilan
* @author Johan Haleby
*
* @since 1.4
*
Expand Down Expand Up @@ -314,7 +315,10 @@ protected void handleResult(InvocationResult resultArg, Message request, Channel
+ "otherwise the container will ack the message immediately");
}
((ListenableFuture<?>) resultArg.getReturnValue()).addCallback(
r -> asyncSuccess(resultArg, request, channel, source, r),
r -> {
asyncSuccess(resultArg, request, channel, source, r);
basicAck(request, channel);
},
t -> asyncFailure(request, channel, t));
}
else if (monoPresent && MonoHandler.isMono(resultArg.getReturnValue())) {
Expand All @@ -324,7 +328,8 @@ else if (monoPresent && MonoHandler.isMono(resultArg.getReturnValue())) {
}
MonoHandler.subscribe(resultArg.getReturnValue(),
r -> asyncSuccess(resultArg, request, channel, source, r),
t -> asyncFailure(request, channel, t));
t -> asyncFailure(request, channel, t),
() -> basicAck(request, channel));
}
else {
doHandleResult(resultArg, request, channel, source);
Expand Down Expand Up @@ -361,6 +366,9 @@ private void asyncSuccess(InvocationResult resultArg, Message request, Channel c
doHandleResult(new InvocationResult(deferredResult, resultArg.getSendTo(), returnType), request, channel,
source);
}
}

private void basicAck(Message request, Channel channel) {
try {
channel.basicAck(request.getMessageProperties().getDeliveryTag(), false);
}
Expand Down Expand Up @@ -598,9 +606,9 @@ static boolean isMono(Object result) {

@SuppressWarnings("unchecked")
static void subscribe(Object returnValue, Consumer<? super Object> success,
Consumer<? super Throwable> failure) {
Consumer<? super Throwable> failure, Runnable completeConsumer) {

((Mono<? super Object>) returnValue).subscribe(success, failure);
((Mono<? super Object>) returnValue).subscribe(success, failure, completeConsumer);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.springframework.util.concurrent.SettableListenableFuture;

import com.rabbitmq.client.Channel;
import reactor.core.publisher.Mono;

/**
* @author Dave Syer
Expand Down Expand Up @@ -238,6 +239,23 @@ public ListenableFuture<String> myPojoMessageMethod(String input) {
verify(mockChannel).basicAck(anyLong(), eq(false));
}

@Test
public void testMonoVoidReturnAck() throws Exception {
class Delegate {

@SuppressWarnings("unused")
public Mono<Void> myPojoMessageMethod(String input) {
return Mono.empty();
}

}
this.adapter = new MessageListenerAdapter(new Delegate(), "myPojoMessageMethod");
this.adapter.containerAckMode(AcknowledgeMode.MANUAL);
this.adapter.setResponseExchange("default");
Channel mockChannel = mock(Channel.class);
this.adapter.onMessage(new Message("foo".getBytes(), this.messageProperties), mockChannel);
verify(mockChannel).basicAck(anyLong(), eq(false));
}

public interface Service {

Expand Down

0 comments on commit e220c2f

Please sign in to comment.