Skip to content

Commit

Permalink
[improve][broker] Support not retaining null-key message during topic…
Browse files Browse the repository at this point in the history
… compaction (apache#21578)
  • Loading branch information
coderzc committed Dec 4, 2023
1 parent 40fa9ab commit aa321ad
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 34 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ brokerServiceCompactionThresholdInBytes=0
# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
brokerServiceCompactionPhaseOneLoopTimeInSeconds=30

# Whether retain null-key message during topic compaction
topicCompactionRemainNullKey=true

# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1266,3 +1266,6 @@ delayedDeliveryMaxIndexesPerBucketSnapshotSegment=5000
# after reaching the max buckets limitation, the adjacent buckets will be merged.
# (disable with value -1)
delayedDeliveryMaxNumBuckets=-1

# Whether retain null-key message during topic compaction
topicCompactionRemainNullKey=true
Original file line number Diff line number Diff line change
Expand Up @@ -2744,6 +2744,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether retain null-key message during topic compaction."
)
private boolean topicCompactionRemainNullKey = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Interval between checks to see if cluster is migrated and marks topic migrated "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,20 @@ public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKey
return idsAndKeysAndSize;
}

public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter) throws IOException {
return rebatchMessage(msg, filter, true);
}

/**
* Take a batched message and a filter, and returns a message with the only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
*
* NOTE: this message does not alter the reference count of the RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter)
BiPredicate<String, MessageId> filter,
boolean retainNullKey)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);

Expand Down Expand Up @@ -125,9 +131,14 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadata.hasPartitionKey()) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
if (retainNullKey) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
} else {
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
Unpooled.EMPTY_BUFFER, batchBuffer);
}
} else if (filter.test(singleMessageMetadata.getPartitionKey(), id)
&& singleMessagePayload.readableBytes() > 0) {
messagesRetained++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ public class TwoPhaseCompactor extends Compactor {
private static final int MAX_OUTSTANDING = 500;
protected static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
private final Duration phaseOneLoopReadTimeout;
private final boolean topicCompactionRemainNullKey;

public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
}

@Override
Expand Down Expand Up @@ -132,8 +134,16 @@ private void phaseOneLoop(RawReader reader,
int numMessagesInBatch = metadata.getNumMessagesInBatch();
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e : RawBatchConverter
.extractIdsAndKeysAndSize(m, false)) {
.extractIdsAndKeysAndSize(m, true)) {
if (e != null) {
if (e.getMiddle() == null) {
if (!topicCompactionRemainNullKey) {
// record delete null-key message event
deleteCnt++;
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
continue;
}
if (e.getRight() > 0) {
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
if (old != null) {
Expand Down Expand Up @@ -163,6 +173,10 @@ private void phaseOneLoop(RawReader reader,
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
} else {
if (!topicCompactionRemainNullKey) {
deletedMessage = true;
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
Expand Down Expand Up @@ -241,7 +255,6 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
}

if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
m.close();
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, lastCompactedMessageId);
return;
}
Expand All @@ -253,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = RawBatchConverter.rebatchMessage(
m, (key, subid) -> subid.equals(latestForKey.get(key)));
m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
Expand All @@ -262,8 +275,8 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
if (keyAndSize == null) {
messageToAdd = topicCompactionRemainNullKey ? Optional.of(m) : Optional.empty();
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -643,8 +644,17 @@ public void testWholeBatchCompactedOut() throws Exception {
}
}

@Test
public void testKeyLessMessagesPassThrough() throws Exception {
@DataProvider(name = "retainNullKey")
public static Object[][] retainNullKey() {
return new Object[][] {{true}, {false}};
}

@Test(dataProvider = "retainNullKey")
public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws Exception {
conf.setTopicCompactionRemainNullKey(retainNullKey);
restartBroker();
FieldUtils.writeDeclaredField(compactor, "topicCompactionRemainNullKey", retainNullKey, true);

String topic = "persistent://my-property/use/my-ns/my-topic1";

// subscribe before sending anything, so that we get all messages
Expand Down Expand Up @@ -685,29 +695,25 @@ public void testKeyLessMessagesPassThrough() throws Exception {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
assertNull(m);
} else {
Message<byte[]> message1 = consumer.receive();
Assert.assertFalse(message1.hasKey());
Assert.assertEquals(new String(message1.getData()), "my-message-1");

Message<byte[]> message2 = consumer.receive();
Assert.assertFalse(message2.hasKey());
Assert.assertEquals(new String(message2.getData()), "my-message-2");

Message<byte[]> message3 = consumer.receive();
Assert.assertEquals(message3.getKey(), "key1");
Assert.assertEquals(new String(message3.getData()), "my-message-4");

Message<byte[]> message4 = consumer.receive();
Assert.assertEquals(message4.getKey(), "key2");
Assert.assertEquals(new String(message4.getData()), "my-message-6");

Message<byte[]> message5 = consumer.receive();
Assert.assertFalse(message5.hasKey());
Assert.assertEquals(new String(message5.getData()), "my-message-7");
List<Pair<String, String>> result = new ArrayList<>();
while (true) {
Message<byte[]> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
}
result.add(Pair.of(message.getKey(), message.getData() == null ? null : new String(message.getData())));
}

Message<byte[]> message6 = consumer.receive();
Assert.assertFalse(message6.hasKey());
Assert.assertEquals(new String(message6.getData()), "my-message-8");
List<Pair<String, String>> expectList;
if (retainNullKey) {
expectList = List.of(
Pair.of(null, "my-message-1"), Pair.of(null, "my-message-2"),
Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6"),
Pair.of(null, "my-message-7"), Pair.of(null, "my-message-8"));
} else {
expectList = List.of(Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6"));
}
Assert.assertEquals(result, expectList);
}
}
}
Expand Down Expand Up @@ -1888,7 +1894,7 @@ public void testDispatcherMaxReadSizeBytes() throws Exception {
.topic(topicName).create();

for (int i = 0; i < 10; i+=2) {
producer.newMessage().key(null).value(new byte[4*1024*1024]).send();
producer.newMessage().key(UUID.randomUUID().toString()).value(new byte[4*1024*1024]).send();
}
producer.flush();

Expand Down

0 comments on commit aa321ad

Please sign in to comment.