diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/DirectRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/DirectRabbitListenerContainerFactory.java index d9a2ae89a5..3e22e51789 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/DirectRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/DirectRabbitListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ * implementation to build a regular {@link DirectMessageListenerContainer}. * * @author Gary Russell + * @author Sud Ramasamy * @since 2.0 */ public class DirectRabbitListenerContainerFactory @@ -36,6 +37,10 @@ public class DirectRabbitListenerContainerFactory private Integer consumersPerQueue = 1; + private Integer messagesPerAck; + + private Long ackTimeout; + /** * Set the task scheduler to use for the task that monitors idle containers and * failed consumers. @@ -67,6 +72,29 @@ public void setConsumersPerQueue(Integer consumersPerQueue) { this.consumersPerQueue = consumersPerQueue; } + /** + * Set the number of messages to receive before acknowledging (success). + * A failed message will short-circuit this counter. + * @param messagesPerAck the number of messages. + * @see #setAckTimeout(Long) + */ + public void setMessagesPerAck(Integer messagesPerAck) { + this.messagesPerAck = messagesPerAck; + } + + /** + * An approximate timeout; when {@link #setMessagesPerAck(Integer) messagesPerAck} is + * greater than 1, and this time elapses since the last ack, the pending acks will be + * sent either when the next message arrives, or a short time later if no additional + * messages arrive. In that case, the actual time depends on the + * {@link #setMonitorInterval(long) monitorInterval}. + * @param ackTimeout the timeout in milliseconds (default 20000); + * @see #setMessagesPerAck(Integer) + */ + public void setAckTimeout(Long ackTimeout) { + this.ackTimeout = ackTimeout; + } + @Override protected DirectMessageListenerContainer createContainerInstance() { return new DirectMessageListenerContainer(); @@ -93,6 +121,12 @@ protected void initializeContainer(DirectMessageListenerContainer instance, Rabb else if (this.consumersPerQueue != null) { instance.setConsumersPerQueue(this.consumersPerQueue); } + if (this.messagesPerAck != null) { + instance.setMessagesPerAck(this.messagesPerAck); + } + if (this.ackTimeout != null) { + instance.setAckTimeout(this.ackTimeout); + } } } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java index 8a0a6ebc00..d69adc8b20 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,7 @@ * @author Artem Bilan * @author Joris Kuipers * @author Gary Russell + * @author Sud Ramasamy * */ public class RabbitListenerContainerFactoryTests { @@ -167,6 +168,8 @@ public void createDirectContainerFullConfig() { this.direct.setTaskScheduler(scheduler); this.direct.setMonitorInterval(1234L); this.direct.setConsumersPerQueue(42); + this.direct.setMessagesPerAck(5); + this.direct.setAckTimeout(3L); this.direct.setAfterReceivePostProcessors(afterReceivePostProcessor); assertArrayEquals(new Advice[] {advice}, this.direct.getAdviceChain()); @@ -194,6 +197,8 @@ public void createDirectContainerFullConfig() { assertSame(scheduler, fieldAccessor.getPropertyValue("taskScheduler")); assertEquals(1234L, fieldAccessor.getPropertyValue("monitorInterval")); assertEquals(42, fieldAccessor.getPropertyValue("consumersPerQueue")); + assertEquals(5, fieldAccessor.getPropertyValue("messagesPerAck")); + assertEquals(3L, fieldAccessor.getPropertyValue("ackTimeout")); List actualAfterReceivePostProcessors = (List) fieldAccessor.getPropertyValue("afterReceivePostProcessors"); assertEquals("Wrong number of afterReceivePostProcessors", 1, actualAfterReceivePostProcessors.size()); assertSame("Wrong afterReceivePostProcessor", afterReceivePostProcessor, actualAfterReceivePostProcessors.get(0));