From 74abb31923e2ebb72c13f0e2bcdacf3a362c81c9 Mon Sep 17 00:00:00 2001 From: penghui Date: Sun, 24 Apr 2022 09:33:44 +0800 Subject: [PATCH] [fix][tools] Only apply maxPendingMessagesAcrossPartitions if it presents ### Motivation After #13344, we have changed the default value of maxPendingMessagesAcrossPartitions to `0`, for the performance producer, it will always apply the default value of maxPendingMessagesAcrossPartitions while creating producers, so it will always use `0` for maxPendingMessagesAcrossPartitions. But we have a check here https://github.com/apache/pulsar/blob/0a91196dcc4d31ae647867ed319b8c1af0cb93c6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java#L138-L142 This will lead the performance producer not able to create if users only set `-o` option ``` 06:32:49.628 [pulsar-perf-producer-exec-1-1] ERROR org.apache.pulsar.testclient.PerformanceProducer - Got error java.lang.IllegalArgumentException: maxPendingMessagesAcrossPartitions needs to be >= maxPendingMessages at org.apache.pulsar.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:145) ~[pulsar-client-admin-2.10.0.2.jar:2.10.0.2] at org.apache.pulsar.client.impl.conf.ProducerConfigurationData.setMaxPendingMessagesAcrossPartitions(ProducerConfigurationData.java:138) ~[pulsar-client-admin-2.10.0.2.jar:2.10.0.2] at org.apache.pulsar.client.impl.ProducerBuilderImpl.maxPendingMessagesAcrossPartitions(ProducerBuilderImpl.java:148) ~[pulsar-client-admin-2.10.0.2.jar:2.10.0.2] at org.apache.pulsar.testclient.PerformanceProducer.runProducer(PerformanceProducer.java:600) ~[pulsar-testclient-2.10.0.2.jar:2.10.0.2] at org.apache.pulsar.testclient.PerformanceProducer.lambda$main$1(PerformanceProducer.java:464) ~[pulsar-testclient-2.10.0.2.jar:2.10.0.2] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_332] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_332] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_332] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_332] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.74.Final.jar:4.1.74.Final] at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332] ``` ### Modification Change the performance producer to only call `.maxPendingMessagesAcrossPartitions()` if it presents. ### Verification Added new test to only use `-o` to start the performance producer, use a consumer to consume data from the topic to make sure the performance producer has been created successfully. --- .../testclient/PerformanceProducer.java | 4 +++- .../testclient/PerformanceProducerTest.java | 21 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index d297eb7d7bdde..f18f4a84e132a 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -597,10 +597,12 @@ private static void runProducer(int producerId, .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) // .compressionType(arguments.compression) // .maxPendingMessages(arguments.maxOutstanding) // - .maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions) .accessMode(arguments.producerAccessMode) // enable round robin message routing if it is a partitioned topic .messageRoutingMode(MessageRoutingMode.RoundRobinPartition); + if (arguments.maxPendingMessagesAcrossPartitions > 0) { + producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions); + } AtomicReference transactionAtomicReference; if (arguments.isEnableTransaction) { diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index 920a36bd56a22..3adedc3ac605e 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -190,4 +190,25 @@ public void testDefaultIMessageFormatter() { Assert.assertTrue(msgFormatter instanceof DefaultMessageFormatter); } + @Test + public void testMaxOutstanding() throws Exception { + String argString = "%s -r 10 -u %s -au %s -m 5 -o 10000"; + String topic = testTopic + UUID.randomUUID().toString(); + String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), pulsar.getWebServiceAddress()); + Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub") + .subscriptionType(SubscriptionType.Key_Shared).subscribe(); + new Thread(() -> { + try { + PerformanceProducer.main(args.split(" ")); + } catch (Exception e) { + log.error("Failed to start perf producer"); + } + }).start(); + Awaitility.await() + .untilAsserted(() -> { + Message message = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(message); + }); + consumer.close(); + } }