Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Optimize subscription seek (cursor reset) by timestamp #22792

Merged
merged 11 commits into from
Jan 9, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,31 @@ void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry>
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx, boolean isFindFromLedger);


/**
* Find the newest entry that matches the given predicate.
*
* @param constraint
* search only active entries or all entries
* @param condition
* predicate that reads an entry an applies a condition
* @param callback
* callback object returning the resultant position
* @param startPosition
* start position to search from.
* @param endPosition
* end position to search to.
* @param ctx
* opaque context
* @param isFindFromLedger
* find the newest entry from ledger
*/
default void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
Position startPosition, Position endPosition, FindEntryCallback callback,
Object ctx, boolean isFindFromLedger) {
asyncFindNewestMatching(constraint, condition, callback, ctx, isFindFromLedger);
}

/**
* reset the cursor to specified position to enable replay of messages.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,27 +1272,55 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
OpFindNewest op;
Position startPosition = null;
long max = 0;
asyncFindNewestMatching(constraint, condition, null, null, callback, ctx,
isFindFromLedger);
}


@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
Position start, Position end, FindEntryCallback callback,
Object ctx, boolean isFindFromLedger) {
Position startPosition;
switch (constraint) {
case SearchAllAvailableEntries:
startPosition = getFirstPosition();
max = ledger.getNumberOfEntries() - 1;
break;
case SearchActiveEntries:
startPosition = ledger.getNextValidPosition(markDeletePosition);
max = getNumberOfEntriesInStorage();
break;
default:
callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
return;
case SearchAllAvailableEntries ->
startPosition = start == null ? getFirstPosition() : start;
case SearchActiveEntries -> {
if (start == null) {
startPosition = ledger.getNextValidPosition(markDeletePosition);
} else {
startPosition = start;
startPosition = startPosition.compareTo(markDeletePosition) <= 0
? ledger.getNextValidPosition(startPosition) : startPosition;
}
}
default -> {
callback.findEntryFailed(
new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
return;
}
}
// startPosition can't be null, should never go here.
if (startPosition == null) {
callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"),
Optional.empty(), ctx);
return;
}
// Calculate the end position
Position endPosition = end == null ? ledger.lastConfirmedEntry : end;
endPosition = endPosition.compareTo(ledger.lastConfirmedEntry) > 0 ? ledger.lastConfirmedEntry : endPosition;
// Calculate the number of entries between the startPosition and endPosition
long max = 0;
if (startPosition.compareTo(endPosition) <= 0) {
max = ledger.getNumberOfEntries(Range.closed(startPosition, endPosition));
}

if (max <= 0) {
callback.findEntryComplete(null, ctx);
return;
}

OpFindNewest op;
if (isFindFromLedger) {
op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4873,6 +4873,297 @@ public void operationFailed(ManagedLedgerException exception) {
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
}

@Test
public void testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd", managedLedgerConfig);
@Cleanup
ManagedCursor managedCursor = ledger.openCursor("test");

Position position = ledger.addEntry("test".getBytes(Encoding));
Position position1 = ledger.addEntry("test1".getBytes(Encoding));
Position position2 = ledger.addEntry("test2".getBytes(Encoding));
Position position3 = ledger.addEntry("test3".getBytes(Encoding));

Predicate<Entry> condition = entry -> {
try {
Position p = entry.getPosition();
return p.compareTo(position1) <= 0;
} finally {
entry.release();
}
};

// find the newest entry with start and end position
AtomicBoolean failed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Position> positionRef = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, position, position2, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef.set(position);
latch.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed.set(true);
latch.countDown();
}
}, null, true);

latch.await();
assertFalse(failed.get());
assertNotNull(positionRef.get());
assertEquals(positionRef.get(), position1);

// find the newest entry with start
AtomicBoolean failed1 = new AtomicBoolean(false);
CountDownLatch latch1 = new CountDownLatch(1);
AtomicReference<Position> positionRef1 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, position, null, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef1.set(position);
latch1.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed1.set(true);
latch1.countDown();
}
}, null, true);
latch1.await();
assertFalse(failed1.get());
assertNotNull(positionRef1.get());
assertEquals(positionRef1.get(), position1);

// find the newest entry with end
AtomicBoolean failed2 = new AtomicBoolean(false);
CountDownLatch latch2 = new CountDownLatch(1);
AtomicReference<Position> positionRef2 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, null, position2, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef2.set(position);
latch2.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed2.set(true);
latch2.countDown();
}
}, null, true);
latch2.await();
assertFalse(failed2.get());
assertNotNull(positionRef2.get());
assertEquals(positionRef2.get(), position1);

// find the newest entry without start and end position
AtomicBoolean failed3 = new AtomicBoolean(false);
CountDownLatch latch3 = new CountDownLatch(1);
AtomicReference<Position> positionRef3 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, null, null, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef3.set(position);
latch3.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed3.set(true);
latch3.countDown();
}
}, null, true);
latch3.await();
assertFalse(failed3.get());
assertNotNull(positionRef3.get());
assertEquals(positionRef3.get(), position1);

// find position3
AtomicBoolean failed4 = new AtomicBoolean(false);
CountDownLatch latch4 = new CountDownLatch(1);
AtomicReference<Position> positionRef4 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
try {
Position p = entry.getPosition();
return p.compareTo(position3) <= 0;
} finally {
entry.release();
}
}, position3, position3, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef4.set(position);
latch4.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed4.set(true);
latch4.countDown();
}
}, null, true);
latch4.await();
assertFalse(failed4.get());
assertNotNull(positionRef4.get());
assertEquals(positionRef4.get(), position3);
}


@Test
public void testFindNewestMatching_SearchActiveEntries_ByStartAndEnd() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testFindNewestMatching_SearchActiveEntries_ByStartAndEnd", managedLedgerConfig);
@Cleanup
ManagedCursorImpl managedCursor = (ManagedCursorImpl) ledger.openCursor("test");

Position position = ledger.addEntry("test".getBytes(Encoding));
Position position1 = ledger.addEntry("test1".getBytes(Encoding));
Position position2 = ledger.addEntry("test2".getBytes(Encoding));
Position position3 = ledger.addEntry("test3".getBytes(Encoding));
Position position4 = ledger.addEntry("test4".getBytes(Encoding));
managedCursor.markDelete(position1);
assertEquals(managedCursor.getNumberOfEntries(), 3);

Predicate<Entry> condition = entry -> {
try {
Position p = entry.getPosition();
return p.compareTo(position3) <= 0;
} finally {
entry.release();
}
};

// find the newest entry with start and end position
AtomicBoolean failed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Position> positionRef = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, position2, position4, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef.set(position);
latch.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed.set(true);
latch.countDown();
}
}, null, true);
latch.await();
assertFalse(failed.get());
assertNotNull(positionRef.get());
assertEquals(positionRef.get(), position3);

// find the newest entry with start
AtomicBoolean failed1 = new AtomicBoolean(false);
CountDownLatch latch1 = new CountDownLatch(1);
AtomicReference<Position> positionRef1 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, position2, null, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef1.set(position);
latch1.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed1.set(true);
latch1.countDown();
}
}, null, true);

latch1.await();
assertFalse(failed1.get());
assertNotNull(positionRef1.get());
assertEquals(positionRef1.get(), position3);

// find the newest entry with end
AtomicBoolean failed2 = new AtomicBoolean(false);
CountDownLatch latch2 = new CountDownLatch(1);
AtomicReference<Position> positionRef2 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, null, position4, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef2.set(position);
latch2.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed2.set(true);
latch2.countDown();
}
}, null, true);

latch2.await();
assertFalse(failed2.get());
assertNotNull(positionRef2.get());
assertEquals(positionRef2.get(), position3);

// find the newest entry without start and end position
AtomicBoolean failed3 = new AtomicBoolean(false);
CountDownLatch latch3 = new CountDownLatch(1);
AtomicReference<Position> positionRef3 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, null, null, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef3.set(position);
latch3.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed3.set(true);
latch3.countDown();
}
}, null, true);
latch3.await();
assertFalse(failed3.get());
assertNotNull(positionRef3.get());
assertEquals(positionRef3.get(), position3);

// find position4
AtomicBoolean failed4 = new AtomicBoolean(false);
CountDownLatch latch4 = new CountDownLatch(1);
AtomicReference<Position> positionRef4 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
try {
Position p = entry.getPosition();
return p.compareTo(position4) <= 0;
} finally {
entry.release();
}
}, position4, position4, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef4.set(position);
latch4.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed4.set(true);
latch4.countDown();
}
}, null, true);
latch4.await();
assertFalse(failed4.get());
assertNotNull(positionRef4.get());
assertEquals(positionRef4.get(), position4);
}

@Test
void testForceCursorRecovery() throws Exception {
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
Expand Down
Loading
Loading