From 3aed80fbd02def9f2ae59c2ccde8aea6e2f685d2 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 26 Jun 2019 14:06:07 -0400 Subject: [PATCH] GH-1032: Add consumer-side batching support Resolves https://github.com/spring-projects/spring-amqp/issues/1032 * Missing Javadocs * Still De-Batch producer batches if so configured * Polishing; detect incorrect configuration. * Revert to generic message for incorrect configuration. --- .../amqp/core/BatchMessageListener.java | 39 +++ .../amqp/core/MessageListener.java | 15 ++ .../config/ListenerContainerFactoryBean.java | 59 ++++- .../SimpleRabbitListenerContainerFactory.java | 2 +- .../AbstractMessageListenerContainer.java | 108 +++++--- .../SimpleMessageListenerContainer.java | 246 ++++++++++++++---- .../api/ChannelAwareBatchMessagelistener.java | 42 +++ .../api/ChannelAwareMessageListener.java | 7 + .../config/ListenerContainerParserTests.java | 2 +- .../RabbitListenerContainerFactoryTests.java | 2 +- .../core/BatchingRabbitTemplateTests.java | 2 +- ...ContainerErrorHandlerIntegrationTests.java | 2 +- ...nerContainerLifecycleIntegrationTests.java | 2 +- ...istenerContainerRetryIntegrationTests.java | 2 +- ...sageListenerManualAckIntegrationTests.java | 2 +- ...istenerRecoveryRepeatIntegrationTests.java | 2 +- ...MessageListenerTxSizeIntegrationTests.java | 2 +- ...sageListenerContainerIntegrationTests.java | 25 +- .../SimpleMessageListenerContainerTests.java | 8 +- .../SimpleMessageListenerWithRabbitMQ.java | 2 +- src/reference/asciidoc/amqp.adoc | 38 ++- src/reference/asciidoc/whats-new.adoc | 3 + 22 files changed, 492 insertions(+), 120 deletions(-) create mode 100644 spring-amqp/src/main/java/org/springframework/amqp/core/BatchMessageListener.java create mode 100644 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/ChannelAwareBatchMessagelistener.java diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/BatchMessageListener.java b/spring-amqp/src/main/java/org/springframework/amqp/core/BatchMessageListener.java new file mode 100644 index 0000000000..d62325e58d --- /dev/null +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/BatchMessageListener.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.core; + +import java.util.List; + +/** + * Used to receive a batch of messages if the container supports it. + * + * @author Gary Russell + * @since 2.2 + * + */ +public interface BatchMessageListener extends MessageListener { + + @Override + default void onMessage(Message message) { + throw new UnsupportedOperationException("Should never be called by the container"); + } + + @Override + void onMessageBatch(List messages); + + +} diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageListener.java b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageListener.java index 73fdd42bab..e0dbe8397f 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageListener.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageListener.java @@ -16,6 +16,8 @@ package org.springframework.amqp.core; +import java.util.List; + /** * Listener interface to receive asynchronous delivery of Amqp Messages. * @@ -25,6 +27,10 @@ @FunctionalInterface public interface MessageListener { + /** + * Delivers a single message. + * @param message the message. + */ void onMessage(Message message); /** @@ -37,4 +43,13 @@ default void containerAckMode(AcknowledgeMode mode) { // NOSONAR - empty } + /** + * Delivers a batch of messages. + * @param messages the messages. + * @since 2.2 + */ + default void onMessageBatch(List messages) { + throw new UnsupportedOperationException("This listener does not support message batches"); + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java index dc6afd525d..cf5ee9ca82 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java @@ -161,12 +161,14 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean + * When the channel is transacted, it determines how many messages to process in a + * single transaction. It should be less than or equal to + * {@link #setPrefetchCount(int) the prefetch count}. + *

