diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java index 363336e83113e..6e9bf78b14cfe 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java @@ -18,6 +18,8 @@ */ package org.apache.bookkeeper.mledger.impl; +import javax.annotation.Nullable; + /** * Interface to manage the ackSet state attached to a position. * Helpers in {@link AckSetStateUtil} to create positions with @@ -28,7 +30,7 @@ public interface AckSetState { * Get the ackSet bitset information encoded as a long array. * @return the ackSet */ - long[] getAckSet(); + @Nullable long[] getAckSet(); /** * Set the ackSet bitset information as a long array. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 934bfba4b0d81..50e67470ea30b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -35,13 +35,14 @@ import java.time.Clock; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -61,6 +62,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.LongStream; +import javax.annotation.Nullable; +import lombok.Getter; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.BKException; @@ -105,7 +108,6 @@ import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer; import org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeBoundConsumer; @@ -200,7 +202,9 @@ public class ManagedCursorImpl implements ManagedCursor { // Maintain the deletion status for batch messages // (ledgerId, entryId) -> deletion indexes - protected final ConcurrentSkipListMap batchDeletedIndexes; + @Getter + @VisibleForTesting + @Nullable protected final ConcurrentSkipListMap batchDeletedIndexes; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private RateLimiter markDeleteLimiter; @@ -709,6 +713,7 @@ private void recoverIndividualDeletedMessages(List i private void recoverBatchDeletedIndexes ( List batchDeletedIndexInfoList) { + Objects.requireNonNull(batchDeletedIndexes); lock.writeLock().lock(); try { this.batchDeletedIndexes.clear(); @@ -720,8 +725,7 @@ private void recoverBatchDeletedIndexes ( } this.batchDeletedIndexes.put( PositionFactory.create(batchDeletedIndexInfo.getPosition().getLedgerId(), - batchDeletedIndexInfo.getPosition().getEntryId()), - BitSetRecyclable.create().resetWords(array)); + batchDeletedIndexInfo.getPosition().getEntryId()), BitSet.valueOf(array)); } }); } finally { @@ -1381,14 +1385,12 @@ public void operationComplete() { lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), null, null); individualDeletedMessages.clear(); - if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { - batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle); + if (batchDeletedIndexes != null) { batchDeletedIndexes.clear(); AckSetStateUtil.maybeGetAckSetState(newReadPosition).ifPresent(ackSetState -> { long[] resetWords = ackSetState.getAckSet(); if (resetWords != null) { - BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords); - batchDeletedIndexes.put(newReadPosition, ackSet); + batchDeletedIndexes.put(newReadPosition, BitSet.valueOf(resetWords)); } }); } @@ -2017,47 +2019,7 @@ public void asyncMarkDelete(final Position position, Map propertie log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position); } - Position newPosition = position; - - Optional ackSetStateOptional = AckSetStateUtil.maybeGetAckSetState(newPosition); - if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { - if (ackSetStateOptional.isPresent()) { - AtomicReference bitSetRecyclable = new AtomicReference<>(); - BitSetRecyclable givenBitSet = - BitSetRecyclable.create().resetWords(ackSetStateOptional.map(AckSetState::getAckSet).get()); - // In order to prevent the batch index recorded in batchDeletedIndexes from rolling back, - // only update batchDeletedIndexes when the submitted batch index is greater - // than the recorded index. - batchDeletedIndexes.compute(newPosition, - (k, v) -> { - if (v == null) { - return givenBitSet; - } - if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) { - bitSetRecyclable.set(v); - return givenBitSet; - } else { - bitSetRecyclable.set(givenBitSet); - return v; - } - }); - if (bitSetRecyclable.get() != null) { - bitSetRecyclable.get().recycle(); - } - newPosition = ledger.getPreviousPosition(newPosition); - } - Map subMap = batchDeletedIndexes.subMap(PositionFactory.EARLIEST, newPosition); - subMap.values().forEach(BitSetRecyclable::recycle); - subMap.clear(); - } else { - if (ackSetStateOptional.isPresent()) { - AckSetState ackSetState = ackSetStateOptional.get(); - if (ackSetState.getAckSet() != null) { - newPosition = ledger.getPreviousPosition(newPosition); - } - } - } - + Position newPosition = ackBatchPosition(position); if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) { boolean shouldCursorMoveForward = false; try { @@ -2103,6 +2065,31 @@ public void asyncMarkDelete(final Position position, Map propertie internalAsyncMarkDelete(newPosition, properties, callback, ctx); } + private Position ackBatchPosition(Position position) { + return AckSetStateUtil.maybeGetAckSetState(position) + .map(AckSetState::getAckSet) + .map(ackSet -> { + if (batchDeletedIndexes == null) { + return ledger.getPreviousPosition(position); + } + // In order to prevent the batch index recorded in batchDeletedIndexes from rolling back, + // only update batchDeletedIndexes when the submitted batch index is greater + // than the recorded index. + final var givenBitSet = BitSet.valueOf(ackSet); + batchDeletedIndexes.compute(position, (k, v) -> { + if (v == null || givenBitSet.nextSetBit(0) > v.nextSetBit(0)) { + return givenBitSet; + } else { + return v; + } + }); + final var newPosition = ledger.getPreviousPosition(position); + batchDeletedIndexes.subMap(PositionFactory.EARLIEST, newPosition).clear(); + return newPosition; + }) + .orElse(position); + } + protected void internalAsyncMarkDelete(final Position newPosition, Map properties, final MarkDeleteCallback callback, final Object ctx) { ledger.mbean.addMarkDeleteOp(); @@ -2208,12 +2195,10 @@ public void operationComplete() { try { individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()); - if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { - Map subMap = batchDeletedIndexes.subMap(PositionFactory.EARLIEST, + if (batchDeletedIndexes != null) { + batchDeletedIndexes.subMap(PositionFactory.EARLIEST, false, PositionFactory.create(mdEntry.newPosition.getLedgerId(), - mdEntry.newPosition.getEntryId()), true); - subMap.values().forEach(BitSetRecyclable::recycle); - subMap.clear(); + mdEntry.newPosition.getEntryId()), true).clear(); } persistentMarkDeletePosition = mdEntry.newPosition; } finally { @@ -2348,11 +2333,8 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } if (internalIsMessageDeleted(position)) { - if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { - BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); - if (bitSetRecyclable != null) { - bitSetRecyclable.recycle(); - } + if (batchDeletedIndexes != null) { + batchDeletedIndexes.remove(position); } if (log.isDebugEnabled()) { log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position); @@ -2361,11 +2343,8 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); if (ackSet == null) { - if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { - BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); - if (bitSetRecyclable != null) { - bitSetRecyclable.recycle(); - } + if (batchDeletedIndexes != null) { + batchDeletedIndexes.remove(position); } // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will // make the RangeSet recognize the "continuity" between adjacent Positions. @@ -2378,12 +2357,11 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name, individualDeletedMessages); } - } else if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { - BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSet); - BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet); + } else if (batchDeletedIndexes != null) { + final var givenBitSet = BitSet.valueOf(ackSet); + final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet); if (givenBitSet != bitSet) { bitSet.and(givenBitSet); - givenBitSet.recycle(); } if (bitSet.isEmpty()) { Position previousPosition = ledger.getPreviousPosition(position); @@ -2391,10 +2369,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId()); MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this); - BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); - if (bitSetRecyclable != null) { - bitSetRecyclable.recycle(); - } + batchDeletedIndexes.remove(position); } } } @@ -3185,7 +3160,7 @@ private List buildIndividualDeletedMessageRanges() { private List buildBatchEntryDeletionIndexInfoList() { lock.readLock().lock(); try { - if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) { + if (batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) { return Collections.emptyList(); } MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo @@ -3193,9 +3168,9 @@ private List buildBatchEntryDeletio MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats .BatchedEntryDeletionIndexInfo.newBuilder(); List result = new ArrayList<>(); - Iterator> iterator = batchDeletedIndexes.entrySet().iterator(); + final var iterator = batchDeletedIndexes.entrySet().iterator(); while (iterator.hasNext() && result.size() < getConfig().getMaxBatchDeletedIndexToPersist()) { - Map.Entry entry = iterator.next(); + final var entry = iterator.next(); nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId()); nestedPositionBuilder.setEntryId(entry.getKey().getEntryId()); batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build()); @@ -3615,11 +3590,11 @@ private boolean internalIsMessageDeleted(Position position) { @Override public long[] getBatchPositionAckSet(Position position) { if (batchDeletedIndexes != null) { - BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.get(position); - if (bitSetRecyclable == null) { + final var bitSet = batchDeletedIndexes.get(position); + if (bitSet == null) { return null; } else { - return bitSetRecyclable.toLongArray(); + return bitSet.toLongArray(); } } else { return null; @@ -3722,8 +3697,8 @@ private ManagedCursorImpl cursorImpl() { @Override public long[] getDeletedBatchIndexesAsLongArray(Position position) { - if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { - BitSetRecyclable bitSet = batchDeletedIndexes.get(position); + if (batchDeletedIndexes != null) { + final var bitSet = batchDeletedIndexes.get(position); return bitSet == null ? null : bitSet.toLongArray(); } else { return null; @@ -3851,9 +3826,9 @@ public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) thro lock.readLock().unlock(); } if (batchDeletedIndexes != null) { - for (Map.Entry entry : this.batchDeletedIndexes.entrySet()) { - BitSetRecyclable copiedBitSet = BitSetRecyclable.valueOf(entry.getValue()); - newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), copiedBitSet); + Objects.requireNonNull(newNonDurableCursor.batchDeletedIndexes); + for (final var entry : this.batchDeletedIndexes.entrySet()) { + newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), (BitSet) entry.getValue().clone()); } } return newNonDurableCursor; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java index 58cf59aa6b3b9..1b4fd451c104f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -45,7 +44,6 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; -import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -223,10 +221,7 @@ public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception { (LinkedMap>) field.get(pendingAckHandle); assertTrue(individualAckOfTransaction.isEmpty()); managedCursor = (ManagedCursorImpl) testPersistentSubscription.getCursor(); - field = ManagedCursorImpl.class.getDeclaredField("batchDeletedIndexes"); - field.setAccessible(true); - final ConcurrentSkipListMap batchDeletedIndexes = - (ConcurrentSkipListMap) field.get(managedCursor); + final var batchDeletedIndexes = managedCursor.getBatchDeletedIndexes(); if (retryCnt == 0) { //one message are not ack Awaitility.await().until(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java index 6dd3e6e7c7822..6bbfb25ee2ff7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java @@ -73,6 +73,7 @@ public class MLPendingAckStoreTest extends TransactionTestBase { @BeforeClass @Override protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); setUpBase(1, 1, NAMESPACE1 + "/test", 0); } @@ -304,4 +305,4 @@ private LinkedHashSet calculatePendingAckIndexes(List positionList, } return indexes; } -} \ No newline at end of file +}