diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index d297eb7d7bdde..f18f4a84e132a 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -597,10 +597,12 @@ private static void runProducer(int producerId, .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) // .compressionType(arguments.compression) // .maxPendingMessages(arguments.maxOutstanding) // - .maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions) .accessMode(arguments.producerAccessMode) // enable round robin message routing if it is a partitioned topic .messageRoutingMode(MessageRoutingMode.RoundRobinPartition); + if (arguments.maxPendingMessagesAcrossPartitions > 0) { + producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions); + } AtomicReference transactionAtomicReference; if (arguments.isEnableTransaction) { diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index 920a36bd56a22..3adedc3ac605e 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -190,4 +190,25 @@ public void testDefaultIMessageFormatter() { Assert.assertTrue(msgFormatter instanceof DefaultMessageFormatter); } + @Test + public void testMaxOutstanding() throws Exception { + String argString = "%s -r 10 -u %s -au %s -m 5 -o 10000"; + String topic = testTopic + UUID.randomUUID().toString(); + String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), pulsar.getWebServiceAddress()); + Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub") + .subscriptionType(SubscriptionType.Key_Shared).subscribe(); + new Thread(() -> { + try { + PerformanceProducer.main(args.split(" ")); + } catch (Exception e) { + log.error("Failed to start perf producer"); + } + }).start(); + Awaitility.await() + .untilAsserted(() -> { + Message message = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(message); + }); + consumer.close(); + } }