Skip to content

Commit

Permalink
[Cli tools] Disable Pulsar client memory limit by default
Browse files Browse the repository at this point in the history
- There's a regression with the tools since the memory limit cannot be adjusted
  - It's better to default to the previous setting of disabling memory limits
    so that the performance profile doesn't change because of the memory limit.
  • Loading branch information
lhotari committed May 24, 2022
1 parent 3dbf1f5 commit 3e13d58
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.SizeUnit;

@Parameters(commandDescription = "Produce or consume messages on a specified topic")
public class PulsarClientTool {
Expand Down Expand Up @@ -119,7 +120,8 @@ public PulsarClientTool(Properties properties) {
}

private void updateConfig() throws UnsupportedAuthenticationException {
ClientBuilder clientBuilder = PulsarClient.builder();
ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES);
Authentication authentication = null;
if (isNotBlank(this.authPluginClassName)) {
authentication = AuthenticationFactory.create(authPluginClassName, authParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -317,6 +318,7 @@ public LoadSimulationClient(final MainArguments arguments) throws Exception {
.serviceHttpUrl(arguments.serviceURL)
.build();
client = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.serviceUrl(arguments.serviceURL)
.connectionsPerBroker(4)
.ioThreads(Runtime.getRuntime().availableProcessors())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
Expand Down Expand Up @@ -356,6 +357,7 @@ public static void main(String[] args) throws Exception {
long testEndTime = startTime + (long) (arguments.testTime * 1e9);

ClientBuilder clientBuilder = PulsarClient.builder() //
.memoryLimit(0, SizeUnit.BYTES)
.enableTransaction(arguments.isEnableTransaction)
.serviceUrl(arguments.serviceURL) //
.connectionsPerBroker(arguments.maxConnections) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand Down Expand Up @@ -572,6 +573,7 @@ private static void runProducer(int producerId,
List<Future<Producer<byte[]>>> futures = new ArrayList<>();

ClientBuilder clientBuilder = PulsarClient.builder() //
.memoryLimit(0, SizeUnit.BYTES)
.enableTransaction(arguments.isEnableTransaction)//
.serviceUrl(arguments.serviceURL) //
.connectionsPerBroker(arguments.maxConnections) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -257,6 +258,7 @@ public static void main(String[] args) throws Exception {
};

ClientBuilder clientBuilder = PulsarClient.builder() //
.memoryLimit(0, SizeUnit.BYTES)
.serviceUrl(arguments.serviceURL) //
.connectionsPerBroker(arguments.maxConnections) //
.statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
Expand Down Expand Up @@ -295,7 +296,9 @@ public static void main(String[] args)
}

ClientBuilder clientBuilder =
PulsarClient.builder().enableTransaction(!arguments.isDisableTransaction)
PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.enableTransaction(!arguments.isDisableTransaction)
.serviceUrl(arguments.serviceURL)
.connectionsPerBroker(arguments.maxConnections)
.statsInterval(0, TimeUnit.SECONDS)
Expand Down

0 comments on commit 3e13d58

Please sign in to comment.