From 3e13d586f1046a5e7f95e78d93b3bbc0b131fb21 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 May 2022 11:14:38 +0300 Subject: [PATCH] [Cli tools] Disable Pulsar client memory limit by default - There's a regression with the tools since the memory limit cannot be adjusted - It's better to default to the previous setting of disabling memory limits so that the performance profile doesn't change because of the memory limit. --- .../java/org/apache/pulsar/client/cli/PulsarClientTool.java | 4 +++- .../org/apache/pulsar/testclient/LoadSimulationClient.java | 2 ++ .../org/apache/pulsar/testclient/PerformanceConsumer.java | 2 ++ .../org/apache/pulsar/testclient/PerformanceProducer.java | 2 ++ .../java/org/apache/pulsar/testclient/PerformanceReader.java | 2 ++ .../org/apache/pulsar/testclient/PerformanceTransaction.java | 5 ++++- 6 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index fe716941dbc45a..617ed6b5a7caf8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.api.SizeUnit; @Parameters(commandDescription = "Produce or consume messages on a specified topic") public class PulsarClientTool { @@ -119,7 +120,8 @@ public PulsarClientTool(Properties properties) { } private void updateConfig() throws UnsupportedAuthenticationException { - ClientBuilder clientBuilder = PulsarClient.builder(); + ClientBuilder clientBuilder = PulsarClient.builder() + .memoryLimit(0, SizeUnit.BYTES); Authentication authentication = null; if (isNotBlank(this.authPluginClassName)) { authentication = AuthenticationFactory.create(authPluginClassName, authParams); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java index 34def55b9db852..04f1be4e3823cb 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SizeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -317,6 +318,7 @@ public LoadSimulationClient(final MainArguments arguments) throws Exception { .serviceHttpUrl(arguments.serviceURL) .build(); client = PulsarClient.builder() + .memoryLimit(0, SizeUnit.BYTES) .serviceUrl(arguments.serviceURL) .connectionsPerBroker(4) .ioThreads(Runtime.getRuntime().availableProcessors()) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 0b9b719bfa2b5c..6b145ca288ab9c 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -51,6 +51,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; @@ -356,6 +357,7 @@ public static void main(String[] args) throws Exception { long testEndTime = startTime + (long) (arguments.testTime * 1e9); ClientBuilder clientBuilder = PulsarClient.builder() // + .memoryLimit(0, SizeUnit.BYTES) .enableTransaction(arguments.isEnableTransaction) .serviceUrl(arguments.serviceURL) // .connectionsPerBroker(arguments.maxConnections) // diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index ea40aa31ae2a1a..46155bd5eb4364 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -68,6 +68,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -572,6 +573,7 @@ private static void runProducer(int producerId, List>> futures = new ArrayList<>(); ClientBuilder clientBuilder = PulsarClient.builder() // + .memoryLimit(0, SizeUnit.BYTES) .enableTransaction(arguments.isEnableTransaction)// .serviceUrl(arguments.serviceURL) // .connectionsPerBroker(arguments.maxConnections) // diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index 724cb4378b3b8b..1c99dce8c3240e 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.ReaderListener; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; @@ -257,6 +258,7 @@ public static void main(String[] args) throws Exception { }; ClientBuilder clientBuilder = PulsarClient.builder() // + .memoryLimit(0, SizeUnit.BYTES) .serviceUrl(arguments.serviceURL) // .connectionsPerBroker(arguments.maxConnections) // .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) // diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java index 52af64d1beac87..1258918b18a22f 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java @@ -62,6 +62,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; @@ -295,7 +296,9 @@ public static void main(String[] args) } ClientBuilder clientBuilder = - PulsarClient.builder().enableTransaction(!arguments.isDisableTransaction) + PulsarClient.builder() + .memoryLimit(0, SizeUnit.BYTES) + .enableTransaction(!arguments.isDisableTransaction) .serviceUrl(arguments.serviceURL) .connectionsPerBroker(arguments.maxConnections) .statsInterval(0, TimeUnit.SECONDS)