Skip to content

Commit

Permalink
[improve][client]PIP-189: No batching if only one message in batch (a…
Browse files Browse the repository at this point in the history
…pache#16605)

[improve][client]PIP-189: No batching if only one message in batch apache#16605

### Motivation

* See apache#16619

### Modifications

* See apache#16619
* Most of the Modifications are relevant to `BatchMessageContainerImpl`
* Some tests related to batching also need to be modified, because a batched producer can also publish non-batched messages when this PIP applies. The tests include:
    * `RGUsageMTAggrWaitForAllMsgsTest`
    * `BatchMessageTest`
    * `BrokerEntryMetadataE2ETest`
    * `ClientDeduplicationTest`
    * `TopicReaderTest`
    * `PulsarClientToolTest`
  • Loading branch information
AnonHxy authored and nicklixinyang committed Aug 29, 2022
1 parent 5f9df13 commit b0467f0
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,8 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
Assert.assertNotEquals(ninthPercentileValue, 0);
}

// Empirically, there appears to be a 42-byte overhead for metadata, imposed by Pulsar runtime.
private static final int PER_MESSAGE_METADATA_OHEAD = 42;
// Empirically, there appears to be a 31-byte overhead for metadata, imposed by Pulsar runtime.
private static final int PER_MESSAGE_METADATA_OHEAD = 31;

private static final int PUBLISH_INTERVAL_SECS = 10;
private static final int NUM_PRODUCERS = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -865,6 +866,37 @@ public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientExc
producer.close();
}

/**
 * Sends a single message through a batching-enabled producer and checks that it is
 * published as a non-batched message: the id returned by {@code send()} and the id seen
 * by the consumer must not be a {@link BatchMessageIdImpl}, and the payload, property and
 * sequence id must arrive as sent.
 *
 * @param builder the batcher builder supplied by the {@code containerBuilder} data provider
 */
@Test(dataProvider = "containerBuilder")
public void testBatchSendOneMessage(BatcherBuilder builder) throws Exception {
    final String payload = "my-message";
    final String topic = "persistent://prop/ns-abc/testBatchSendOneMessage-" + UUID.randomUUID();
    final String subscription = "sub-1";

    // Subscribe before producing so the message is delivered to this subscription.
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subscription)
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();

    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
            .batchingMaxMessages(10)
            .enableBatching(true)
            .batcherBuilder(builder)
            .create();

    // Only one message is sent, so the batch can never hold more than one entry.
    MessageId sentId = producer.newMessage()
            .value(payload.getBytes())
            .property("key1", "value1")
            .send();

    Assert.assertTrue(sentId instanceof MessageIdImpl);
    Assert.assertFalse(sentId instanceof BatchMessageIdImpl);

    Message<byte[]> delivered = consumer.receive();
    assertEquals(delivered.getSequenceId(), 0);
    consumer.acknowledge(delivered);

    String deliveredPayload = new String(delivered.getData());
    Assert.assertEquals(deliveredPayload, payload);
    Assert.assertFalse(delivered.getProperties().isEmpty());
    Assert.assertEquals(delivered.getProperties().get("key1"), "value1");
    Assert.assertFalse(delivered.getMessageId() instanceof BatchMessageIdImpl);

    producer.close();
    consumer.close();
}

