Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix possible mark delete NPE when batch index ack is enabled #23833

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.bookkeeper.mledger.impl;

import javax.annotation.Nullable;

/**
* Interface to manage the ackSet state attached to a position.
* Helpers in {@link AckSetStateUtil} to create positions with
Expand All @@ -28,7 +30,7 @@ public interface AckSetState {
* Get the ackSet bitset information encoded as a long array.
* @return the ackSet
*/
long[] getAckSet();
@Nullable long[] getAckSet();

/**
* Set the ackSet bitset information as a long array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -61,6 +62,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.annotation.Nullable;
import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -105,7 +108,6 @@
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeBoundConsumer;
Expand Down Expand Up @@ -200,7 +202,9 @@ public class ManagedCursorImpl implements ManagedCursor {

// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
protected final ConcurrentSkipListMap<Position, BitSetRecyclable> batchDeletedIndexes;
@Getter
@VisibleForTesting
@Nullable protected final ConcurrentSkipListMap<Position, BitSet> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private RateLimiter markDeleteLimiter;
Expand Down Expand Up @@ -709,6 +713,7 @@ private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> i

private void recoverBatchDeletedIndexes (
List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchDeletedIndexInfoList) {
Objects.requireNonNull(batchDeletedIndexes);
lock.writeLock().lock();
try {
this.batchDeletedIndexes.clear();
Expand All @@ -720,8 +725,7 @@ private void recoverBatchDeletedIndexes (
}
this.batchDeletedIndexes.put(
PositionFactory.create(batchDeletedIndexInfo.getPosition().getLedgerId(),
batchDeletedIndexInfo.getPosition().getEntryId()),
BitSetRecyclable.create().resetWords(array));
batchDeletedIndexInfo.getPosition().getEntryId()), BitSet.valueOf(array));
}
});
} finally {
Expand Down Expand Up @@ -1381,14 +1385,12 @@ public void operationComplete() {
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
? getProperties() : Collections.emptyMap(), null, null);
individualDeletedMessages.clear();
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
if (batchDeletedIndexes != null) {
batchDeletedIndexes.clear();
AckSetStateUtil.maybeGetAckSetState(newReadPosition).ifPresent(ackSetState -> {
long[] resetWords = ackSetState.getAckSet();
if (resetWords != null) {
BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
batchDeletedIndexes.put(newReadPosition, ackSet);
batchDeletedIndexes.put(newReadPosition, BitSet.valueOf(resetWords));
}
});
}
Expand Down Expand Up @@ -2017,47 +2019,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
}

Position newPosition = position;

Optional<AckSetState> ackSetStateOptional = AckSetStateUtil.maybeGetAckSetState(newPosition);
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
if (ackSetStateOptional.isPresent()) {
AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>();
BitSetRecyclable givenBitSet =
BitSetRecyclable.create().resetWords(ackSetStateOptional.map(AckSetState::getAckSet).get());
// In order to prevent the batch index recorded in batchDeletedIndexes from rolling back,
// only update batchDeletedIndexes when the submitted batch index is greater
// than the recorded index.
batchDeletedIndexes.compute(newPosition,
(k, v) -> {
if (v == null) {
return givenBitSet;
}
if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) {
bitSetRecyclable.set(v);
return givenBitSet;
} else {
bitSetRecyclable.set(givenBitSet);
return v;
}
});
if (bitSetRecyclable.get() != null) {
bitSetRecyclable.get().recycle();
}
newPosition = ledger.getPreviousPosition(newPosition);
}
Map<Position, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionFactory.EARLIEST, newPosition);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
} else {
if (ackSetStateOptional.isPresent()) {
AckSetState ackSetState = ackSetStateOptional.get();
if (ackSetState.getAckSet() != null) {
newPosition = ledger.getPreviousPosition(newPosition);
}
}
}

Position newPosition = ackBatchPosition(position);
if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
boolean shouldCursorMoveForward = false;
try {
Expand Down Expand Up @@ -2103,6 +2065,31 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
internalAsyncMarkDelete(newPosition, properties, callback, ctx);
}

/**
 * Resolves the position that a mark-delete should actually move to when the
 * given position may carry a batch ack set.
 *
 * <p>If the position has no ack set attached (no ack-set state, or a
 * {@code null} ack set), the position is returned unchanged. Otherwise the
 * entry is treated as only partially acknowledged and the result is the
 * position preceding it in the ledger. When {@code batchDeletedIndexes} is
 * available, the ack set is also recorded for the position and all entries
 * in the range {@code [EARLIEST, previous)} are dropped from the map.
 *
 * @param position the position requested for mark-delete
 * @return {@code position} itself when it carries no ack set, otherwise the
 *         position preceding it
 */
private Position ackBatchPosition(Position position) {
    final long[] ackSetWords = AckSetStateUtil.maybeGetAckSetState(position)
            .map(AckSetState::getAckSet)
            .orElse(null);
    if (ackSetWords == null) {
        // No batch-level ack information attached: nothing to adjust.
        return position;
    }

    final Position previous;
    if (batchDeletedIndexes == null) {
        previous = ledger.getPreviousPosition(position);
    } else {
        // To keep the recorded batch index from rolling back, the stored bit set
        // is replaced only when there is none yet or when the submitted one
        // starts at a strictly greater index than the recorded one.
        final BitSet incoming = BitSet.valueOf(ackSetWords);
        batchDeletedIndexes.compute(position, (key, recorded) -> {
            final boolean keepRecorded = recorded != null && incoming.nextSetBit(0) <= recorded.nextSetBit(0);
            return keepRecorded ? recorded : incoming;
        });
        previous = ledger.getPreviousPosition(position);
        batchDeletedIndexes.subMap(PositionFactory.EARLIEST, previous).clear();
    }
    // A null predecessor falls back to the requested position.
    return previous != null ? previous : position;
}

