Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-132] Include message header size when check maxMessageSize for non-batch message on the client side. #14007

Merged
merged 6 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;

import java.util.Optional;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -32,6 +31,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
Expand All @@ -41,6 +41,7 @@
import org.testng.annotations.Test;

@Test(groups = "broker")
@Slf4j
public class MaxMessageSizeTest {

PulsarService pulsar;
Expand All @@ -55,7 +56,7 @@ void setup() {
try {
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
ServerConfiguration conf = new ServerConfiguration();
conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024);
conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024 + 10 * 1024);
bkEnsemble.startStandalone(conf, false);

configuration = new ServiceConfiguration();
Expand All @@ -78,7 +79,8 @@ void setup() {
admin = PulsarAdmin.builder().serviceHttpUrl(url).build();
admin.clusters().createCluster("max_message_test", ClusterData.builder().serviceUrl(url).build());
admin.tenants()
.createTenant("test", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("max_message_test")));
.createTenant("test",
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("max_message_test")));
admin.namespaces().createNamespace("test/message", Sets.newHashSet("max_message_test"));
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -101,8 +103,8 @@ public void testMaxMessageSetting() throws PulsarClientException {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
String topicName = "persistent://test/message/topic1";
Producer producer = client.newProducer().topic(topicName).sendTimeout(60, TimeUnit.SECONDS).create();
Consumer consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();
Producer<byte[]> producer = client.newProducer().topic(topicName).sendTimeout(60, TimeUnit.SECONDS).create();
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();

// less than 5MB message

Expand Down Expand Up @@ -139,6 +141,14 @@ public void testMaxMessageSetting() throws PulsarClientException {
byte[] consumerNewNormalMsg = consumer.receive().getData();
Assert.assertEquals(newNormalMsg, consumerNewNormalMsg);

// 2MB metadata and 8 MB payload
try {
producer.newMessage().keyBytes(new byte[2 * 1024 * 1024]).value(newNormalMsg).send();
Assert.fail("Shouldn't send out this message");
} catch (PulsarClientException e) {
//no-op
}

// equals 10MB message
byte[] newLimitMsg = new byte[10 * 1024 * 1024];
try {
Expand All @@ -151,6 +161,87 @@ public void testMaxMessageSetting() throws PulsarClientException {
consumer.unsubscribe();
consumer.close();
producer.close();
}

@Test
public void testNonBatchingMaxMessageSize() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
String topicName = "persistent://test/message/testNonBatchingMaxMessageSize";
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.enableBatching(false)
.sendTimeout(30, TimeUnit.SECONDS).create();
@Cleanup
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();

byte[] data = new byte[8 * 1024 * 1024];
try {
producer.newMessage().value(data).send();
} catch (PulsarClientException e) {
Assert.fail("Shouldn't have exception at here", e);
}
Assert.assertEquals(consumer.receive().getData(), data);

// 1MB metadata and 8 MB payload
try {
producer.newMessage().property("P", new String(new byte[1024 * 1024])).value(data).send();
} catch (PulsarClientException e) {
Assert.fail("Shouldn't have exception at here", e);
}
Assert.assertEquals(consumer.receive().getData(), data);

// 2MB metadata and 8 MB payload, should fail.
try {
producer.newMessage().property("P", new String(new byte[2 * 1024 * 1024])).value(data).send();
Assert.fail("Shouldn't send out this message");
} catch (PulsarClientException e) {
//no-op
}
}

@Test
public void testChunkingMaxMessageSize() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
String topicName = "persistent://test/message/testChunkingMaxMessageSize";
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.enableBatching(false)
.enableChunking(true)
.sendTimeout(30, TimeUnit.SECONDS).create();
@Cleanup
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();

// 12 MB metadata, should fail
try {
producer.newMessage().orderingKey(new byte[12 * 1024 * 1024]).send();
Assert.fail("Shouldn't send out this message");
} catch (PulsarClientException e) {
//no-op
}

// 12 MB payload, there should be 2 chunks
byte[] data = new byte[12 * 1024 * 1024];
try {
producer.newMessage().value(data).send();
} catch (PulsarClientException e) {
Assert.fail("Shouldn't have exception at here", e);
}
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive();
Assert.assertEquals(msg.getData(), data);
Assert.assertEquals(msg.getMessageBuilder().getNumChunksFromMsg(), 2);

// 5MB metadata and 12 MB payload, there should be 3 chunks
try {
producer.newMessage().property("P", new String(new byte[5 * 1024 * 1024])).value(data).send();
} catch (PulsarClientException e) {
Assert.fail("Shouldn't have exception at here", e);
}
msg = (MessageImpl<byte[]>) consumer.receive();
Assert.assertEquals(msg.getData(), data);
Assert.assertEquals(msg.getMessageBuilder().getNumChunksFromMsg(), 3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testInvalidConfig() throws Exception {
public void testLargeMessage(boolean ackReceiptEnabled) throws Exception {

log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(50);
final int totalMessages = 5;
final String topicName = "persistent://my-property/my-ns/my-topic1";

Expand All @@ -125,7 +125,7 @@ public void testLargeMessage(boolean ackReceiptEnabled) throws Exception {

List<String> publishedMessages = Lists.newArrayList();
for (int i = 0; i < totalMessages; i++) {
String message = createMessagePayload(i * 10);
String message = createMessagePayload(i * 100);
publishedMessages.add(message);
producer.send(message.getBytes());
}
Expand Down Expand Up @@ -171,7 +171,7 @@ public void testLargeMessage(boolean ackReceiptEnabled) throws Exception {

@Test
public void testChunkingWithOrderingKey() throws Exception {
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(100);

final String topicName = "persistent://my-property/my-ns/testChunkingWithOrderingKey";

Expand All @@ -183,8 +183,8 @@ public void testChunkingWithOrderingKey() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableChunking(true)
.enableBatching(false).create();

byte[] data = RandomUtils.nextBytes(20);
byte[] ok = RandomUtils.nextBytes(10);
byte[] data = RandomUtils.nextBytes(200);
byte[] ok = RandomUtils.nextBytes(50);
producer.newMessage().value(data).orderingKey(ok).send();

Message<byte[]> msg = consumer.receive();
Expand All @@ -196,7 +196,7 @@ public void testChunkingWithOrderingKey() throws Exception {
public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Exception {

log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(50);
final int totalMessages = 5;
final String topicName = "persistent://my-property/my-ns/my-topic1";

Expand All @@ -213,7 +213,7 @@ public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Excepti

List<String> publishedMessages = Lists.newArrayList();
for (int i = 0; i < totalMessages; i++) {
String message = createMessagePayload(i * 10);
String message = createMessagePayload(i * 100);
publishedMessages.add(message);
producer.send(message.getBytes());
}
Expand Down Expand Up @@ -263,7 +263,7 @@ public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Excepti
@Test
public void testPublishWithFailure() throws Exception {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(50);
final String topicName = "persistent://my-property/my-ns/my-topic1";

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
Expand All @@ -274,7 +274,7 @@ public void testPublishWithFailure() throws Exception {
stopBroker();

try {
producer.send(createMessagePayload(100).getBytes());
producer.send(createMessagePayload(1000).getBytes());
fail("should have failed with timeout exception");
} catch (PulsarClientException.TimeoutException e) {
// Ok
Expand Down Expand Up @@ -403,7 +403,7 @@ public void testExpireIncompleteChunkMessage() throws Exception{
public void testChunksEnqueueFailed() throws Exception {
final String topicName = "persistent://my-property/my-ns/test-chunks-enqueue-failed";
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(50);

final MemoryLimitController controller = ((PulsarClientImpl) pulsarClient).getMemoryLimitController();
assertEquals(controller.currentUsage(), 0);
Expand All @@ -423,7 +423,7 @@ public void testChunksEnqueueFailed() throws Exception {
assertEquals(semaphore.availablePermits(), maxPendingMessages);
producer.send(createMessagePayload(1).getBytes());
try {
producer.send(createMessagePayload(100).getBytes(StandardCharsets.UTF_8));
producer.send(createMessagePayload(1000).getBytes(StandardCharsets.UTF_8));
fail("It should fail with ProducerQueueIsFullError");
} catch (PulsarClientException e) {
assertTrue(e instanceof PulsarClientException.ProducerQueueIsFullError);
Expand All @@ -440,7 +440,7 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
@Test
public void testSeekChunkMessages() throws PulsarClientException {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(50);
final int totalMessages = 5;
final String topicName = "persistent://my-property/my-ns/test-seek-chunk";

Expand All @@ -463,7 +463,7 @@ public void testSeekChunkMessages() throws PulsarClientException {
.subscribe();

for (int i = 0; i < totalMessages; i++) {
String message = createMessagePayload(10);
String message = createMessagePayload(100);
producer.send(message.getBytes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.MathUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
Expand Down Expand Up @@ -470,9 +471,30 @@ public void sendAsync(Message<?> message, SendCallback callback) {
}

// send in chunks
int totalChunks = canAddToBatch(msg) ? 1
: Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
+ (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
int totalChunks;
int payloadChunkSize;
if (canAddToBatch(msg) || !conf.isChunkingEnabled()) {
totalChunks = 1;
payloadChunkSize = ClientCnx.getMaxMessageSize();
} else {
// Reserve current metadata size for chunk size to avoid message size overflow.
// NOTE: this is not strictly bounded, as metadata will be updated after chunking.
// So there is a small chance that the final message size is larger than ClientCnx.getMaxMessageSize().
// But it won't cause produce failure as broker have 10 KB padding space for these cases.
payloadChunkSize = ClientCnx.getMaxMessageSize() - msgMetadata.getSerializedSize();
if (payloadChunkSize <= 0) {
PulsarClientException.InvalidMessageException invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a message with %d bytes metadata that "
+ "exceeds %d bytes", producerName, topic,
msgMetadata.getSerializedSize(), ClientCnx.getMaxMessageSize()));
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
compressedPayload.release();
return;
}
totalChunks = MathUtils.ceilDiv(Math.max(1, compressedPayload.readableBytes()), payloadChunkSize);
}

// chunked message also sent individually so, try to acquire send-permits
for (int i = 0; i < (totalChunks - 1); i++) {
if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
Expand Down Expand Up @@ -512,9 +534,9 @@ public void sendAsync(Message<?> message, SendCallback callback) {
}
}
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
readStartIndex, payloadChunkSize, compressedPayload, compressed,
compressedPayload.readableBytes(), uncompressedSize, callback, chunkedMessageCtx);
readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
readStartIndex = ((chunkId + 1) * payloadChunkSize);
}
}
} catch (PulsarClientException e) {
Expand Down Expand Up @@ -1400,6 +1422,19 @@ void setMessageId(ChunkMessageIdImpl chunkMessageId) {
}
}

public int getMessageHeaderAndPayloadSize() {
if (cmd == null) {
return 0;
}
ByteBuf cmdHeader = cmd.getFirst();
cmdHeader.markReaderIndex();
int totalSize = cmdHeader.readInt();
int cmdSize = cmdHeader.readInt();
int msgHeadersAndPayloadSize = totalSize - cmdSize - 4;
cmdHeader.resetReaderIndex();
return msgHeadersAndPayloadSize;
}

private OpSendMsg(Handle<OpSendMsg> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
Expand Down Expand Up @@ -1982,6 +2017,9 @@ protected void processOpSendMsg(OpSendMsg op) {
if (op.msg != null && isBatchMessagingEnabled()) {
batchMessageAndSend();
}
if (isMessageSizeExceeded(op)) {
return;
}
pendingMessages.add(op);
if (op.msg != null) {
LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
Expand Down Expand Up @@ -2049,6 +2087,9 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
if (op.cmd == null) {
checkState(op.rePopulate != null);
op.rePopulate.run();
if (isMessageSizeExceeded(op)) {
continue;
}
}
if (stripChecksum) {
stripChecksum(op);
Expand All @@ -2074,6 +2115,24 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
}
}

/**
* Check if final message size for non-batch and non-chunked messages is larger than max message size.
*/
public boolean isMessageSizeExceeded(OpSendMsg op) {
if (op.msg != null && op.totalChunks <= 1) {
int messageSize = op.getMessageHeaderAndPayloadSize();
if (messageSize > ClientCnx.getMaxMessageSize()) {
releaseSemaphoreForSendOp(op);
op.sendComplete(new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a message with %d bytes that exceeds %d bytes",
producerName, topic, messageSize, ClientCnx.getMaxMessageSize()),
op.sequenceId));
return true;
}
}
return false;
}

public long getDelayInMillis() {
OpSendMsg firstMsg = pendingMessages.peek();
if (firstMsg != null) {
Expand Down
Loading