From 8cccdf567b293484c6c1e8843d511eae085952fe Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Fri, 5 Nov 2021 20:15:52 +0900 Subject: [PATCH] Enable CLI to publish non-batched messages --- .../client/cli/PulsarClientToolTest.java | 46 +++++++++++++++++-- .../apache/pulsar/client/cli/CmdProduce.java | 5 ++ site2/docs/reference-cli-tools.md | 1 + 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java index d56308911ddf4..b1e067035d273 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java @@ -33,6 +33,9 @@ import lombok.Cleanup; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; import org.testng.Assert; @@ -106,7 +109,7 @@ public void testNonDurableSubscribe() throws Exception { properties.setProperty("serviceUrl", brokerUrl.toString()); properties.setProperty("useTls", "false"); - final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString(); + final String topicName = getTopicWithRandomSuffix("non-durable"); int numberOfMessages = 10; @Cleanup("shutdownNow") @@ -155,7 +158,7 @@ public void testDurableSubscribe() throws Exception { properties.setProperty("serviceUrl", brokerUrl.toString()); properties.setProperty("useTls", "false"); - final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString(); + final String topicName = getTopicWithRandomSuffix("durable"); int numberOfMessages = 10; @Cleanup("shutdownNow") @@ -197,7 +200,7 @@ public void testEncryption() throws Exception { properties.setProperty("serviceUrl", brokerUrl.toString()); properties.setProperty("useTls", "false"); - final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString(); + final String topicName = getTopicWithRandomSuffix("encryption"); final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/"; final int numberOfMessages = 10; @@ -234,4 +237,41 @@ public void testEncryption() throws Exception { } } + @Test(timeOut = 20000) + public void testDisableBatching() throws Exception { + Properties properties = new Properties(); + properties.setProperty("serviceUrl", brokerUrl.toString()); + properties.setProperty("useTls", "false"); + + final String topicName = getTopicWithRandomSuffix("disable-batching"); + final int numberOfMessages = 5; + + @Cleanup + Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe(); + + PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties); + String[] args1 = {"produce", "-m", "batched", "-n", Integer.toString(numberOfMessages), topicName}; + Assert.assertEquals(pulsarClientTool1.run(args1), 0); + + PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties); + String[] args2 = {"produce", "-m", "non-batched", "-n", Integer.toString(numberOfMessages), "-db", topicName}; + Assert.assertEquals(pulsarClientTool2.run(args2), 0); + + for (int i = 0; i < numberOfMessages * 2; i++) { + Message msg = consumer.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + if (i < numberOfMessages) { + Assert.assertEquals(new String(msg.getData()), "batched"); + Assert.assertTrue(msg.getMessageId() instanceof BatchMessageIdImpl); + } else { + Assert.assertEquals(new String(msg.getData()), "non-batched"); + Assert.assertFalse(msg.getMessageId() instanceof BatchMessageIdImpl); + } + } + } + + private static String getTopicWithRandomSuffix(String localNameBase) { + return String.format("persistent://prop/ns-abc/test/%s-%s", localNameBase, UUID.randomUUID().toString()); + } + } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java index 1ae2d38e106ed..0ec22ba2e7b48 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java @@ -107,6 +107,9 @@ public class CmdProduce { description = "Rate (in msg/sec) at which to produce," + " value 0 means to produce messages as fast as possible.") private double publishRate = 0; + + @Parameter(names = { "-db", "--disable-batching" }, description = "Disable batch sending of messages") + private boolean disableBatching = false; @Parameter(names = { "-c", "--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size") @@ -247,6 +250,8 @@ private int publish(String topic) { if (this.chunkingAllowed) { producerBuilder.enableChunking(true); producerBuilder.enableBatching(false); + } else if (this.disableBatching) { + producerBuilder.enableBatching(false); } if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyValue)) { producerBuilder.addEncryptionKey(this.encKeyName); diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md index 6eb7555974628..df41ffdc821e5 100644 --- a/site2/docs/reference-cli-tools.md +++ b/site2/docs/reference-cli-tools.md @@ -312,6 +312,7 @@ Options |`-m`, `--messages`|Comma-separated string of messages to send; either -m or -f must be specified|[]| |`-n`, `--num-produce`|The number of times to send the message(s); the count of messages/files * num-produce should be below 1000|1| |`-r`, `--rate`|Rate (in messages per second) at which to produce; a value 0 means to produce messages as fast as possible|0.0| +|`-db`, `--disable-batching`|Disable batch sending of messages|false| |`-c`, `--chunking`|Split the message and publish in chunks if the message size is larger than the allowed max size|false| |`-s`, `--separator`|Character to split messages string with.|","| |`-k`, `--key`|Message key to add|key=value string, like k1=v1,k2=v2.|