Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker][PIP-318] Support not retaining null-key message during topic compaction #21578

Merged
merged 3 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,9 @@ brokerServiceCompactionThresholdInBytes=0
# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
brokerServiceCompactionPhaseOneLoopTimeInSeconds=30

# Whether to retain null-key messages during topic compaction
topicCompactionRemainNullKey=false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
topicCompactionRemainNullKey=false
topicCompactionRetainNullKey=false

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderzc I just saw this typo in the config key :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I opened a new PR to fix it. #21690


# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
Expand Down
5 changes: 4 additions & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1285,4 +1285,7 @@ brokerInterceptorsDirectory=./interceptors
brokerInterceptors=

# Enable or disable the broker interceptor, which is only used for testing for now
disableBrokerInterceptors=true
disableBrokerInterceptors=true

# Whether to retain null-key messages during topic compaction
topicCompactionRemainNullKey=false
Original file line number Diff line number Diff line change
Expand Up @@ -2772,6 +2772,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether retain null-key message during topic compaction."
)
private boolean topicCompactionRemainNullKey = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Interval between checks to see if cluster is migrated and marks topic migrated "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,31 @@ public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKey
msg.getMessageIdData().getEntryId(),
msg.getMessageIdData().getPartition(),
i);
if (!smm.isCompactedOut() && smm.hasPartitionKey()) {
if (!smm.isCompactedOut()) {
idsAndKeysAndSize.add(ImmutableTriple.of(id,
smm.getPartitionKey(),
smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
smm.hasPartitionKey() ? smm.getPartitionKey() : null,
smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
}
singleMessagePayload.release();
}
uncompressedPayload.release();
return idsAndKeysAndSize;
}

/**
 * Rebatch {@code msg} keeping only the sub-messages accepted by {@code filter},
 * retaining null-key sub-messages (the historical default behavior).
 *
 * @param msg the batched raw message to filter
 * @param filter predicate over (partition key, message id) deciding which sub-messages to keep
 * @return the rebatched message, or an empty optional if no sub-messages match
 * @throws IOException if the batch cannot be decoded
 */
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
                                                  BiPredicate<String, MessageId> filter) throws IOException {
    // Preserve pre-PIP-318 behavior: null-key messages are kept by default.
    final boolean retainNullKey = true;
    return rebatchMessage(msg, filter, retainNullKey);
}

/**
 * Take a batched message and a filter, and return a message containing only the sub-messages
 * which match the filter. Returns an empty optional if no messages match.
 *
 * NOTE: this method does not alter the reference count of the RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter)
BiPredicate<String, MessageId> filter,
boolean retainNullKey)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);

Expand Down Expand Up @@ -129,9 +135,14 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadata.hasPartitionKey()) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
if (retainNullKey) {
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
} else {
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
Unpooled.EMPTY_BUFFER, batchBuffer);
}
} else if (filter.test(singleMessageMetadata.getPartitionKey(), id)
&& singleMessagePayload.readableBytes() > 0) {
messagesRetained++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ public class TwoPhaseCompactor extends Compactor {
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private final Duration phaseOneLoopReadTimeout;
private final boolean topicCompactionRemainNullKey;

/**
 * Create a two-phase compactor whose phase-one read timeout and null-key
 * retention policy are taken from the broker's service configuration.
 *
 * @param conf broker service configuration supplying the compaction settings
 * @param pulsar client used to read the topic being compacted
 * @param bk BookKeeper client used to write the compacted ledger
 * @param scheduler executor on which compaction tasks run
 */
public TwoPhaseCompactor(ServiceConfiguration conf,
                         PulsarClient pulsar,
                         BookKeeper bk,
                         ScheduledExecutorService scheduler) {
    super(conf, pulsar, bk, scheduler);
    this.phaseOneLoopReadTimeout =
            Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
    this.topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
}

