Skip to content

Commit

Permalink
GH-893: Add messagesPerAck prop to DirectRLCF
Browse files Browse the repository at this point in the history
Resolves #893

* Allow `messagesPerAck` and `ackTimeout` to be configurable when using `DirectRabbitListenerContainerFactory`
This capability is being added primarily to support configuration of these values when using Spring Boot application.properties/yml.
When using code based configuration an alternate method is to use the `setContainerConfigurer` callback.

* Update copyright year and author for affected classes
  • Loading branch information
sudr authored and artembilan committed Jan 30, 2019
1 parent 454e30e commit 6fb2187
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 the original author or authors.
* Copyright 2016-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
* implementation to build a regular {@link DirectMessageListenerContainer}.
*
* @author Gary Russell
* @author Sud Ramasamy
* @since 2.0
*/
public class DirectRabbitListenerContainerFactory
Expand All @@ -36,6 +37,10 @@ public class DirectRabbitListenerContainerFactory

private Integer consumersPerQueue = 1;

private Integer messagesPerAck;

private Long ackTimeout;

/**
* Set the task scheduler to use for the task that monitors idle containers and
* failed consumers.
Expand Down Expand Up @@ -67,6 +72,29 @@ public void setConsumersPerQueue(Integer consumersPerQueue) {
this.consumersPerQueue = consumersPerQueue;
}

/**
* Set the number of messages to receive before acknowledging (success).
* A failed message will short-circuit this counter.
* @param messagesPerAck the number of messages.
* @see #setAckTimeout(Long)
*/
public void setMessagesPerAck(Integer messagesPerAck) {
this.messagesPerAck = messagesPerAck;
}

/**
* An approximate timeout; when {@link #setMessagesPerAck(Integer) messagesPerAck} is
* greater than 1, and this time elapses since the last ack, the pending acks will be
* sent either when the next message arrives, or a short time later if no additional
* messages arrive. In that case, the actual time depends on the
* {@link #setMonitorInterval(long) monitorInterval}.
* @param ackTimeout the timeout in milliseconds (default 20000);
* @see #setMessagesPerAck(Integer)
*/
public void setAckTimeout(Long ackTimeout) {
this.ackTimeout = ackTimeout;
}

@Override
protected DirectMessageListenerContainer createContainerInstance() {
return new DirectMessageListenerContainer();
Expand All @@ -93,6 +121,12 @@ protected void initializeContainer(DirectMessageListenerContainer instance, Rabb
else if (this.consumersPerQueue != null) {
instance.setConsumersPerQueue(this.consumersPerQueue);
}
if (this.messagesPerAck != null) {
instance.setMessagesPerAck(this.messagesPerAck);
}
if (this.ackTimeout != null) {
instance.setAckTimeout(this.ackTimeout);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,6 +51,7 @@
* @author Artem Bilan
* @author Joris Kuipers
* @author Gary Russell
* @author Sud Ramasamy
*
*/
public class RabbitListenerContainerFactoryTests {
Expand Down Expand Up @@ -167,6 +168,8 @@ public void createDirectContainerFullConfig() {
this.direct.setTaskScheduler(scheduler);
this.direct.setMonitorInterval(1234L);
this.direct.setConsumersPerQueue(42);
this.direct.setMessagesPerAck(5);
this.direct.setAckTimeout(3L);
this.direct.setAfterReceivePostProcessors(afterReceivePostProcessor);

assertArrayEquals(new Advice[] {advice}, this.direct.getAdviceChain());
Expand Down Expand Up @@ -194,6 +197,8 @@ public void createDirectContainerFullConfig() {
assertSame(scheduler, fieldAccessor.getPropertyValue("taskScheduler"));
assertEquals(1234L, fieldAccessor.getPropertyValue("monitorInterval"));
assertEquals(42, fieldAccessor.getPropertyValue("consumersPerQueue"));
assertEquals(5, fieldAccessor.getPropertyValue("messagesPerAck"));
assertEquals(3L, fieldAccessor.getPropertyValue("ackTimeout"));
List<?> actualAfterReceivePostProcessors = (List<?>) fieldAccessor.getPropertyValue("afterReceivePostProcessors");
assertEquals("Wrong number of afterReceivePostProcessors", 1, actualAfterReceivePostProcessors.size());
assertSame("Wrong afterReceivePostProcessor", afterReceivePostProcessor, actualAfterReceivePostProcessors.get(0));
Expand Down

0 comments on commit 6fb2187

Please sign in to comment.