Skip to content

Commit

Permalink
[improve][client-c++]Support include message header size when check m…
Browse files Browse the repository at this point in the history
…axMessageSize (#17289)

### Motivation

See: #17188

### Modifications

Support include message header size when check maxMessageSize for cpp client
  • Loading branch information
coderzc authored Sep 22, 2022
1 parent 63d4cf2 commit f28f985
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 26 deletions.
70 changes: 46 additions & 24 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,37 +417,18 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
callback(result, {});
};

auto& msgMetadata = msg.impl_->metadata;
const bool compressed = !canAddToBatch(msg);
const auto payload =
compressed ? applyCompression(uncompressedPayload, conf_.getCompressionType()) : uncompressedPayload;
const auto compressedSize = static_cast<uint32_t>(payload.readableBytes());
const auto maxMessageSize = static_cast<uint32_t>(ClientConnection::getMaxMessageSize());

if (compressed && compressedSize > ClientConnection::getMaxMessageSize() && !chunkingEnabled_) {
LOG_WARN(getName() << " - compressed Message payload size " << payload.readableBytes()
<< " cannot exceed " << ClientConnection::getMaxMessageSize()
<< " bytes unless chunking is enabled");
handleFailedResult(ResultMessageTooBig);
return;
}

auto& msgMetadata = msg.impl_->metadata;
if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) {
handleFailedResult(ResultInvalidMessage);
return;
}

const int totalChunks =
canAddToBatch(msg) ? 1 : getNumOfChunks(compressedSize, ClientConnection::getMaxMessageSize());
// Each chunk should be sent individually, so try to acquire extra permits for chunks.
for (int i = 0; i < (totalChunks - 1); i++) {
const auto result = canEnqueueRequest(0); // size is 0 because the memory has already reserved
if (result != ResultOk) {
handleFailedResult(result);
return;
}
}

Lock lock(mutex_);
uint64_t sequenceId;
if (!msgMetadata.has_sequence_id()) {
Expand All @@ -457,6 +438,31 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
}
setMessageMetadata(msg, sequenceId, uncompressedSize);

auto payloadChunkSize = maxMessageSize;
int totalChunks;
if (!compressed || !chunkingEnabled_) {
totalChunks = 1;
} else {
const auto metadataSize = static_cast<uint32_t>(msgMetadata.ByteSizeLong());
if (metadataSize >= maxMessageSize) {
LOG_WARN(getName() << " - metadata size " << metadataSize << " cannot exceed " << maxMessageSize
<< " bytes");
handleFailedResult(ResultMessageTooBig);
return;
}
payloadChunkSize = maxMessageSize - metadataSize;
totalChunks = getNumOfChunks(compressedSize, payloadChunkSize);
}

// Each chunk should be sent individually, so try to acquire extra permits for chunks.
for (int i = 0; i < (totalChunks - 1); i++) {
const auto result = canEnqueueRequest(0); // size is 0 because the memory has already reserved
if (result != ResultOk) {
handleFailedResult(result);
return;
}
}

if (canAddToBatch(msg)) {
// Batching is enabled and the message is not delayed
if (!batchMessageContainer_->hasEnoughSpace(msg)) {
Expand Down Expand Up @@ -508,7 +514,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
if (sendChunks) {
msgMetadata.set_chunk_id(chunkId);
}
const uint32_t endIndex = std::min(compressedSize, beginIndex + maxMessageSize);
const uint32_t endIndex = std::min(compressedSize, beginIndex + payloadChunkSize);
auto chunkedPayload = payload.slice(beginIndex, endIndex - beginIndex);
beginIndex = endIndex;

Expand All @@ -517,10 +523,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
handleFailedResult(ResultCryptoError);
return;
}
OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
producerId_, sequenceId, conf_.getSendTimeout(),
1, uncompressedSize};

if (!chunkingEnabled_) {
const uint32_t msgMetadataSize = op.metadata_.ByteSize();
const uint32_t payloadSize = op.payload_.readableBytes();
const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize;
if (msgHeadersAndPayloadSize > maxMessageSize) {
lock.unlock();
releaseSemaphoreForSendOp(op);
LOG_WARN(getName()
<< " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed "
<< maxMessageSize << " bytes unless chunking is enabled");
handleFailedResult(ResultMessageTooBig);
return;
}
}

sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
(chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId,
conf_.getSendTimeout(), 1, uncompressedSize});
sendMessage(op);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,8 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
result = producer.send(msg);
ASSERT_EQ(ResultMessageTooBig, result);

// Anything up to MaxMessageSize should be allowed
size = ClientConnection::getMaxMessageSize();
// Anything up to MaxMessageSize - MetadataSize should be allowed
size = ClientConnection::getMaxMessageSize() - 32; /*the default message metadata size for string schema*/
msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer.send(msg);
ASSERT_EQ(ResultOk, result);
Expand Down
62 changes: 62 additions & 0 deletions pulsar-client-cpp/tests/ProducerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ using namespace pulsar;
static const std::string serviceUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";

// See the `maxMessageSize` config in test-conf/standalone-ssl.conf
static constexpr size_t maxMessageSize = 1024000;

TEST(ProducerTest, producerNotInitialized) {
Producer producer;

Expand Down Expand Up @@ -211,6 +214,63 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
client.close();
}

class ProducerTest : public ::testing::TestWithParam<bool> {};

TEST_P(ProducerTest, testMaxMessageSize) {
Client client(serviceUrl);

const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + std::to_string(time(nullptr));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));

Producer producer;
ProducerConfiguration conf;
conf.setBatchingEnabled(GetParam());
ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));

std::string msg = std::string(maxMessageSize / 2, 'a');
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build()));
Message message;
ASSERT_EQ(ResultOk, consumer.receive(message));
ASSERT_EQ(msg, message.getDataAsString());

std::string orderKey = std::string(maxMessageSize, 'a');
ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build()));

ASSERT_EQ(ResultMessageTooBig,
producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build()));

client.close();
}

TEST_P(ProducerTest, testChunkingMaxMessageSize) {
Client client(serviceUrl);

const std::string topic = "ProducerTest-ChunkingMaxMessageSize-" + std::to_string(time(nullptr));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));

Producer producer;
ProducerConfiguration conf;
conf.setBatchingEnabled(false);
conf.setChunkingEnabled(true);
ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));

std::string orderKey = std::string(maxMessageSize, 'a');
ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build()));

std::string msg = std::string(2 * maxMessageSize + 10, 'b');
Message message;
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build()));
ASSERT_EQ(ResultOk, consumer.receive(message));
ASSERT_EQ(msg, message.getDataAsString());
ASSERT_LE(1L, message.getMessageId().entryId());

client.close();
}

TEST(ProducerTest, testExclusiveProducer) {
Client client(serviceUrl);

Expand All @@ -234,3 +294,5 @@ TEST(ProducerTest, testExclusiveProducer) {
producerConfiguration3.setProducerName("p-name-3");
ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3));
}

INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

0 comments on commit f28f985

Please sign in to comment.