protected void internalAsyncMarkDelete(final Position newPosition, Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx) {
ledger.mbean.addMarkDeleteOp();
Expand Down Expand Up @@ -2208,12 +2195,10 @@ public void operationComplete() {
try {
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId());
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
Map<Position, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionFactory.EARLIEST,
if (batchDeletedIndexes != null) {
batchDeletedIndexes.subMap(PositionFactory.EARLIEST,
false, PositionFactory.create(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId()), true);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
mdEntry.newPosition.getEntryId()), true).clear();
}
persistentMarkDeletePosition = mdEntry.newPosition;
} finally {
Expand Down Expand Up @@ -2348,11 +2333,8 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}

if (internalIsMessageDeleted(position)) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
if (batchDeletedIndexes != null) {
batchDeletedIndexes.remove(position);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
Expand All @@ -2361,11 +2343,8 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}
long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position);
if (ackSet == null) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
if (batchDeletedIndexes != null) {
batchDeletedIndexes.remove(position);
}
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
// make the RangeSet recognize the "continuity" between adjacent Positions.
Expand All @@ -2378,23 +2357,19 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
individualDeletedMessages);
}
} else if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSet);
BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);
} else if (batchDeletedIndexes != null) {
final var givenBitSet = BitSet.valueOf(ackSet);
final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet);
if (givenBitSet != bitSet) {
bitSet.and(givenBitSet);
givenBitSet.recycle();
}
if (bitSet.isEmpty()) {
Position previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
batchDeletedIndexes.remove(position);
}
}
}
Expand Down Expand Up @@ -3185,17 +3160,17 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
lock.readLock().lock();
try {
if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) {
if (batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
}
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
.newBuilder();
MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats
.BatchedEntryDeletionIndexInfo.newBuilder();
List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = new ArrayList<>();
Iterator<Map.Entry<Position, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
final var iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && result.size() < getConfig().getMaxBatchDeletedIndexToPersist()) {
Map.Entry<Position, BitSetRecyclable> entry = iterator.next();
final var entry = iterator.next();
nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());
Expand Down Expand Up @@ -3615,11 +3590,11 @@ private boolean internalIsMessageDeleted(Position position) {
@Override
public long[] getBatchPositionAckSet(Position position) {
if (batchDeletedIndexes != null) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.get(position);
if (bitSetRecyclable == null) {
final var bitSet = batchDeletedIndexes.get(position);
if (bitSet == null) {
return null;
} else {
return bitSetRecyclable.toLongArray();
return bitSet.toLongArray();
}
} else {
return null;
Expand Down Expand Up @@ -3722,8 +3697,8 @@ private ManagedCursorImpl cursorImpl() {

@Override
public long[] getDeletedBatchIndexesAsLongArray(Position position) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
if (batchDeletedIndexes != null) {
final var bitSet = batchDeletedIndexes.get(position);
return bitSet == null ? null : bitSet.toLongArray();
} else {
return null;
Expand Down Expand Up @@ -3851,9 +3826,9 @@ public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) thro
lock.readLock().unlock();
}
if (batchDeletedIndexes != null) {
for (Map.Entry<Position, BitSetRecyclable> entry : this.batchDeletedIndexes.entrySet()) {
BitSetRecyclable copiedBitSet = BitSetRecyclable.valueOf(entry.getValue());
newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), copiedBitSet);
Objects.requireNonNull(newNonDurableCursor.batchDeletedIndexes);
for (final var entry : this.batchDeletedIndexes.entrySet()) {
newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), (BitSet) entry.getValue().clone());
}
}
return newNonDurableCursor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -45,7 +44,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -223,10 +221,7 @@ public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception {
(LinkedMap<TxnID, HashMap<Position, Position>>) field.get(pendingAckHandle);
assertTrue(individualAckOfTransaction.isEmpty());
managedCursor = (ManagedCursorImpl) testPersistentSubscription.getCursor();
field = ManagedCursorImpl.class.getDeclaredField("batchDeletedIndexes");
field.setAccessible(true);
final ConcurrentSkipListMap<Position, BitSetRecyclable> batchDeletedIndexes =
(ConcurrentSkipListMap<Position, BitSetRecyclable>) field.get(managedCursor);
final var batchDeletedIndexes = managedCursor.getBatchDeletedIndexes();
if (retryCnt == 0) {
// one message is not acked
Awaitility.await().until(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class MLPendingAckStoreTest extends TransactionTestBase {
/**
 * Class-level test setup: enables acknowledgment at batch-index level on
 * {@code conf}, then delegates to {@code setUpBase}.
 *
 * @throws Exception if either call below fails
 */
@BeforeClass
@Override
protected void setup() throws Exception {
// Set before setUpBase is invoked; presumably setUpBase reads conf when building the test environment — TODO confirm.
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, 1, NAMESPACE1 + "/test", 0);
}

Expand Down Expand Up @@ -304,4 +305,4 @@ private LinkedHashSet<Long> calculatePendingAckIndexes(List<Long> positionList,
}
return indexes;
}
}
}
Loading