Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce retention leases versioning #37951

Merged
merged 31 commits into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
98f4239
Introduce retention leases versioning
jasontedor Jan 29, 2019
38dffb5
Add Javadocs
jasontedor Jan 29, 2019
c715b5e
License header
jasontedor Jan 29, 2019
f8e8547
Relocate methods
jasontedor Jan 29, 2019
4aa6b77
Merge branch 'master' into retention-leases-version
jasontedor Jan 31, 2019
6ab494d
Add primary term
jasontedor Jan 31, 2019
2d3772c
Merge remote-tracking branch 'elastic/master' into retention-leases-v…
jasontedor Jan 31, 2019
954db81
Remove dead constructor
jasontedor Jan 31, 2019
9d09989
Partition leases when calculating expiration
jasontedor Jan 31, 2019
17d2095
Remove unnecessary map
jasontedor Jan 31, 2019
e724746
Fix test
jasontedor Jan 31, 2019
e8971b9
Fix spelling and imports
jasontedor Jan 31, 2019
f3cb4eb
Add tests and docs for supersedes
jasontedor Jan 31, 2019
0a7d19b
Adjust primary term limit
jasontedor Jan 31, 2019
034a840
Add missing newline
jasontedor Jan 31, 2019
05b69c0
Update tests
jasontedor Jan 31, 2019
600809c
Fix test
jasontedor Jan 31, 2019
39023d8
Fix imports
jasontedor Jan 31, 2019
80b08b3
Fix some naming
jasontedor Jan 31, 2019
ae0fc2a
Fix checkstyle
jasontedor Jan 31, 2019
08095cb
Update server/src/test/java/org/elasticsearch/index/seqno/Replication…
jasontedor Feb 1, 2019
8d9deae
Fix compilation
jasontedor Feb 1, 2019
fce4da6
Fix checkstyle
jasontedor Feb 1, 2019
76bb46b
Inline
jasontedor Feb 1, 2019
44c5e0f
Inline
jasontedor Feb 1, 2019
bc96632
Fix test
jasontedor Feb 1, 2019
becb055
Adjust docs
jasontedor Feb 1, 2019
73fb8f5
Disable BWC
jasontedor Feb 1, 2019
7cf145c
Merge branch 'master' into retention-leases-version
jasontedor Feb 1, 2019
686d35e
Merge branch 'master' into retention-leases-version
jasontedor Feb 1, 2019
be0edfc
Merge remote-tracking branch 'elastic/master' into retention-leases-v…
jasontedor Feb 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update tests
  • Loading branch information
jasontedor committed Jan 31, 2019
commit 05b69c01c53e1e3c0232559a138dc3f3f4f43581
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
volatile ReplicationGroup replicationGroup;

/**
* The current retention leases.
*/
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes

public void testAddOrRenewRetentionLease() {
final AllocationId allocationId = AllocationId.newInitializing();
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
randomNonNegativeLong(),
primaryTerm,
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
Expand All @@ -74,13 +75,21 @@ public void testAddOrRenewRetentionLease() {
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, 1 + i, true);
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true);
}

for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, 1 + length + i, true);
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true);
}
}

Expand Down Expand Up @@ -147,11 +156,12 @@ private void runExpirationTest(final boolean primaryMode) {
.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
.build();
final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", settings),
randomNonNegativeLong(),
primaryTerm,
UNASSIGNED_SEQ_NO,
value -> {},
currentTimeMillis::get,
Expand All @@ -170,7 +180,7 @@ private void runExpirationTest(final boolean primaryMode) {
replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
} else {
final RetentionLeases retentionLeases = new RetentionLeases(
1,
primaryTerm,
1,
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
replicationTracker.updateRetentionLeasesOnReplica(retentionLeases);
Expand All @@ -182,7 +192,7 @@ private void runExpirationTest(final boolean primaryMode) {
assertThat(retentionLeases.leases(), hasSize(1));
final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, 1, primaryMode);
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primaryMode);
}

// renew the lease
Expand All @@ -192,7 +202,7 @@ private void runExpirationTest(final boolean primaryMode) {
replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
} else {
final RetentionLeases retentionLeases = new RetentionLeases(
1,
primaryTerm,
2,
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
replicationTracker.updateRetentionLeasesOnReplica(retentionLeases);
Expand All @@ -204,16 +214,16 @@ private void runExpirationTest(final boolean primaryMode) {
assertThat(retentionLeases.leases(), hasSize(1));
final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, 2, primaryMode);
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primaryMode);
}

// now force the lease to expire
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
if (primaryMode) {
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, 3, true);
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true);
} else {
// leases do not expire on replicas until synced from the primary
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, 2, false);
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false);
}
}

