Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cli] Enable CLI to publish non-batched messages #12641

Merged
merged 1 commit into from
Nov 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import lombok.Cleanup;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand Down Expand Up @@ -106,7 +109,7 @@ public void testNonDurableSubscribe() throws Exception {
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");

final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
final String topicName = getTopicWithRandomSuffix("non-durable");

int numberOfMessages = 10;
@Cleanup("shutdownNow")
Expand Down Expand Up @@ -155,7 +158,7 @@ public void testDurableSubscribe() throws Exception {
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");

final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
final String topicName = getTopicWithRandomSuffix("durable");

int numberOfMessages = 10;
@Cleanup("shutdownNow")
Expand Down Expand Up @@ -197,7 +200,7 @@ public void testEncryption() throws Exception {
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");

final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
final String topicName = getTopicWithRandomSuffix("encryption");
final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/";
final int numberOfMessages = 10;

Expand Down Expand Up @@ -234,4 +237,41 @@ public void testEncryption() throws Exception {
}
}

@Test(timeOut = 20000)
public void testDisableBatching() throws Exception {
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("disable-batching");
final int numberOfMessages = 5;

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();

PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties);
String[] args1 = {"produce", "-m", "batched", "-n", Integer.toString(numberOfMessages), topicName};
Assert.assertEquals(pulsarClientTool1.run(args1), 0);

PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties);
String[] args2 = {"produce", "-m", "non-batched", "-n", Integer.toString(numberOfMessages), "-db", topicName};
Assert.assertEquals(pulsarClientTool2.run(args2), 0);

for (int i = 0; i < numberOfMessages * 2; i++) {
Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
if (i < numberOfMessages) {
Assert.assertEquals(new String(msg.getData()), "batched");
Assert.assertTrue(msg.getMessageId() instanceof BatchMessageIdImpl);
} else {
Assert.assertEquals(new String(msg.getData()), "non-batched");
Assert.assertFalse(msg.getMessageId() instanceof BatchMessageIdImpl);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that you have not checked if the message is from a batch message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check not enough? Is there any other good way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry, my mistake. It's good.

}
}
}

private static String getTopicWithRandomSuffix(String localNameBase) {
return String.format("persistent://prop/ns-abc/test/%s-%s", localNameBase, UUID.randomUUID().toString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public class CmdProduce {
description = "Rate (in msg/sec) at which to produce," +
" value 0 means to produce messages as fast as possible.")
private double publishRate = 0;

@Parameter(names = { "-db", "--disable-batching" }, description = "Disable batch sending of messages")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It better to named --batch-enabled? and the default is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally, boolean type options do not require a value, so the default value should be false.

# By default batching is enabled
$ ./bin/pulsar-client produce -m hello persistent://public/default/t1

# Batching is enabled in this case as well
$ ./bin/pulsar-client produce -m hello --batch-enabled persistent://public/default/t1

We can set the default value to true by specifying arity = 1, but it may be a bit verbose.
https://jcommander.org/#_boolean

# Batching is enabled
$ ./bin/pulsar-client produce -m hello --batch-enabled true persistent://public/default/t1

# Batching is disabled
$ ./bin/pulsar-client produce -m hello --batch-enabled false persistent://public/default/t1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that makes sense.

How about keeping the same configurations with the perf producer?

        @Parameter(names = { "-b",
                "--batch-time-window" }, description = "Batch messages in 'x' ms window (Default: 1ms)")
        public double batchTimeMillis = 1.0;

        @Parameter(names = {
            "-bm", "--batch-max-messages"
        }, description = "Maximum number of messages per batch")
        public int batchMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES;

        @Parameter(names = {
            "-bb", "--batch-max-bytes"
        }, description = "Maximum number of bytes per batch")
        public int batchMaxBytes = 4 * 1024 * 1024
if (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0) {
                producerBuilder.enableBatching(false);
            }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the pulsar-client command, one batch message always contains only one message, even if batching is enabled. This is because we are using the synchronous method send() instead of the asynchronous method sendAsync().

So, unlike the perf producer, I don't think such fine-grained settings are useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no particular benefit to enabling batching for the pulsar-client command, so we have the option of always disabling it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that make sense.

private boolean disableBatching = false;

@Parameter(names = { "-c",
"--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
Expand Down Expand Up @@ -247,6 +250,8 @@ private int publish(String topic) {
if (this.chunkingAllowed) {
producerBuilder.enableChunking(true);
producerBuilder.enableBatching(false);
} else if (this.disableBatching) {
producerBuilder.enableBatching(false);
}
if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyValue)) {
producerBuilder.addEncryptionKey(this.encKeyName);
Expand Down
1 change: 1 addition & 0 deletions site2/docs/reference-cli-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ Options
|`-m`, `--messages`|Comma-separated string of messages to send; either -m or -f must be specified|[]|
|`-n`, `--num-produce`|The number of times to send the message(s); the count of messages/files * num-produce should be below 1000|1|
|`-r`, `--rate`|Rate (in messages per second) at which to produce; a value 0 means to produce messages as fast as possible|0.0|
|`-db`, `--disable-batching`|Disable batch sending of messages|false|
|`-c`, `--chunking`|Split the message and publish in chunks if the message size is larger than the allowed max size|false|
|`-s`, `--separator`|Character to split messages string with.|","|
|`-k`, `--key`|Message key to add|key=value string, like k1=v1,k2=v2.|
Expand Down