Skip to content

Commit

Permalink
Add contains method to LocalCheckpointTracker (#33871)
Browse files Browse the repository at this point in the history
This change adds "contains" method to LocalCheckpointTracker.
One of the use cases is to check if a given operation has been processed
in an engine or not by looking up its seq_no in LocalCheckpointTracker.

Relates #33656
  • Loading branch information
dnhatn authored Sep 20, 2018
1 parent 4767a01 commit 05bf9dc
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,25 @@ public synchronized void waitForOpsToComplete(final long seqNo) throws Interrupt
}
}

/**
* Checks if the given sequence number was marked as completed in this tracker.
*/
public boolean contains(final long seqNo) {
assert seqNo >= 0 : "invalid seq_no=" + seqNo;
if (seqNo >= nextSeqNo) {
return false;
}
if (seqNo <= checkpoint) {
return true;
}
final long bitSetKey = getBitSetKey(seqNo);
final CountedBitSet bitSet;
synchronized (this) {
bitSet = processedSeqNo.get(bitSetKey);
}
return bitSet != null && bitSet.get(seqNoToBitSetOffset(seqNo));
}

/**
* Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number following the
* current checkpoint is processed.
Expand Down Expand Up @@ -206,7 +225,6 @@ private long lastSeqNoInBitSet(final long bitSetKey) {
* @return the bit set corresponding to the provided sequence number
*/
private long getBitSetKey(final long seqNo) {
assert Thread.holdsLock(this);
return seqNo / BIT_SET_SIZE;
}

Expand All @@ -232,7 +250,6 @@ private CountedBitSet getBitSetForSeqNo(final long seqNo) {
* @return the position in the bit set corresponding to the provided sequence number
*/
private int seqNoToBitSetOffset(final long seqNo) {
assert Thread.holdsLock(this);
return Math.toIntExact(seqNo % BIT_SET_SIZE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,52 +65,71 @@ public void testSimplePrimary() {
assertThat(seqNo1, equalTo(0L));
tracker.markSeqNoAsCompleted(seqNo1);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(0L), equalTo(true));
assertThat(tracker.contains(atLeast(1)), equalTo(false));
seqNo1 = tracker.generateSeqNo();
seqNo2 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(1L));
assertThat(seqNo2, equalTo(2L));
tracker.markSeqNoAsCompleted(seqNo2);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(seqNo1), equalTo(false));
assertThat(tracker.contains(seqNo2), equalTo(true));
tracker.markSeqNoAsCompleted(seqNo1);
assertThat(tracker.getCheckpoint(), equalTo(2L));
assertThat(tracker.contains(between(0, 2)), equalTo(true));
assertThat(tracker.contains(atLeast(3)), equalTo(false));
}

public void testSimpleReplica() {
assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
assertThat(tracker.contains(randomNonNegativeLong()), equalTo(false));
tracker.markSeqNoAsCompleted(0L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(0), equalTo(true));
tracker.markSeqNoAsCompleted(2L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(1L), equalTo(false));
assertThat(tracker.contains(2L), equalTo(true));
tracker.markSeqNoAsCompleted(1L);
assertThat(tracker.getCheckpoint(), equalTo(2L));
assertThat(tracker.contains(between(0, 2)), equalTo(true));
assertThat(tracker.contains(atLeast(3)), equalTo(false));
}

public void testLazyInitialization() {
/*
* Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large
* sequence numbers this could lead to excessive memory usage resulting in out of memory errors.
*/
tracker.markSeqNoAsCompleted(randomNonNegativeLong());
long seqNo = randomNonNegativeLong();
tracker.markSeqNoAsCompleted(seqNo);
assertThat(tracker.processedSeqNo.size(), equalTo(1));
assertThat(tracker.contains(seqNo), equalTo(true));
assertThat(tracker.contains(randomValueOtherThan(seqNo, ESTestCase::randomNonNegativeLong)), equalTo(false));
assertThat(tracker.processedSeqNo.size(), equalTo(1));
}

public void testSimpleOverFlow() {
List<Integer> seqNoList = new ArrayList<>();
List<Long> seqNoList = new ArrayList<>();
final boolean aligned = randomBoolean();
final int maxOps = BIT_SET_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_SET_SIZE - 1));

for (int i = 0; i < maxOps; i++) {
for (long i = 0; i < maxOps; i++) {
seqNoList.add(i);
}
Collections.shuffle(seqNoList, random());
for (Integer seqNo : seqNoList) {
for (Long seqNo : seqNoList) {
tracker.markSeqNoAsCompleted(seqNo);
}
assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
if (aligned == false) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
}
assertThat(tracker.contains(randomFrom(seqNoList)), equalTo(true));
final long notCompletedSeqNo = randomValueOtherThanMany(seqNoList::contains, ESTestCase::randomNonNegativeLong);
assertThat(tracker.contains(notCompletedSeqNo), equalTo(false));
}

public void testConcurrentPrimary() throws InterruptedException {
Expand Down Expand Up @@ -199,8 +218,12 @@ protected void doRun() throws Exception {
}
assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L));
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
assertThat(tracker.contains(randomValueOtherThan(unFinishedSeq, () -> (long) randomFrom(seqNos))), equalTo(true));
assertThat(tracker.contains(unFinishedSeq), equalTo(false));
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.contains(unFinishedSeq), equalTo(true));
assertThat(tracker.contains(randomLongBetween(maxOps, Long.MAX_VALUE)), equalTo(false));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
Expand Down Expand Up @@ -272,4 +295,23 @@ public void describeTo(Description description) {
});
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
}

public void testContains() {
final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100);
final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(maxSeqNo, localCheckpoint);
if (localCheckpoint >= 0) {
assertThat(tracker.contains(randomLongBetween(0, localCheckpoint)), equalTo(true));
}
assertThat(tracker.contains(randomLongBetween(localCheckpoint + 1, Long.MAX_VALUE)), equalTo(false));
final int numOps = between(1, 100);
final List<Long> seqNos = new ArrayList<>();
for (int i = 0; i < numOps; i++) {
long seqNo = randomLongBetween(0, 1000);
seqNos.add(seqNo);
tracker.markSeqNoAsCompleted(seqNo);
}
final long seqNo = randomNonNegativeLong();
assertThat(tracker.contains(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo)));
}
}

0 comments on commit 05bf9dc

Please sign in to comment.