Expand Down Expand Up @@ -366,9 +376,11 @@ private void assertRetentionLeases(
final int size,
final long[] minimumRetainingSequenceNumbers,
final LongSupplier currentTimeMillisSupplier,
final long primaryTerm,
final long version,
final boolean primaryMode) {
final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases();
assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
assertThat(retentionLeases.version(), equalTo(version));
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
for (final RetentionLease retentionLease : retentionLeases.leases()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,22 @@ protected void tearDownThreadPool() {

public void testAddOrRenewRetentionLease() throws IOException {
final IndexShard indexShard = newStartedShard(true);
final long primaryTerm = indexShard.getOperationPrimaryTerm();
try {
final int length = randomIntBetween(0, 8);
final long[] minimumRetainingSequenceNumbers = new long[length];
for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
indexShard.addRetentionLease(
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, 1 + i, true);
assertRetentionLeases(
indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true);
}

for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
indexShard.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, 1 + length + i, true);
assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true);
}
} finally {
closeShards(indexShard);
Expand All @@ -112,6 +114,7 @@ private void runExpirationTest(final boolean primary) throws IOException {
.build();
// current time is mocked through the thread pool
final IndexShard indexShard = newStartedShard(primary, settings, new InternalEngineFactory());
final long primaryTerm = indexShard.getOperationPrimaryTerm();
try {
final long[] retainingSequenceNumbers = new long[1];
retainingSequenceNumbers[0] = randomLongBetween(0, Long.MAX_VALUE);
Expand All @@ -120,7 +123,7 @@ private void runExpirationTest(final boolean primary) throws IOException {
}));
} else {
final RetentionLeases retentionLeases = new RetentionLeases(
1,
primaryTerm,
1,
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
indexShard.updateRetentionLeasesOnReplica(retentionLeases);
Expand All @@ -132,7 +135,7 @@ private void runExpirationTest(final boolean primary) throws IOException {
assertThat(retentionLeases.leases(), hasSize(1));
final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, 1, primary);
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primary);
}

// renew the lease
Expand All @@ -142,7 +145,7 @@ private void runExpirationTest(final boolean primary) throws IOException {
indexShard.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
} else {
final RetentionLeases retentionLeases = new RetentionLeases(
1,
primaryTerm,
2,
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
indexShard.updateRetentionLeasesOnReplica(retentionLeases);
Expand All @@ -154,16 +157,16 @@ private void runExpirationTest(final boolean primary) throws IOException {
assertThat(retentionLeases.leases(), hasSize(1));
final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, 2, primary);
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primary);
}

// now force the lease to expire
currentTimeMillis.set(
currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
if (primary) {
assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, 3, true);
assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true);
} else {
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, 2, false);
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false);
}
} finally {
closeShards(indexShard);
Expand Down Expand Up @@ -249,6 +252,7 @@ public void testRetentionLeaseStats() throws IOException {
length,
minimumRetainingSequenceNumbers,
() -> 0L,
indexShard.getOperationPrimaryTerm(),
length,
true);
} finally {
Expand All @@ -261,6 +265,7 @@ private void assertRetentionLeases(
final int size,
final long[] minimumRetainingSequenceNumbers,
final LongSupplier currentTimeMillisSupplier,
final long primaryTerm,
final long version,
final boolean primary) {
assertRetentionLeases(
Expand All @@ -269,6 +274,7 @@ private void assertRetentionLeases(
size,
minimumRetainingSequenceNumbers,
currentTimeMillisSupplier,
primaryTerm,
version,
primary);
}
Expand All @@ -279,8 +285,10 @@ private void assertRetentionLeases(
final int size,
final long[] minimumRetainingSequenceNumbers,
final LongSupplier currentTimeMillisSupplier,
final long primaryTerm,
final long version,
final boolean primary) {
assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
assertThat(retentionLeases.version(), equalTo(version));
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
for (final RetentionLease retentionLease : retentionLeases.leases()) {
Expand Down