@Test(dataProvider = "containerBuilder")
public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws Exception {

Expand Down Expand Up @@ -1034,7 +1066,10 @@ private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subTyp
if (enableBatch) {
// only ack messages which batch index < 2, which means we will not to ack the
// whole batch for the batch that with more than 2 messages
if (((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) {
if ((message.getMessageId() instanceof BatchMessageIdImpl)
&& ((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) {
consumer.acknowledgeAsync(message).get();
} else if (!(message.getMessageId() instanceof BatchMessageIdImpl)){
consumer.acknowledgeAsync(message).get();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -36,6 +38,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.assertj.core.util.Sets;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand Down Expand Up @@ -211,57 +214,75 @@ public void testBatchMessage() throws Exception {
final String topic = newTopicName();
final String subscription = "my-sub";
final long eventTime= 200;
final int msgNum = 2;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
// Make sure there are 2 messages in one batch: if a batch contains only one message,
// the producer will not send it as a batched message.
.batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
.batchingMaxMessages(msgNum)
.batchingMaxBytes(Integer.MAX_VALUE)
.enableBatching(true)
.create();

long sendTime = System.currentTimeMillis();
// send message which is batch message and only contains one message, so do not set the deliverAtTime
MessageIdImpl messageId = (MessageIdImpl) producer.newMessage()
// send message which is batch message, so do not set the deliverAtTime
List<CompletableFuture<MessageId>> messageIdsFuture = new ArrayList<>(msgNum);
for (int i = 0; i < msgNum; ++i) {
CompletableFuture<MessageId> messageId = producer.newMessage()
.eventTime(eventTime)
.value(("hello").getBytes())
.send();
.value(("hello" + i).getBytes())
.sendAsync();
messageIdsFuture.add(messageId);
}
FutureUtil.waitForAll(messageIdsFuture);

// 1. test for peekMessages
admin.topics().createSubscription(topic, subscription, MessageId.earliest);
final List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, 1);
Assert.assertEquals(messages.size(), 1);

MessageImpl message = (MessageImpl) messages.get(0);
Assert.assertEquals(message.getData(), ("hello").getBytes());
Assert.assertTrue(message.getPublishTime() >= sendTime);
BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), 0);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
final List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, msgNum);
Assert.assertEquals(messages.size(), msgNum);

MessageImpl message;
BrokerEntryMetadata entryMetadata;
for (int i = 0; i < msgNum; ++i) {
message = (MessageImpl) messages.get(i);
Assert.assertEquals(message.getData(), ("hello" + i).getBytes());
Assert.assertTrue(message.getPublishTime() >= sendTime);
entryMetadata = message.getBrokerEntryMetadata();
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
}

// getMessagesById and examineMessage only return the first messages in the batch
// 2. test for getMessagesById
MessageIdImpl messageId = (MessageIdImpl) messageIdsFuture.get(0).get();
message = (MessageImpl) admin.topics().getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId());
Assert.assertEquals(message.getData(), ("hello").getBytes());
// getMessagesById return the first message in the batch
Assert.assertEquals(message.getData(), ("hello" + 0).getBytes());
Assert.assertTrue(message.getPublishTime() >= sendTime);
entryMetadata = message.getBrokerEntryMetadata();
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), 0);
Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);

// 3. test for examineMessage
message = (MessageImpl) admin.topics().examineMessage(topic, "earliest", 1);
Assert.assertEquals(message.getData(), ("hello").getBytes());
Assert.assertEquals(message.getData(), ("hello" + 0).getBytes());
Assert.assertTrue(message.getPublishTime() >= sendTime);
entryMetadata = message.getBrokerEntryMetadata();
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), 0);
Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -361,8 +362,10 @@ public void testKeyBasedBatchingOrder() throws Exception {
for (int i = 0; i < 5; i++) {
// Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned.
final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send();
assertTrue(messageId instanceof BatchMessageIdImpl);
final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId;
// A duplicated message is sent in a batch of its own, which is performed as a non-batched send.
assertTrue(messageId instanceof MessageIdImpl);
assertFalse(messageId instanceof BatchMessageIdImpl);
final MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
assertEquals(messageIdImpl.getLedgerId(), -1L);
assertEquals(messageIdImpl.getEntryId(), -1L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive)
}

@Test(timeOut = 20000)
public void testHasMessageAvailableWithBatch() throws Exception {
public void testHasMessageAvailable() throws Exception {
final String topicName = "persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
final int numOfMessage = 10;

Expand All @@ -1092,11 +1092,11 @@ public void testHasMessageAvailableWithBatch() throws Exception {

//For batch-messages with single message, the type of client messageId should be the same as that of broker
MessageIdImpl messageId = (MessageIdImpl) producer.send("msg".getBytes());
assertTrue(messageId instanceof MessageIdImpl);
assertFalse(messageId instanceof BatchMessageIdImpl);
ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).startMessageIdInclusive().create();
MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
assertTrue(messageId instanceof BatchMessageIdImpl);
assertFalse(lastMsgId instanceof BatchMessageIdImpl);
assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
reader.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.cli;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.mockito.stubbing.Answer;
import org.testng.Assert;

/**
 * An implementation of {@link PulsarClientTool} for tests, which publishes messages only when there are
 * enough messages in the batch.
 *
 * <p>The "produce" command is replaced by one whose {@link ClientBuilder} is a Mockito spy. That spy hands
 * out a producer configured with {@code batchingMaxMessages(batchNum)}, and with both the maximum batch
 * bytes and the maximum publish delay set to their largest values.
 */
public class PulsarClientToolForceBatchNum extends PulsarClientTool {
    private final String topic;
    private final int batchNum;

    /**
     * Creates the tool.
     *
     * @param properties properties
     * @param topic topic
     * @param batchNum iff there are batchNum messages in the batch, the producer will flush and send.
     */
    public PulsarClientToolForceBatchNum(Properties properties, String topic, int batchNum) {
        super(properties);
        this.topic = topic;
        this.batchNum = batchNum;
    }

    /**
     * Registers a "produce" command whose client builder is swapped for the spy built by
     * {@link #mockClientBuilder(ClientBuilder)}.
     */
    @Override
    protected void initJCommander() {
        super.initJCommander();
        produceCommand = new CmdProduce() {
            @Override
            public void updateConfig(ClientBuilder newBuilder, Authentication authentication, String serviceURL) {
                try {
                    super.updateConfig(mockClientBuilder(newBuilder), authentication, serviceURL);
                } catch (Exception e) {
                    // Pass the exception as the cause so the original stack trace shows up in the test report.
                    Assert.fail("update config fail " + e.getMessage(), e);
                }
            }
        };
        jcommander.addCommand("produce", produceCommand);
    }

    /**
     * Wraps {@code newBuilder} in a spy whose {@code build()} returns a spied client. That client's
     * {@code newProducer(Schema)} returns a producer builder which always creates the pre-configured
     * batching producer for {@link #topic}.
     *
     * @param newBuilder the real client builder to build the underlying client from
     * @return a spy of {@code newBuilder} that returns the spied client
     * @throws Exception if building the client or creating the producer fails
     */
    private ClientBuilder mockClientBuilder(ClientBuilder newBuilder) throws Exception {
        PulsarClientImpl client = (PulsarClientImpl) newBuilder.build();
        // With unbounded size and delay limits, only the message count (batchNum) is configured
        // as a finite batching limit.
        ProducerBuilder<byte[]> producerBuilder = client.newProducer()
                .batchingMaxBytes(Integer.MAX_VALUE)
                .batchingMaxMessages(batchNum)
                .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
                .topic(topic);
        Producer<byte[]> producer = producerBuilder.create();

        PulsarClientImpl mockClient = spy(client);
        ProducerBuilder<byte[]> mockProducerBuilder = spy(producerBuilder);
        Producer<byte[]> mockProducer = spy(producer);
        ClientBuilder mockClientBuilder = spy(newBuilder);

        // Every message builder handed out by the producer is itself spied so that send() can be redirected.
        doAnswer((Answer<TypedMessageBuilder>) invocation -> {
            TypedMessageBuilder typedMessageBuilder = spy((TypedMessageBuilder) invocation.callRealMethod());
            doAnswer((Answer<MessageId>) invocation1 -> {
                TypedMessageBuilder mock = ((TypedMessageBuilder) invocation1.getMock());
                // using sendAsync() to replace send()
                // NOTE(review): presumably so the caller does not block until the batch fills up - confirm.
                mock.sendAsync();
                // The send() caller receives no message id.
                return null;
            }).when(typedMessageBuilder).send();
            return typedMessageBuilder;
        }).when(mockProducer).newMessage();

        doReturn(mockProducer).when(mockProducerBuilder).create();
        doReturn(mockProducerBuilder).when(mockClient).newProducer(any(Schema.class));
        doReturn(mockClient).when(mockClientBuilder).build();
        return mockClientBuilder;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,19 @@ public void testDisableBatching() throws Exception {
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("disable-batching");
final int numberOfMessages = 5;
// `numberOfMessages` should be an even number because `batchNum` is set to 2; this makes sure
// batch and non-batch messages are in the same batch
final int numberOfMessages = 6;
final int batchNum = 2;

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();

PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties);
PulsarClientTool pulsarClientTool1 = new PulsarClientToolForceBatchNum(properties, topicName, batchNum);
String[] args1 = {"produce", "-m", "batched", "-n", Integer.toString(numberOfMessages), topicName};
Assert.assertEquals(pulsarClientTool1.run(args1), 0);

PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties);
PulsarClientTool pulsarClientTool2 = new PulsarClientToolForceBatchNum(properties, topicName, batchNum);
String[] args2 = {"produce", "-m", "non-batched", "-n", Integer.toString(numberOfMessages), "-db", topicName};
Assert.assertEquals(pulsarClientTool2.run(args2), 0);

Expand Down
Loading

0 comments on commit b0467f0

Please sign in to comment.