@Override
Expand Down Expand Up @@ -134,6 +136,14 @@ private void phaseOneLoop(RawReader reader,
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
if (e != null) {
if (e.getMiddle() == null) {
if (!topicCompactionRemainNullKey) {
// record delete null-key message event
deleteCnt++;
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
continue;
}
if (e.getRight() > 0) {
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
if (old != null) {
Expand Down Expand Up @@ -163,6 +173,10 @@ private void phaseOneLoop(RawReader reader,
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
} else {
if (!topicCompactionRemainNullKey) {
deletedMessage = true;
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
Expand Down Expand Up @@ -249,8 +263,8 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = rebatchMessage(
m, (key, subid) -> subid.equals(latestForKey.get(key)));
messageToAdd = rebatchMessage(reader.getTopic(),
m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
Expand All @@ -259,8 +273,8 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
if (keyAndSize == null) {
messageToAdd = topicCompactionRemainNullKey ? Optional.of(m) : Optional.empty();
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
Expand Down Expand Up @@ -419,9 +433,13 @@ protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAnd
return RawBatchConverter.extractIdsAndKeysAndSize(msg);
}

protected Optional<RawMessage> rebatchMessage(RawMessage msg, BiPredicate<String, MessageId> filter)
protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg, BiPredicate<String, MessageId> filter,
boolean retainNullKey)
throws IOException {
return RawBatchConverter.rebatchMessage(msg, filter);
if (log.isDebugEnabled()) {
log.debug("Rebatching message {} for topic {}", msg.getMessageId(), topic);
}
return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
}

private static class PhaseOneResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1885,7 +1885,7 @@ public void testDispatcherMaxReadSizeBytes() throws Exception {
.topic(topicName).create();

for (int i = 0; i < 10; i+=2) {
producer.newMessage().key(null).value(new byte[4*1024*1024]).send();
producer.newMessage().key(UUID.randomUUID().toString()).value(new byte[4*1024*1024]).send();
}
producer.flush();

Expand Down Expand Up @@ -1984,4 +1984,60 @@ public void testCompactionDuplicate() throws Exception {
}
}
}

/**
 * TestNG data provider: runs the null-key compaction test once with retention
 * enabled and once with it disabled.
 */
@DataProvider(name = "retainNullKey")
public static Object[][] retainNullKey() {
    return new Object[][]{
            {Boolean.TRUE},
            {Boolean.FALSE},
    };
}

@Test(dataProvider = "retainNullKey")
public void testCompactionNullKeyRetain(boolean retainNullKey) throws Exception {
    // Apply the broker setting under test; a restart is required for it to take effect.
    conf.setTopicCompactionRemainNullKey(retainNullKey);
    restartBroker();

    final String topicName = "persistent://my-property/use/my-ns/testCompactionNullKeyRetain" + UUID.randomUUID();
    final String subName = "my-sub";
    @Cleanup
    PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
    // @Cleanup closes the producer/consumer even when an assertion below fails or the
    // compaction await times out, so a failing run does not leak client resources.
    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topicName).create();

    // Interleave null-key and keyed messages: compaction must keep only the latest
    // value per key, and keep null-key messages only when retention is enabled.
    producer.newMessage().key(null).value("V1").send();
    producer.newMessage().key("K1").value("V2").send();
    producer.newMessage().key("K2").value("V3").send();
    producer.newMessage().key(null).value("V4").send();
    producer.newMessage().key("K1").value("V5").send();
    producer.newMessage().key("K2").value("V6").send();
    producer.newMessage().key(null).value("V7").send();
    producer.flush();

    admin.topics().triggerCompaction(topicName);

    Awaitility.await().untilAsserted(() -> {
        assertEquals(admin.topics().compactionStatus(topicName).status,
                LongRunningProcessStatus.Status.SUCCESS);
    });

    @Cleanup
    ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
            .topic(topicName).readCompacted(true).subscriptionName(subName)
            .subscribe();

    // Drain the compacted topic; a receive timeout marks the end of the backlog.
    List<String> result = new ArrayList<>();
    while (true) {
        Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
        if (message == null) {
            break;
        }
        result.add(message.getValue());
    }

    if (retainNullKey) {
        Assert.assertEquals(result, List.of("V1", "V4", "V5", "V6", "V7"));
    } else {
        Assert.assertEquals(result, List.of("V5", "V6"));
    }
}
}