Skip to content

Commit

Permalink
GH-1032: Add consumer-side batching support
Browse files Browse the repository at this point in the history
Resolves #1032

* Missing Javadocs
* Still De-Batch producer batches if so configured
* Polishing; detect incorrect configuration.
* Revert to generic message for incorrect configuration.
  • Loading branch information
garyrussell authored and artembilan committed Jun 26, 2019
1 parent c1e3179 commit 3aed80f
Show file tree
Hide file tree
Showing 22 changed files with 492 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.core;

import java.util.List;

/**
* Used to receive a batch of messages if the container supports it.
*
* @author Gary Russell
* @since 2.2
*
*/
public interface BatchMessageListener extends MessageListener {

@Override
default void onMessage(Message message) {
throw new UnsupportedOperationException("Should never be called by the container");
}

@Override
void onMessageBatch(List<Message> messages);


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.amqp.core;

import java.util.List;

/**
* Listener interface to receive asynchronous delivery of Amqp Messages.
*
Expand All @@ -25,6 +27,10 @@
@FunctionalInterface
public interface MessageListener {

/**
* Delivers a single message.
* @param message the message.
*/
void onMessage(Message message);

/**
Expand All @@ -37,4 +43,13 @@ default void containerAckMode(AcknowledgeMode mode) {
// NOSONAR - empty
}

/**
* Delivers a batch of messages.
* @param messages the messages.
* @since 2.2
*/
default void onMessageBatch(List<Message> messages) {
throw new UnsupportedOperationException("This listener does not support message batches");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,14 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean<AbstractMe

private Long receiveTimeout;

private Integer txSize;
private Integer batchSize;

private Integer declarationRetries;

private Long retryDeclarationInterval;

private Boolean consumerBatchEnabled;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -366,8 +368,53 @@ public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}

/**
* This property has several functions.
* <p>
* When the channel is transacted, it determines how many messages to process in a
* single transaction. It should be less than or equal to
* {@link #setPrefetchCount(int) the prefetch count}.
* <p>
* It also affects how often acks are sent when using
* {@link org.springframework.amqp.core.AcknowledgeMode#AUTO} - one ack per BatchSize.
* <p>
* Finally, when {@link #setConsumerBatchEnabled(boolean)} is true, it determines how
* many records to include in the batch as long as sufficient messages arrive within
* {@link #setReceiveTimeout(long)}.
* <p>
* <b>IMPORTANT</b> The batch size represents the number of physical messages
* received. If {@link #setDeBatchingEnabled(boolean)} is true and a message is a
* batch created by a producer, the actual number of messages received by the listener
* will be larger than this batch size.
* <p>
*
* Default is 1.
* @param batchSize the batch size
* @since 2.2
*/
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

/**
* Set the txSize.
* @param txSize the txSize.
* @deprecated in favor of {@link #setBatchSize(int)}.
*/
@Deprecated
public void setTxSize(int txSize) {
this.txSize = txSize;
setBatchSize(txSize);
}

/**
* Set to true to present a list of messages based on the {@link #setBatchSize(int)},
* if the container and listener support it.
* @param consumerBatchEnabled true to create message batches in the container.
* @since 2.2
* @see #setBatchSize(int)
*/
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
this.consumerBatchEnabled = consumerBatchEnabled;
}

public void setDeclarationRetries(int declarationRetries) {
Expand All @@ -380,8 +427,9 @@ public void setRetryDeclarationInterval(long retryDeclarationInterval) {

@Override
public Class<?> getObjectType() {
return this.listenerContainer == null ? AbstractMessageListenerContainer.class : this.listenerContainer
.getClass();
return this.listenerContainer == null
? AbstractMessageListenerContainer.class
: this.listenerContainer.getClass();
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -446,7 +494,8 @@ private AbstractMessageListenerContainer createContainer() {
.acceptIfNotNull(this.consecutiveActiveTrigger, container::setConsecutiveActiveTrigger)
.acceptIfNotNull(this.consecutiveIdleTrigger, container::setConsecutiveIdleTrigger)
.acceptIfNotNull(this.receiveTimeout, container::setReceiveTimeout)
.acceptIfNotNull(this.txSize, container::setTxSize)
.acceptIfNotNull(this.batchSize, container::setBatchSize)
.acceptIfNotNull(this.consumerBatchEnabled, container::setConsumerBatchEnabled)
.acceptIfNotNull(this.declarationRetries, container::setDeclarationRetries)
.acceptIfNotNull(this.retryDeclarationInterval, container::setRetryDeclarationInterval);
return container;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb
super.initializeContainer(instance, endpoint);

JavaUtils javaUtils = JavaUtils.INSTANCE
.acceptIfNotNull(this.txSize, instance::setTxSize);
.acceptIfNotNull(this.txSize, instance::setBatchSize);
String concurrency = null;
if (endpoint != null) {
concurrency = endpoint.getConcurrency();
Expand Down
Loading

0 comments on commit 3aed80f

Please sign in to comment.