Skip to content

Commit

Permalink
fix testBlockIfQueueFullWhenChunking
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 committed Nov 22, 2022
1 parent f84c69e commit 10cbc40
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,8 @@ public void sendAsync(Message<?> message, SendCallback callback) {
? msg.getMessageBuilder().getSchemaVersion() : null;
byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
? msg.getMessageBuilder().getOrderingKey() : null;
// msg.messageId will be reset if previous message chunk is sent successfully.
final MessageId messageId = msg.getMessageId();
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
// Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
// `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
Expand All @@ -555,7 +557,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
synchronized (this) {
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
readStartIndex, payloadChunkSize, compressedPayload, compressed,
compressedPayload.readableBytes(), callback, chunkedMessageCtx);
compressedPayload.readableBytes(), callback, chunkedMessageCtx, messageId);
readStartIndex = ((chunkId + 1) * payloadChunkSize);
}
}
Expand Down Expand Up @@ -617,7 +619,8 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
boolean compressed,
int compressedPayloadSize,
SendCallback callback,
ChunkedMessageCtx chunkedMessageCtx) throws IOException {
ChunkedMessageCtx chunkedMessageCtx,
MessageId messageId) throws IOException {
ByteBuf chunkPayload = compressedPayload;
MessageMetadata msgMetadata = msg.getMessageBuilder();
if (totalChunks > 1 && TopicName.get(topic).isPersistent()) {
Expand Down Expand Up @@ -686,14 +689,14 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
: 1;
final OpSendMsg op;
if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) {
ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msg.getMessageId(), msgMetadata,
ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, messageId, msgMetadata,
encryptedPayload);
op = OpSendMsg.create(msg, cmd, sequenceId, callback);
} else {
op = OpSendMsg.create(msg, null, sequenceId, callback);
final MessageMetadata finalMsgMetadata = msgMetadata;
op.rePopulate = () -> {
op.cmd = sendMessage(producerId, sequenceId, numMessages, msg.getMessageId(), finalMsgMetadata,
op.cmd = sendMessage(producerId, sequenceId, numMessages, messageId, finalMsgMetadata,
encryptedPayload);
};
}
Expand Down Expand Up @@ -1505,9 +1508,7 @@ void setBatchSizeByte(long batchSizeByte) {

void setMessageId(long ledgerId, long entryId, int partitionIndex) {
if (msg != null) {
if (msg.getMessageId() == null) {
msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex));
}
msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex));
} else if (msgs.size() == 1) {
// If there is only one message in batch, the producer will publish messages like non-batch
msgs.get(0).setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex));
Expand Down

0 comments on commit 10cbc40

Please sign in to comment.