+ * It also affects how often acks are sent when using + * {@link org.springframework.amqp.core.AcknowledgeMode#AUTO} - one ack per BatchSize. + *

+ * Finally, when {@link #setConsumerBatchEnabled(boolean)} is true, it determines how + * many records to include in the batch as long as sufficient messages arrive within + * {@link #setReceiveTimeout(long)}. + *

+ * IMPORTANT The batch size represents the number of physical messages + * received. If {@link #setDeBatchingEnabled(boolean)} is true and a message is a + * batch created by a producer, the actual number of messages received by the listener + * will be larger than this batch size. + *

+ * + * Default is 1. + * @param batchSize the batch size + * @since 2.2 + */ + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + /** + * Set the txSize. + * @param txSize the txSize. + * @deprecated in favor of {@link #setBatchSize(int)}. + */ + @Deprecated public void setTxSize(int txSize) { - this.txSize = txSize; + setBatchSize(txSize); + } + + /** + * Set to true to present a list of messages based on the {@link #setBatchSize(int)}, + * if the container and listener support it. + * @param consumerBatchEnabled true to create message batches in the container. + * @since 2.2 + * @see #setBatchSize(int) + */ + public void setConsumerBatchEnabled(boolean consumerBatchEnabled) { + this.consumerBatchEnabled = consumerBatchEnabled; } public void setDeclarationRetries(int declarationRetries) { @@ -380,8 +427,9 @@ public void setRetryDeclarationInterval(long retryDeclarationInterval) { @Override public Class getObjectType() { - return this.listenerContainer == null ? AbstractMessageListenerContainer.class : this.listenerContainer - .getClass(); + return this.listenerContainer == null + ? AbstractMessageListenerContainer.class + : this.listenerContainer.getClass(); } @SuppressWarnings("deprecation") @@ -446,7 +494,8 @@ private AbstractMessageListenerContainer createContainer() { .acceptIfNotNull(this.consecutiveActiveTrigger, container::setConsecutiveActiveTrigger) .acceptIfNotNull(this.consecutiveIdleTrigger, container::setConsecutiveIdleTrigger) .acceptIfNotNull(this.receiveTimeout, container::setReceiveTimeout) - .acceptIfNotNull(this.txSize, container::setTxSize) + .acceptIfNotNull(this.batchSize, container::setBatchSize) + .acceptIfNotNull(this.consumerBatchEnabled, container::setConsumerBatchEnabled) .acceptIfNotNull(this.declarationRetries, container::setDeclarationRetries) .acceptIfNotNull(this.retryDeclarationInterval, container::setRetryDeclarationInterval); return container; diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java index 21624cb137..fee9af73e3 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java @@ -140,7 +140,7 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb super.initializeContainer(instance, endpoint); JavaUtils javaUtils = JavaUtils.INSTANCE - .acceptIfNotNull(this.txSize, instance::setTxSize); + .acceptIfNotNull(this.txSize, instance::setBatchSize); String concurrency = null; if (endpoint != null) { concurrency = endpoint.getConcurrency(); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java index b41a104bf6..d354a60d9a 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java @@ -40,6 +40,7 @@ import org.springframework.amqp.ImmediateAcknowledgeAmqpException; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.BatchMessageListener; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.core.MessagePostProcessor; @@ -53,6 +54,7 @@ import org.springframework.amqp.rabbit.connection.RabbitResourceHolder; import org.springframework.amqp.rabbit.connection.RabbitUtils; import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessagelistener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException; import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException; @@ -447,6 +449,10 @@ public void setDeBatchingEnabled(boolean deBatchingEnabled) { this.deBatchingEnabled = deBatchingEnabled; } + protected boolean isDeBatchingEnabled() { + return this.deBatchingEnabled; + } + /** * Public setter for the {@link Advice} to apply to listener executions. *

@@ -1059,6 +1065,14 @@ public void setBatchingStrategy(BatchingStrategy batchingStrategy) { this.batchingStrategy = batchingStrategy; } + protected BatchingStrategy getBatchingStrategy() { + return this.batchingStrategy; + } + + protected Collection getAfterReceivePostProcessors() { + return this.afterReceivePostProcessors; + } + /** * Delegates to {@link #validateConfiguration()} and {@link #initialize()}. */ @@ -1321,30 +1335,39 @@ protected void invokeErrorHandler(Throwable ex) { /** * Execute the specified listener, committing or rolling back the transaction afterwards (if necessary). * @param channel the Rabbit Channel to operate on - * @param messageIn the received Rabbit Message + * @param data the received Rabbit Message * @see #invokeListener * @see #handleListenerException */ - protected void executeListener(Channel channel, Message messageIn) { + @SuppressWarnings("unchecked") + protected void executeListener(Channel channel, Object data) { if (!isRunning()) { if (logger.isWarnEnabled()) { - logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn); + logger.warn( + "Rejecting received message(s) because the listener container has been stopped: " + data); } throw new MessageRejectedWhileStoppingException(); } try { - doExecuteListener(channel, messageIn); + doExecuteListener(channel, data); } catch (RuntimeException ex) { - if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) { + Message message; + if (data instanceof Message) { + message = (Message) data; + } + else { + message = ((List) data).get(0); + } + if (message.getMessageProperties().isFinalRetryForMessageWithNoId()) { if (this.statefulRetryFatalWithNullMessageId) { throw new FatalListenerExecutionException( - "Illegal null id in message. Failed to manage retry for message: " + messageIn, ex); + "Illegal null id in message. Failed to manage retry for message: " + message, ex); } else { throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID", new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex), - messageIn); + message); } } handleListenerException(ex); @@ -1352,39 +1375,44 @@ protected void executeListener(Channel channel, Message messageIn) { } } - private void doExecuteListener(Channel channel, Message messageIn) { - Message message = messageIn; - if (this.afterReceivePostProcessors != null) { - for (MessagePostProcessor processor : this.afterReceivePostProcessors) { - message = processor.postProcessMessage(message); - if (message == null) { - throw new ImmediateAcknowledgeAmqpException( - "Message Post Processor returned 'null', discarding message"); + private void doExecuteListener(Channel channel, Object data) { + if (data instanceof Message) { + Message message = (Message) data; + if (this.afterReceivePostProcessors != null) { + for (MessagePostProcessor processor : this.afterReceivePostProcessors) { + message = processor.postProcessMessage(message); + if (message == null) { + throw new ImmediateAcknowledgeAmqpException( + "Message Post Processor returned 'null', discarding message"); + } } } - } - if (this.deBatchingEnabled && this.batchingStrategy.canDebatch(message.getMessageProperties())) { - this.batchingStrategy.deBatch(message, fragment -> invokeListener(channel, fragment)); + if (this.deBatchingEnabled && this.batchingStrategy.canDebatch(message.getMessageProperties())) { + this.batchingStrategy.deBatch(message, fragment -> invokeListener(channel, fragment)); + } + else { + invokeListener(channel, message); + } } else { - invokeListener(channel, message); + invokeListener(channel, data); } } - protected void invokeListener(Channel channel, Message message) { - this.proxy.invokeListener(channel, message); + protected void invokeListener(Channel channel, Object data) { + this.proxy.invokeListener(channel, data); } /** * Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener. * @param channel the Rabbit Channel to operate on - * @param message the received Rabbit Message + * @param data the received Rabbit Message or List of Message. * @see #setMessageListener(MessageListener) */ - protected void actualInvokeListener(Channel channel, Message message) { + protected void actualInvokeListener(Channel channel, Object data) { Object listener = getMessageListener(); if (listener instanceof ChannelAwareMessageListener) { - doInvokeListener((ChannelAwareMessageListener) listener, channel, message); + doInvokeListener((ChannelAwareMessageListener) listener, channel, data); } else if (listener instanceof MessageListener) { boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted(); @@ -1395,7 +1423,7 @@ else if (listener instanceof MessageListener) { resourceHolder); } try { - doInvokeListener((MessageListener) listener, message); + doInvokeListener((MessageListener) listener, data); } finally { if (bindChannel) { @@ -1419,12 +1447,14 @@ else if (listener != null) { * An exception thrown from the listener will be wrapped in a {@link ListenerExecutionFailedException}. * @param listener the Spring ChannelAwareMessageListener to invoke * @param channel the Rabbit Channel to operate on - * @param message the received Rabbit Message + * @param data the received Rabbit Message or List of Message. * @see ChannelAwareMessageListener * @see #setExposeListenerChannel(boolean) */ - protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message) { + @SuppressWarnings("unchecked") + protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Object data) { + Message message = null; RabbitResourceHolder resourceHolder = null; Channel channelToUse = channel; boolean boundHere = false; @@ -1458,7 +1488,13 @@ protected void doInvokeListener(ChannelAwareMessageListener listener, Channel ch } // Actually invoke the message listener... try { - listener.onMessage(message, channelToUse); + if (data instanceof List) { + listener.onMessageBatch((List) data, channelToUse); + } + else { + message = (Message) data; + listener.onMessage(message, channelToUse); + } } catch (Exception e) { throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message); @@ -1500,13 +1536,21 @@ private void cleanUpAfterInvoke(@Nullable RabbitResourceHolder resourceHolder, C * Exception thrown from listener will be wrapped to {@link ListenerExecutionFailedException}. * * @param listener the Rabbit MessageListener to invoke - * @param message the received Rabbit Message + * @param data the received Rabbit Message or List of Message. * * @see org.springframework.amqp.core.MessageListener#onMessage */ - protected void doInvokeListener(MessageListener listener, Message message) { + @SuppressWarnings("unchecked") + protected void doInvokeListener(MessageListener listener, Object data) { + Message message = null; try { - listener.onMessage(message); + if (data instanceof List) { + listener.onMessageBatch((List) data); + } + else { + message = (Message) data; + listener.onMessage(message); + } } catch (Exception e) { throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message); @@ -1789,7 +1833,7 @@ private void checkPossibleAuthenticationFailureFatalFromProperty() { @FunctionalInterface private interface ContainerDelegate { - void invokeListener(Channel channel, Message message); + void invokeListener(Channel channel, Object data); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 5ebc38787b..bbef22f59d 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -38,7 +38,9 @@ import org.springframework.amqp.AmqpIllegalStateException; import org.springframework.amqp.AmqpRejectAndDontRequeueException; import org.springframework.amqp.ImmediateAcknowledgeAmqpException; +import org.springframework.amqp.core.BatchMessageListener; import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils; @@ -46,6 +48,7 @@ import org.springframework.amqp.rabbit.connection.RabbitResourceHolder; import org.springframework.amqp.rabbit.connection.RabbitUtils; import org.springframework.amqp.rabbit.connection.SimpleResourceHolder; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessagelistener; import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException; import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException; import org.springframework.amqp.rabbit.support.ActiveObjectCounter; @@ -97,23 +100,17 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private final BlockingQueue abortEvents = new LinkedBlockingQueue<>(); - private volatile long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL; + private long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL; - private volatile long stopConsumerMinInterval = DEFAULT_STOP_CONSUMER_MIN_INTERVAL; + private long stopConsumerMinInterval = DEFAULT_STOP_CONSUMER_MIN_INTERVAL; - private volatile int consecutiveActiveTrigger = DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER; + private int consecutiveActiveTrigger = DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER; - private volatile int consecutiveIdleTrigger = DEFAULT_CONSECUTIVE_IDLE_TRIGGER; + private int consecutiveIdleTrigger = DEFAULT_CONSECUTIVE_IDLE_TRIGGER; - private volatile int txSize = 1; + private int batchSize = 1; - private volatile int concurrentConsumers = 1; - - private volatile Integer maxConcurrentConsumers; - - private volatile long lastConsumerStarted; - - private volatile long lastConsumerStopped; + private boolean consumerBatchEnabled; private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT; @@ -129,6 +126,14 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private long consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT; + private volatile int concurrentConsumers = 1; + + private volatile Integer maxConcurrentConsumers; + + private volatile long lastConsumerStarted; + + private volatile long lastConsumerStopped; + /** * Default constructor for convenient dependency injection via setters. */ @@ -271,12 +276,12 @@ public final void setStopConsumerMinInterval(long stopConsumerMinInterval) { * {@link #maxConcurrentConsumers} has not been reached, specifies the number of * consecutive cycles when a single consumer was active, in order to consider * starting a new consumer. If the consumer goes idle for one cycle, the counter is reset. - * This is impacted by the {@link #txSize}. + * This is impacted by the {@link #batchSize}. * Default is 10 consecutive messages. * @param consecutiveActiveTrigger The number of consecutive receives to trigger a new consumer. * @see #setMaxConcurrentConsumers(int) * @see #setStartConsumerMinInterval(long) - * @see #setTxSize(int) + * @see #setBatchSize(int) */ public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger) { Assert.isTrue(consecutiveActiveTrigger > 0, "'consecutiveActiveTrigger' must be > 0"); @@ -288,14 +293,14 @@ public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger) { * the number of consumers exceeds {@link #concurrentConsumers}, specifies the * number of consecutive receive attempts that return no data; after which we consider * stopping a consumer. The idle time is effectively - * {@link #receiveTimeout} * {@link #txSize} * this value because the consumer thread waits for - * a message for up to {@link #receiveTimeout} up to {@link #txSize} times. + * {@link #receiveTimeout} * {@link #batchSize} * this value because the consumer thread waits for + * a message for up to {@link #receiveTimeout} up to {@link #batchSize} times. * Default is 10 consecutive idles. * @param consecutiveIdleTrigger The number of consecutive timeouts to trigger stopping a consumer. * @see #setMaxConcurrentConsumers(int) * @see #setStopConsumerMinInterval(long) * @see #setReceiveTimeout(long) - * @see #setTxSize(int) + * @see #setBatchSize(int) */ public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger) { Assert.isTrue(consecutiveIdleTrigger > 0, "'consecutiveIdleTrigger' must be > 0"); @@ -312,6 +317,37 @@ public void setReceiveTimeout(long receiveTimeout) { this.receiveTimeout = receiveTimeout; } + /** + * This property has several functions. + *

+ * When the channel is transacted, it determines how many messages to process in a + * single transaction. It should be less than or equal to + * {@link #setPrefetchCount(int) the prefetch count}. + *

+ * It also affects how often acks are sent when using + * {@link org.springframework.amqp.core.AcknowledgeMode#AUTO} - one ack per BatchSize. + *

+ * Finally, when {@link #setConsumerBatchEnabled(boolean)} is true, it determines how + * many records to include in the batch as long as sufficient messages arrive within + * {@link #setReceiveTimeout(long)}. + *

+ * IMPORTANT The batch size represents the number of physical messages + * received. If {@link #setDeBatchingEnabled(boolean)} is true and a message is a + * batch created by a producer, the actual number of messages received by the listener + * will be larger than this batch size. + *

+ * + * Default is 1. + * @param batchSize the batch size + * @since 2.2 + * @see #setConsumerBatchEnabled(boolean) + * @see #setDeBatchingEnabled(boolean) + */ + public void setBatchSize(int batchSize) { + Assert.isTrue(batchSize > 0, "'batchSize' must be > 0"); + this.batchSize = batchSize; + } + /** * Tells the container how many messages to process in a single transaction (if the * channel is transactional). For best results it should be less than or equal to @@ -319,10 +355,22 @@ public void setReceiveTimeout(long receiveTimeout) { * sent when using {@link org.springframework.amqp.core.AcknowledgeMode#AUTO} - one * ack per txSize. Default is 1. * @param txSize the transaction size + * @deprecated since 2.2 in favor of {@link #setBatchSize(int)}. */ + @Deprecated public void setTxSize(int txSize) { - Assert.isTrue(txSize > 0, "'txSize' must be > 0"); - this.txSize = txSize; + setBatchSize(txSize); + } + + /** + * Set to true to present a list of messages based on the {@link #setBatchSize(int)}, + * if the listener supports it. + * @param consumerBatchEnabled true to create message batches in the container. + * @since 2.2 + * @see #setBatchSize(int) + */ + public void setConsumerBatchEnabled(boolean consumerBatchEnabled) { + this.consumerBatchEnabled = consumerBatchEnabled; } /** @@ -467,7 +515,9 @@ protected final boolean sharedConnectionEnabled() { @Override protected void doInitialize() { - + Assert.state(!this.consumerBatchEnabled || getMessageListener() instanceof BatchMessageListener + || getMessageListener() instanceof ChannelAwareBatchMessagelistener, + "When setting 'consumerBatchEnabled' to true, the listener must support batching"); } @ManagedMetric(metricType = MetricType.GAUGE) @@ -758,7 +808,7 @@ protected BlockingQueueConsumer createBlockingQueueConsumer() { String[] queues = getQueueNames(); // There's no point prefetching less than the tx size, otherwise the consumer will stall because the broker // didn't get an ack for delivered messages - int actualPrefetchCount = getPrefetchCount() > this.txSize ? getPrefetchCount() : this.txSize; + int actualPrefetchCount = getPrefetchCount() > this.batchSize ? getPrefetchCount() : this.batchSize; consumer = new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(), this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount, isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues); @@ -856,66 +906,154 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep Channel channel = consumer.getChannel(); - for (int i = 0; i < this.txSize; i++) { + List messages = null; + long deliveryTag = 0; + + for (int i = 0; i < this.batchSize; i++) { logger.trace("Waiting for message from consumer."); Message message = consumer.nextMessage(this.receiveTimeout); if (message == null) { break; } - try { - executeListener(channel, message); - } - catch (ImmediateAcknowledgeAmqpException e) { - if (this.logger.isDebugEnabled()) { - this.logger.debug("User requested ack for failed delivery '" - + e.getMessage() + "': " - + message.getMessageProperties().getDeliveryTag()); + if (this.consumerBatchEnabled) { + Collection afterReceivePostProcessors = getAfterReceivePostProcessors(); + if (afterReceivePostProcessors != null) { + deliveryTag = message.getMessageProperties().getDeliveryTag(); + for (MessagePostProcessor processor : getAfterReceivePostProcessors()) { + message = processor.postProcessMessage(message); + if (message == null) { + channel.basicAck(deliveryTag, false); + if (this.logger.isDebugEnabled()) { + this.logger + .debug("Message Post Processor returned 'null', discarding message " + message); + } + break; + } + } + } + if (message != null) { + if (messages == null) { + messages = new ArrayList<>(this.batchSize); + } + if (isDeBatchingEnabled() && getBatchingStrategy().canDebatch(message.getMessageProperties())) { + final List messageList = messages; + getBatchingStrategy().deBatch(message, fragment -> messageList.add(fragment)); + } + else { + messages.add(message); + } } - break; } - catch (Exception ex) { - if (causeChainHasImmediateAcknowledgeAmqpException(ex)) { + else { + try { + executeListener(channel, message); + } + catch (ImmediateAcknowledgeAmqpException e) { if (this.logger.isDebugEnabled()) { - this.logger.debug("User requested ack for failed delivery: " + this.logger.debug("User requested ack for failed delivery '" + + e.getMessage() + "': " + message.getMessageProperties().getDeliveryTag()); } break; } - if (getTransactionManager() != null) { - if (getTransactionAttribute().rollbackOn(ex)) { - RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager - .getResource(getConnectionFactory()); - if (resourceHolder != null) { - consumer.clearDeliveryTags(); + catch (Exception ex) { + if (causeChainHasImmediateAcknowledgeAmqpException(ex)) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("User requested ack for failed delivery: " + + message.getMessageProperties().getDeliveryTag()); + } + break; + } + if (getTransactionManager() != null) { + if (getTransactionAttribute().rollbackOn(ex)) { + RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager + .getResource(getConnectionFactory()); + if (resourceHolder != null) { + consumer.clearDeliveryTags(); + } + else { + /* + * If we don't actually have a transaction, we have to roll back + * manually. See prepareHolderForRollback(). + */ + consumer.rollbackOnExceptionIfNecessary(ex); + } + throw ex; // encompassing transaction will handle the rollback. } else { - /* - * If we don't actually have a transaction, we have to roll back - * manually. See prepareHolderForRollback(). - */ - consumer.rollbackOnExceptionIfNecessary(ex); + if (this.logger.isDebugEnabled()) { + this.logger.debug("No rollback for " + ex); + } + break; } - throw ex; // encompassing transaction will handle the rollback. } else { - if (this.logger.isDebugEnabled()) { - this.logger.debug("No rollback for " + ex); - } - break; + consumer.rollbackOnExceptionIfNecessary(ex); + throw ex; } } - else { - consumer.rollbackOnExceptionIfNecessary(ex); - throw ex; - } } } + if (this.consumerBatchEnabled && messages != null) { + executeWithList(channel, messages, deliveryTag, consumer); + } return consumer.commitIfNecessary(isChannelLocallyTransacted()); } + private void executeWithList(Channel channel, List messages, long deliveryTag, + BlockingQueueConsumer consumer) { + + try { + executeListener(channel, messages); + } + catch (ImmediateAcknowledgeAmqpException e) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("User requested ack for failed delivery '" + + e.getMessage() + "' (last in batch): " + + deliveryTag); + } + return; + } + catch (Exception ex) { + if (causeChainHasImmediateAcknowledgeAmqpException(ex)) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("User requested ack for failed delivery (last in batch): " + + deliveryTag); + } + return; + } + if (getTransactionManager() != null) { + if (getTransactionAttribute().rollbackOn(ex)) { + RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager + .getResource(getConnectionFactory()); + if (resourceHolder != null) { + consumer.clearDeliveryTags(); + } + else { + /* + * If we don't actually have a transaction, we have to roll back + * manually. See prepareHolderForRollback(). + */ + consumer.rollbackOnExceptionIfNecessary(ex); + } + throw ex; // encompassing transaction will handle the rollback. + } + else { + if (this.logger.isDebugEnabled()) { + this.logger.debug("No rollback for " + ex); + } + } + } + else { + consumer.rollbackOnExceptionIfNecessary(ex); + throw ex; + } + } + } + protected void handleStartupFailure(BackOffExecution backOffExecution) { long recoveryInterval = backOffExecution.nextBackOff(); if (BackOffExecution.STOP == recoveryInterval) { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/ChannelAwareBatchMessagelistener.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/ChannelAwareBatchMessagelistener.java new file mode 100644 index 0000000000..ca74e0428f --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/ChannelAwareBatchMessagelistener.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.listener.api; + +import java.util.List; + +import org.springframework.amqp.core.Message; + +import com.rabbitmq.client.Channel; + +/** + * Used to receive a batch of messages if the container supports it. + * + * @author Gary Russell + * @since 2.2 + * + */ +public interface ChannelAwareBatchMessagelistener extends ChannelAwareMessageListener { + + @Override + default void onMessage(Message message, Channel channel) throws Exception { + throw new UnsupportedOperationException("Should never be called by the container"); + } + + @Override + void onMessageBatch(List messages, Channel channel); + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/ChannelAwareMessageListener.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/ChannelAwareMessageListener.java index 6f918136f6..58266a3a4f 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/ChannelAwareMessageListener.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/ChannelAwareMessageListener.java @@ -16,6 +16,8 @@ package org.springframework.amqp.rabbit.listener.api; +import java.util.List; + import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; @@ -45,4 +47,9 @@ default void onMessage(Message message) { throw new IllegalStateException("Should never be called for a ChannelAwareMessageListener"); } + @SuppressWarnings("unused") + default void onMessageBatch(List messages, Channel channel) { + throw new UnsupportedOperationException("This listener does not support message batches"); + } + } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java index 8c78950327..1acfe46475 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java @@ -179,7 +179,7 @@ public void testParseWithDefaultQueueRejectedFalse() { public void testParseWithTx() { SimpleMessageListenerContainer container = beanFactory.getBean("container6", SimpleMessageListenerContainer.class); assertThat(container.isChannelTransacted()).isTrue(); - assertThat(ReflectionTestUtils.getField(container, "txSize")).isEqualTo(5); + assertThat(ReflectionTestUtils.getField(container, "batchSize")).isEqualTo(5); } @Test diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java index 1f59e2794e..5c8394451c 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java @@ -117,7 +117,7 @@ public void createContainerFullConfig() { DirectFieldAccessor fieldAccessor = new DirectFieldAccessor(container); assertThat(fieldAccessor.getPropertyValue("taskExecutor")).isSameAs(executor); assertThat(fieldAccessor.getPropertyValue("transactionManager")).isSameAs(transactionManager); - assertThat(fieldAccessor.getPropertyValue("txSize")).isEqualTo(10); + assertThat(fieldAccessor.getPropertyValue("batchSize")).isEqualTo(10); assertThat(fieldAccessor.getPropertyValue("concurrentConsumers")).isEqualTo(2); assertThat(fieldAccessor.getPropertyValue("maxConcurrentConsumers")).isEqualTo(5); assertThat(fieldAccessor.getPropertyValue("startConsumerMinInterval")).isEqualTo(2000L); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java index fae4eb7618..b04b329c91 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java @@ -277,7 +277,7 @@ public void testDebatchByContainerPerformance() throws Exception { }); container.setReceiveTimeout(100); container.setPrefetchCount(1000); - container.setTxSize(1000); + container.setBatchSize(1000); container.afterPropertiesSet(); container.start(); try { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java index 5b5a045b47..4af9ced238 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java @@ -293,7 +293,7 @@ public void doTest(int messageCount, ErrorHandler errorHandler, CountDownLatch l container.setConcurrentConsumers(concurrentConsumers); container.setPrefetchCount(messageCount); - container.setTxSize(messageCount); + container.setBatchSize(messageCount); container.setQueueNames(queue.getName()); container.setErrorHandler(errorHandler); container.setReceiveTimeout(50); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java index 12ebe0993e..28b8199b09 100755 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java @@ -247,7 +247,7 @@ private void doTest(MessageCount level, Concurrency concurrency, TransactionMode if (transactionMode.getPrefetch() > 0) { container.setPrefetchCount(transactionMode.getPrefetch()); - container.setTxSize(transactionMode.getTxSize()); + container.setBatchSize(transactionMode.getTxSize()); } container.setQueueNames(queue.getName()); container.setShutdownTimeout(30000); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java index a8234240c5..3046113dcb 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java @@ -222,7 +222,7 @@ private void doTestRetry(int messageCount, int txSize, int failFrequency, int co container.setMessageListener(new MessageListenerAdapter(listener)); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setChannelTransacted(true); - container.setTxSize(txSize); + container.setBatchSize(txSize); container.setConcurrentConsumers(concurrentConsumers); final CountDownLatch latch = new CountDownLatch(failedMessageCount); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java index a8e301895f..9f11c35b73 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java @@ -130,7 +130,7 @@ private SimpleMessageListenerContainer createContainer(Object listener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory()); container.setMessageListener(new MessageListenerAdapter(listener)); container.setQueueNames(queue.getName()); - container.setTxSize(txSize); + container.setBatchSize(txSize); container.setPrefetchCount(txSize); container.setConcurrentConsumers(concurrentConsumers); container.setChannelTransacted(transactional); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryRepeatIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryRepeatIntegrationTests.java index 115eb3ecd8..0c26ee05d1 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryRepeatIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryRepeatIntegrationTests.java @@ -157,7 +157,7 @@ private SimpleMessageListenerContainer createContainer(String queueName, Object SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setMessageListener(new MessageListenerAdapter(listener)); container.setQueueNames(queueName); - container.setTxSize(txSize); + container.setBatchSize(txSize); container.setPrefetchCount(txSize); container.setConcurrentConsumers(concurrentConsumers); container.setChannelTransacted(transactional); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerTxSizeIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerTxSizeIntegrationTests.java index b1b154522b..ac18738c3e 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerTxSizeIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerTxSizeIntegrationTests.java @@ -132,7 +132,7 @@ private SimpleMessageListenerContainer createContainer(Object listener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory()); container.setMessageListener(new MessageListenerAdapter(listener)); container.setQueueNames(queue.getName()); - container.setTxSize(txSize); + container.setBatchSize(txSize); container.setPrefetchCount(txSize); container.setConcurrentConsumers(concurrentConsumers); container.setChannelTransacted(transactional); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java index 993f9357a3..2008442217 100755 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +38,7 @@ import org.junit.runners.Parameterized.Parameters; import org.springframework.amqp.core.AcknowledgeMode; +import org.springframework.amqp.core.BatchMessageListener; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.core.Queue; @@ -221,6 +223,27 @@ public void testNullQueueName() { .isThrownBy(() -> container = createContainer(m -> { }, (String) null)); } + @Test + public void testConsumerBatching() throws InterruptedException { + AtomicReference> received = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + this.container = createContainer((BatchMessageListener) messages -> { + received.set(messages); + latch.countDown(); + }, this.queue); + this.container.setConsumerBatchEnabled(true); + this.container.setBatchSize(this.messageCount); + this.container.setConcurrentConsumers(1); + this.container.afterPropertiesSet(); + this.container.start(); + for (int i = 0; i < this.messageCount; i++) { + this.template.convertAndSend(this.queue.getName(), i + "foo"); + } + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(received.get()).isNotNull(); + assertThat(received.get()).hasSize(this.messageCount); + } + private void doSunnyDayTest(CountDownLatch latch, MessageListener listener) throws Exception { container = createContainer(listener); for (int i = 0; i < messageCount; i++) { @@ -281,7 +304,7 @@ private SimpleMessageListenerContainer createContainer(MessageListener listener, private SimpleMessageListenerContainer doCreateContainer(MessageListener listener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory()); container.setMessageListener(listener); - container.setTxSize(txSize); + container.setBatchSize(txSize); container.setPrefetchCount(txSize); container.setConcurrentConsumers(concurrentConsumers); container.setChannelTransacted(transactional); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 23f565f86e..91566097e1 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -199,7 +199,7 @@ public void testTxSizeAcks() throws Exception { final List messages = new ArrayList<>(); final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("foo"); - container.setTxSize(2); + container.setBatchSize(2); container.setMessageListener(messages::add); container.start(); BasicProperties props = new BasicProperties(); @@ -251,7 +251,7 @@ public void testTxSizeAcksWIthShortSet() throws Exception { final List messages = new ArrayList<>(); final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("foobar"); - container.setTxSize(2); + container.setBatchSize(2); container.setMessageListener(messages::add); container.start(); BasicProperties props = new BasicProperties(); @@ -555,7 +555,7 @@ public void testNullMPP() { class Container extends SimpleMessageListenerContainer { @Override - public void executeListener(Channel channel, Message messageIn) { + public void executeListener(Channel channel, Object messageIn) { super.executeListener(channel, messageIn); } @@ -605,7 +605,7 @@ public void testAddAndRemoveAfterReceivePostProcessors() { class Container extends SimpleMessageListenerContainer { @Override - public void executeListener(Channel channel, Message messageIn) { + public void executeListener(Channel channel, Object messageIn) { super.executeListener(channel, messageIn); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerWithRabbitMQ.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerWithRabbitMQ.java index dceafad844..86408c5065 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerWithRabbitMQ.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerWithRabbitMQ.java @@ -61,7 +61,7 @@ public static void main(String[] args) throws InterruptedException { container.setConnectionFactory(connectionFactory); container.setQueueNames("foo"); container.setPrefetchCount(1000); - container.setTxSize(500); + container.setBatchSize(500); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setConcurrentConsumers(20); container.setMessageListener(new MessageListenerAdapter(new SimpleAdapter(), messageConverter)); diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 01bf0b1491..49e6cf57d4 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -2974,8 +2974,9 @@ The `concurrentConsumers` and associated properties are not available with this The following features are available with the SMLC but not the DMLC: -* `txSize`: With the SMLC, you can set this to control how many messages are delivered in a transaction or to reduce the number of acks, but it may cause the number of duplicate deliveries to increase after a failure. -(The DMLC does have `messagesPerAck`, which you can use to reduce the acks, the same as with `txSize` and the SMLC, but it cannot be used with transactions -- each message is delivered and ack'd in a separate transaction). +* `batchSize`: With the SMLC, you can set this to control how many messages are delivered in a transaction or to reduce the number of acks, but it may cause the number of duplicate deliveries to increase after a failure. +(The DMLC does have `messagesPerAck`, which you can use to reduce the acks, the same as with `batchSize` and the SMLC, but it cannot be used with transactions -- each message is delivered and ack'd in a separate transaction). +* `consumerBatchEnabled`: enables batching of discrete messages in the consumer; see <> for more information. * `maxConcurrentConsumers` and consumer scaling intervals or triggers -- there is no auto-scaling in the DMLC. It does, however, let you programmatically change the `consumersPerQueue` property and the consumers are adjusted accordingly. @@ -5038,7 +5039,7 @@ RabbitMQ calls this "`autoack`", because the broker assumes all messages are ack * `AUTO`: The container acknowledges the message automatically, unless the `MessageListener` throws an exception. Note that `acknowledgeMode` is complementary to `channelTransacted` -- if the channel is transacted, the broker requires a commit notification in addition to the ack. This is the default mode. -See also `txSize`. +See also `batchSize`. a| image::images/tickmark.png[] a| image::images/tickmark.png[] @@ -5058,7 +5059,7 @@ a| image::images/tickmark.png[] a| The number of unacknowledged messages that can be outstanding at each consumer. The higher this value is, the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the `acknowledgeMode` is `NONE`. -This is increased, if necessary, to match the `txSize` or `messagePerAck`. +This is increased, if necessary, to match the `batchSize` or `messagePerAck`. Defaults to 250 since 2.0. You can set it to 1 to revert to the previous behavior. @@ -5092,12 +5093,22 @@ You can set it to `false` to revert to the previous behavior. a| image::images/tickmark.png[] a| image::images/tickmark.png[] -| txSize +| batchSize (transaction-size) +(batch-size) | When used with `acknowledgeMode` set to `AUTO`, the container tries to process up to this number of messages before sending an ack (waiting for each one up to the receive timeout setting). This is also when a transactional channel is committed. -If the `prefetchCount` is less than the `txSize`, it is increased to match the `txSize`. +If the `prefetchCount` is less than the `batchSize`, it is increased to match the `batchSize`. + +a| image::images/tickmark.png[] +a| + +| consumerBatchEnabled +(batch-enabled) + +| If the `MessageListener` supports it, setting this to true enables batching of discrete messages, up to `batchSize`; a partial batch will be delivered if no new messages arrive in `receiveTimeout`. +When this is false, batching is only supported for batches created by a producer; see <>. a| image::images/tickmark.png[] a| @@ -5134,7 +5145,8 @@ a| image::images/tickmark.png[] | The maximum time to wait for each message. If `acknowledgeMode=NONE`, this has very little effect -- the container spins round and asks for another message. -It has the biggest effect for a transactional `Channel` with `txSize > 1`, since it can cause messages already consumed not to be acknowledged until the timeout expires. +It has the biggest effect for a transactional `Channel` with `batchSize > 1`, since it can cause messages already consumed not to be acknowledged until the timeout expires. +When `consumerBatchEnabled` is true, a partial batch will be delivered if this timeout occurs before a batch is complete. a| image::images/tickmark.png[] a| @@ -5261,7 +5273,7 @@ a| (min-consecutive-active) | The minimum number of consecutive messages received by a consumer, without a receive timeout occurring, when considering starting a new consumer. -Also impacted by 'txSize'. +Also impacted by 'batchSize'. See <>. Default: 10. @@ -5272,7 +5284,7 @@ a| (min-consecutive-idle) | The minimum number of receive timeouts a consumer must experience before considering stopping a consumer. -Also impacted by 'txSize'. +Also impacted by 'batchSize'. See <>. Default: 10. @@ -5611,14 +5623,14 @@ This works in conjunction with four additional properties: `consecutiveActiveTri With the default settings, the algorithm to increase consumers works as follows: If the `maxConcurrentConsumers` has not been reached and an existing consumer is active for ten consecutive cycles AND at least 10 seconds has elapsed since the last consumer was started, a new consumer is started. -A consumer is considered active if it received at least one message in `txSize` * `receiveTimeout` milliseconds. +A consumer is considered active if it received at least one message in `batchSize` * `receiveTimeout` milliseconds. With the default settings, the algorithm to decrease consumers works as follows: If there are more than `concurrentConsumers` running and a consumer detects ten consecutive timeouts (idle) AND the last consumer was stopped at least 60 seconds ago, a consumer is stopped. -The timeout depends on the `receiveTimeout` and the `txSize` properties. -A consumer is considered idle if it receives no messages in `txSize` * `receiveTimeout` milliseconds. -So, with the default timeout (one second) and a `txSize` of four, stopping a consumer is considered after 40 seconds of idle time (four timeouts correspond to one idle detection). +The timeout depends on the `receiveTimeout` and the `batchSize` properties. +A consumer is considered idle if it receives no messages in `batchSize` * `receiveTimeout` milliseconds. +So, with the default timeout (one second) and a `batchSize` of four, stopping a consumer is considered after 40 seconds of idle time (four timeouts correspond to one idle detection). NOTE: Practically, consumers can be stopped only if the whole container is idle for some time. This is because the broker shares its work across all the active consumers. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index e7b58c2f05..2135364d7f 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -27,6 +27,9 @@ When receiving batched messages one-at-a-time, the last message has the `isLastI In addition, received batched messages now contain the `amqp_batchSize` header. +Listeners can also consume batches created in the `SimpleMessageListenerContainer`, even if the batch is not created by the producer. +See <> for more information. + Spring Data Projection interfaces are now supported by the `Jackson2JsonMessageConverter`. See <> for more information.