Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client] PIP-393: Improve performance of Negative Acknowledgement #23600

Merged
merged 6 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.HashSet;
Expand Down Expand Up @@ -311,19 +312,64 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
// negative topic message id
consumer.negativeAcknowledge(topicMessageId);
NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker();
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
// negative batch message id
unAckedMessageTracker.add(messageId);
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
}

/**
* If we nack multiple messages in the same batch with different redelivery delays, the messages should be redelivered
* with the correct delay. However, all messages are redelivered at the same time.
* @throws Exception
*/
@Test
public void testNegativeAcksWithBatch() throws Exception {
cleanup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setup();
String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch");

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(3, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.batchingMaxMessages(2)
.create();
// send two messages in the same batch
producer.sendAsync("test-0");
producer.sendAsync("test-1");
producer.flush();

// negative ack the first message
consumer.negativeAcknowledge(consumer.receive());
// wait for 2s, negative ack the second message
Thread.sleep(2000);
consumer.negativeAcknowledge(consumer.receive());

// now 2s has passed, the first message should be redelivered 1s later.
Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
assertNotNull(msg1);
}

@Test
public void testNegativeAcksWithBatchAckEnabled() throws Exception {
cleanup();
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@
<include>org.reactivestreams:reactive-streams</include>
<include>org.tukaani:xz</include>
<include>org.yaml:snakeyaml</include>
<include>it.unimi.dsi:fastutil</include>
</includes>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
<include>org.reactivestreams:reactive-streams</include>
<include>org.tukaani:xz</include>
<include>org.yaml:snakeyaml</include>
<include>it.unimi.dsi:fastutil</include>
</includes>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
Expand Down
10 changes: 10 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,16 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
thetumbled marked this conversation as resolved.
Show resolved Hide resolved

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2745,7 +2745,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
if (peek != null) {
MessageIdAdv messageId = MessageIdAdvUtils.discardBatch(peek.getMessageId());
MessageId messageId = NegativeAcksTracker.discardBatchAndPartitionIndex(peek.getMessageId());
if (!messageIds.contains(messageId)) {
// first message is not expired, then no message is expired in queue.
return 0;
Expand All @@ -2756,7 +2756,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
while (message != null) {
decreaseIncomingMessageSize(message);
messagesFromQueue++;
MessageIdAdv id = MessageIdAdvUtils.discardBatch(message.getMessageId());
MessageId id = NegativeAcksTracker.discardBatchAndPartitionIndex(message.getMessageId());
if (!messageIds.contains(id)) {
messageIds.add(id);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,50 +22,51 @@
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
import java.io.Closeable;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NegativeAcksTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);

private ConcurrentLongLongPairHashMap nackedMessages = null;
// timestamp -> ledgerId -> entryId, no need to batch index, if different messages have
// different timestamp, there will be multiple entries in the map
// RB Tree -> LongOpenHashMap -> Roaring64Bitmap
private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = null;

private final ConsumerBase<?> consumer;
private final Timer timer;
private final long nackDelayNanos;
private final long timerIntervalNanos;
private final long nackDelayMs;
private final RedeliveryBackoff negativeAckRedeliveryBackoff;
private final int negativeAckPrecisionBitCnt;

private Timeout timeout;

// Set a min delay to allow for grouping nacks within a single batch
private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE;
private static final long MIN_NACK_DELAY_MS = 100;
private static final int DUMMY_PARTITION_INDEX = -2;

public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
this.consumer = consumer;
this.timer = consumer.getClient().timer();
this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_NANOS);
this.nackDelayMs = Math.max(TimeUnit.MICROSECONDS.toMillis(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_MS);
this.negativeAckRedeliveryBackoff = conf.getNegativeAckRedeliveryBackoff();
if (negativeAckRedeliveryBackoff != null) {
this.timerIntervalNanos = Math.max(
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)),
MIN_NACK_DELAY_NANOS) / 3;
} else {
this.timerIntervalNanos = nackDelayNanos / 3;
}
this.negativeAckPrecisionBitCnt = conf.getNegativeAckPrecisionBitCnt();
}

private void triggerRedelivery(Timeout t) {
Expand All @@ -76,21 +77,48 @@ private void triggerRedelivery(Timeout t) {
return;
}

long now = System.nanoTime();
nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
if (timestamp < now) {
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
// need to covert non-partitioned topic partition index to -1
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
long currentTimestamp = System.currentTimeMillis();
for (long timestamp : nackedMessages.keySet()) {
if (timestamp > currentTimestamp) {
// We are done with all the messages that need to be redelivered
break;
}

Long2ObjectMap<Roaring64Bitmap> ledgerMap = nackedMessages.get(timestamp);
for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entrySet = ledgerEntry.getValue();
entrySet.forEach(entryId -> {
MessageId msgId = new MessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX);
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
});
}
}

// remove entries from the nackedMessages map
LongBidirectionalIterator iterator = nackedMessages.keySet().iterator();
while (iterator.hasNext()) {
long timestamp = iterator.nextLong();
if (timestamp <= currentTimestamp) {
iterator.remove();
} else {
break;
}
});
for (MessageId messageId : messagesToRedeliver) {
nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(),
((MessageIdImpl) messageId).getEntryId());
}
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);

