Skip to content

Commit

Permalink
Deprecate maxPendingMessagesAcrossPartitions
Browse files Browse the repository at this point in the history
  • Loading branch information
HQebupt committed Dec 23, 2021
1 parent a754347 commit 2874be4
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public interface ProducerBuilder<T> extends Cloneable {
* the max size of the pending messages queue for the producer
* @return the producer builder instance
*/
@Deprecated
ProducerBuilder<T> maxPendingMessages(int maxPendingMessages);

/**
Expand All @@ -188,7 +189,7 @@ public interface ProducerBuilder<T> extends Cloneable {
* The purpose of this setting is to have an upper-limit on the number
* of pending messages when publishing on a partitioned topic.
*
* <p>Default is 50000.
* <p>Default is 0, disable the pending messages across partitions check.
*
* <p>If publishing at high rate over a topic with many partitions (especially when publishing messages without a
* partitioning key), it might be beneficial to increase this parameter to allow for more pipelining within the
Expand All @@ -198,6 +199,7 @@ public interface ProducerBuilder<T> extends Cloneable {
* max pending messages across all the partitions
* @return the producer builder instance
*/
@Deprecated
ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,14 @@ public ProducerBuilder<T> sendTimeout(int sendTimeout, @NonNull TimeUnit unit) {
return this;
}

@Deprecated
@Override
public ProducerBuilder<T> maxPendingMessages(int maxPendingMessages) {
conf.setMaxPendingMessages(maxPendingMessages);
return this;
}

@Deprecated
@Override
public ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {

public static final int DEFAULT_BATCHING_MAX_MESSAGES = 1000;
public static final int DEFAULT_MAX_PENDING_MESSAGES = 0;
public static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000;
public static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 0;

private String topicName = null;
private String producerName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,12 @@ public void testProducerBuilderImplWhenSendTimeoutPropertyIsNegative() {

@Test(expectedExceptions = IllegalArgumentException.class)
public void testProducerBuilderImplWhenMaxPendingMessagesAcrossPartitionsPropertyIsInvalid() {
producerBuilderImpl.maxPendingMessagesAcrossPartitions(999);
producerBuilderImpl.maxPendingMessagesAcrossPartitions(-1);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "maxPendingMessagesAcrossPartitions needs to be >= maxPendingMessages")
public void testProducerBuilderImplWhenMaxPendingMessagesAcrossPartitionsPropertyIsInvalidErrorMessages() {
producerBuilderImpl.maxPendingMessagesAcrossPartitions(999);
producerBuilderImpl.maxPendingMessagesAcrossPartitions(-1);
}

@Test
Expand Down

0 comments on commit 2874be4

Please sign in to comment.