// Schedule the next redelivery if there are still messages to redeliver
if (!nackedMessages.isEmpty()) {
long nextTriggerTimestamp = nackedMessages.firstLongKey();
long delayMs = Math.max(nextTriggerTimestamp - currentTimestamp, 0);
if (delayMs > 0) {
this.timeout = timer.newTimeout(this::triggerRedelivery, delayMs, TimeUnit.MILLISECONDS);
} else {
this.timeout = timer.newTimeout(this::triggerRedelivery, 0, TimeUnit.MILLISECONDS);
}
} else {
this.timeout = null;
}
}

// release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages,
Expand All @@ -110,39 +138,56 @@ public synchronized void add(Message<?> message) {
add(message.getMessageId(), message.getRedeliveryCount());
}

static long trimLowerBit(long timestamp, int bits) {
return timestamp & (-1L << bits);
}

private synchronized void add(MessageId messageId, int redeliveryCount) {
if (nackedMessages == null) {
nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
.autoShrink(true)
.concurrencyLevel(1)
.build();
nackedMessages = new Long2ObjectAVLTreeMap<>();
}

long backoffNs;
long backoffMs;
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
if (negativeAckRedeliveryBackoff != null) {
backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
backoffMs = TimeUnit.MILLISECONDS.toMillis(negativeAckRedeliveryBackoff.next(redeliveryCount));
} else {
backoffNs = nackDelayNanos;
backoffMs = nackDelayMs;
}
MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
// ConcurrentLongLongPairHashMap requires the key and value >=0.
// partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use
// partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to
// avoid exception from ConcurrentLongLongPairHashMap.
nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(),
messageIdAdv.getPartitionIndex() >= 0 ? messageIdAdv.getPartitionIndex() :
NON_PARTITIONED_TOPIC_PARTITION_INDEX, System.nanoTime() + backoffNs);
MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
long timestamp = trimLowerBit(System.currentTimeMillis() + backoffMs, negativeAckPrecisionBitCnt);
nackedMessages.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
.computeIfAbsent(messageIdAdv.getLedgerId(), k -> new Roaring64Bitmap())
.add(messageIdAdv.getEntryId());

if (this.timeout == null) {
// Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
// nack immediately following the current one will be batched into the same redeliver request.
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
this.timeout = timer.newTimeout(this::triggerRedelivery, backoffMs, TimeUnit.MILLISECONDS);
}
}

/**
* Discard the batch index and partition index from the message id.
*
* @param messageId
* @return
*/
public static MessageIdAdv discardBatchAndPartitionIndex(MessageId messageId) {
if (messageId instanceof ChunkMessageIdImpl) {
return (MessageIdAdv) messageId;
}
MessageIdAdv msgId = (MessageIdAdv) messageId;
return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), DUMMY_PARTITION_INDEX);
}

@VisibleForTesting
Optional<Long> getNackedMessagesCount() {
return Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
synchronized long getNackedMessagesCount() {
if (nackedMessages == null) {
return 0;
}
return nackedMessages.values().stream().mapToLong(
ledgerMap -> ledgerMap.values().stream().mapToLong(
Roaring64Bitmap::getLongCardinality).sum()).sum();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
)
private long negativeAckRedeliveryDelayMicros = TimeUnit.MINUTES.toMicros(1);

@ApiModelProperty(
name = "negativeAckPrecisionBitCnt",
value = "The redelivery time precision bit count. The lower bits of the redelivery time will be"
+ "trimmed to reduce the memory occupation.\nThe default value is 8, which means the"
+ "redelivery time will be bucketed by 256ms, the redelivery time could be earlier(no later)"
+ "than the expected time, but no more than 256ms. \nIf set to k, the redelivery time will be"
+ "bucketed by 2^k ms.\nIf the value is 0, the redelivery time will be accurate to ms."
)
private int negativeAckPrecisionBitCnt = 8;
thetumbled marked this conversation as resolved.
Show resolved Hide resolved

@ApiModelProperty(
name = "maxTotalReceiverQueueSizeAcrossPartitions",
value = "The max total receiver queue size across partitions.\n"
Expand Down
Loading