From f181e170387ca50727b750605fad51a3e85fed3a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Feb 2019 17:19:19 -0500 Subject: [PATCH] Introduce retention leases versioning (#37951) Because concurrent sync requests from a primary to its replicas could be in flight, it can be the case that an older retention leases collection arrives and is processed on the replica after a newer retention leases collection has arrived and been processed. Without a defense, in this case the replica would overwrite the newer retention leases with the older retention leases. This commit addresses this issue by introducing a versioning scheme to retention leases. This versioning scheme is used to resolve out-of-order processing on the replica. We persist this version into Lucene and restore it on recovery. The encoding of retention leases is starting to get a little ugly. We can consider addressing this in a follow-up. --- build.gradle | 2 +- docs/reference/indices/flush.asciidoc | 4 +- .../index/engine/EngineConfig.java | 9 +- .../index/engine/InternalEngine.java | 6 +- .../index/engine/SoftDeletesPolicy.java | 11 +- .../index/seqno/ReplicationTracker.java | 70 ++--- .../index/seqno/RetentionLease.java | 52 +--- .../index/seqno/RetentionLeaseStats.java | 33 +-- .../index/seqno/RetentionLeaseSyncAction.java | 13 +- .../index/seqno/RetentionLeaseSyncer.java | 4 +- .../index/seqno/RetentionLeases.java | 253 ++++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 14 +- .../engine/CombinedDeletionPolicyTests.java | 12 +- .../index/engine/InternalEngineTests.java | 59 ++-- .../index/engine/SoftDeletesPolicyTests.java | 19 +- ...ReplicationTrackerRetentionLeaseTests.java | 144 ++++++++-- .../index/seqno/ReplicationTrackerTests.java | 2 +- .../index/seqno/RetentionLeaseStatsTests.java | 2 +- ...tentionLeaseStatsWireSerializingTests.java | 4 +- .../seqno/RetentionLeaseSyncActionTests.java | 16 +- .../index/seqno/RetentionLeaseSyncIT.java | 25 +- .../index/seqno/RetentionLeaseTests.java | 25 -- .../index/seqno/RetentionLeasesTests.java | 95 +++++++ .../shard/IndexShardRetentionLeaseTests.java | 85 +++--- .../index/shard/RefreshListenersTests.java | 31 ++- .../index/engine/EngineTestCase.java | 13 +- .../index/engine/FollowingEngineTests.java | 3 +- 27 files changed, 729 insertions(+), 277 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java create mode 100644 server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java diff --git a/build.gradle b/build.gradle index 42a4a42002222..22505ed69a66d 100644 --- a/build.gradle +++ b/build.gradle @@ -160,7 +160,7 @@ task verifyVersions { * after the backport of the backcompat code is complete. */ final boolean bwc_tests_enabled = false -final String bwc_tests_disabled_issue = "backport of#37977, #37857 and #37872" /* place a PR link here when committing bwc changes */ +final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/37951" /* place a PR link here when committing bwc changes */ if (bwc_tests_enabled == false) { if (bwc_tests_disabled_issue.isEmpty()) { throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false") diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index fdfcd80ecd463..ea8667aa1b713 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -104,7 +104,7 @@ which returns something similar to: "sync_id" : "AVvFY-071siAOuFGEO9P", <1> "max_unsafe_auto_id_timestamp" : "-1", "min_retained_seq_no" : "0", - "retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica" + "retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica" }, "num_docs" : 0 } @@ -119,7 +119,7 @@ which returns something similar to: // TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/] // TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/] // TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/] -// TESTRESPONSE[s/"retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/] +// TESTRESPONSE[s/"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/] <1> the `sync id` marker [float] diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 1cc92319b5e45..7716cf93ffd6b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -35,7 +35,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogConfig; @@ -43,7 +43,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; @@ -81,7 +80,7 @@ public final class EngineConfig { @Nullable private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; - private final Supplier> retentionLeasesSupplier; + private final Supplier retentionLeasesSupplier; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -89,7 +88,7 @@ public final class EngineConfig { * * @return a supplier of outstanding retention leases */ - public Supplier> retentionLeasesSupplier() { + public Supplier retentionLeasesSupplier() { return retentionLeasesSupplier; } @@ -141,7 +140,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool, List externalRefreshListener, List internalRefreshListener, Sort indexSort, CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier, - Supplier> retentionLeasesSupplier, + Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier) { this.shardId = shardId; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bb2da0947363d..acedd8356ea9e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -75,7 +75,7 @@ import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; -import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; @@ -2348,9 +2348,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl * We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum * retained sequence number, and the retention leases. */ - final Tuple> retentionPolicy = softDeletesPolicy.getRetentionPolicy(); + final Tuple retentionPolicy = softDeletesPolicy.getRetentionPolicy(); commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1())); - commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2())); + commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2())); } logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index 17ec9a172e384..49b8f9d3483f2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -25,10 +25,10 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; -import java.util.Collection; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; @@ -47,13 +47,13 @@ final class SoftDeletesPolicy { // The min seq_no value that is retained - ops after this seq# should exist in the Lucene index. private long minRetainedSeqNo; // provides the retention leases used to calculate the minimum sequence number to retain - private final Supplier> retentionLeasesSupplier; + private final Supplier retentionLeasesSupplier; SoftDeletesPolicy( final LongSupplier globalCheckpointSupplier, final long minRetainedSeqNo, final long retentionOperations, - final Supplier> retentionLeasesSupplier) { + final Supplier retentionLeasesSupplier) { this.globalCheckpointSupplier = globalCheckpointSupplier; this.retentionOperations = retentionOperations; this.minRetainedSeqNo = minRetainedSeqNo; @@ -110,12 +110,12 @@ synchronized long getMinRetainedSeqNo() { return getRetentionPolicy().v1(); } - public synchronized Tuple> getRetentionPolicy() { + public synchronized Tuple getRetentionPolicy() { /* * When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is * locked for peer recovery. */ - final Collection retentionLeases = retentionLeasesSupplier.get(); + final RetentionLeases retentionLeases = retentionLeasesSupplier.get(); // do not advance if the retention lock is held if (retentionLockCount == 0) { /* @@ -130,6 +130,7 @@ public synchronized Tuple> getRetentionPolicy() // calculate the minimum sequence number to retain based on retention leases final long minimumRetainingSequenceNumber = retentionLeases + .leases() .stream() .mapToLong(RetentionLease::retainingSequenceNumber) .min() diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index bc51bc7b67164..34ec443a5404a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -38,11 +38,11 @@ import org.elasticsearch.index.shard.ShardId; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.OptionalLong; @@ -54,6 +54,7 @@ import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; +import java.util.stream.Stream; /** * This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints). @@ -157,7 +158,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L * A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the * retention lease sync action, to sync retention leases to replicas. */ - private final BiConsumer, ActionListener> onSyncRetentionLeases; + private final BiConsumer> onSyncRetentionLeases; /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the @@ -170,12 +171,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ volatile ReplicationGroup replicationGroup; - private final Map retentionLeases = new HashMap<>(); - - private Collection copyRetentionLeases() { - assert Thread.holdsLock(this); - return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values())); - } + /** + * The current retention leases. + */ + private RetentionLeases retentionLeases = RetentionLeases.EMPTY; /** * Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only @@ -183,27 +182,25 @@ private Collection copyRetentionLeases() { * * @return the retention leases */ - public Collection getRetentionLeases() { + public RetentionLeases getRetentionLeases() { final boolean wasPrimaryMode; - final Collection nonExpiredRetentionLeases; + final RetentionLeases nonExpiredRetentionLeases; synchronized (this) { if (primaryMode) { // the primary calculates the non-expired retention leases and syncs them to replicas final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); - final Collection expiredRetentionLeases = retentionLeases - .values() + final Map> partitionByExpiration = retentionLeases + .leases() .stream() - .filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() > retentionLeaseMillis) - .collect(Collectors.toList()); - if (expiredRetentionLeases.isEmpty()) { + .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis)); + if (partitionByExpiration.get(true) == null) { // early out as no retention leases have expired - return copyRetentionLeases(); - } - // clean up the expired retention leases - for (final RetentionLease expiredRetentionLease : expiredRetentionLeases) { - retentionLeases.remove(expiredRetentionLease.id()); + return retentionLeases; } + final Collection nonExpiredLeases = + partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList(); + retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases); } /* * At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or @@ -211,7 +208,7 @@ public Collection getRetentionLeases() { * non-expired retention leases, instead receiving them on syncs from the primary. */ wasPrimaryMode = primaryMode; - nonExpiredRetentionLeases = copyRetentionLeases(); + nonExpiredRetentionLeases = retentionLeases; } if (wasPrimaryMode) { onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {})); @@ -236,15 +233,18 @@ public RetentionLease addRetentionLease( final ActionListener listener) { Objects.requireNonNull(listener); final RetentionLease retentionLease; - final Collection currentRetentionLeases; + final RetentionLeases currentRetentionLeases; synchronized (this) { assert primaryMode; - if (retentionLeases.containsKey(id)) { + if (retentionLeases.contains(id)) { throw new IllegalArgumentException("retention lease with ID [" + id + "] already exists"); } retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source); - retentionLeases.put(id, retentionLease); - currentRetentionLeases = copyRetentionLeases(); + retentionLeases = new RetentionLeases( + operationPrimaryTerm, + retentionLeases.version() + 1, + Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList())); + currentRetentionLeases = retentionLeases; } onSyncRetentionLeases.accept(currentRetentionLeases, listener); return retentionLease; @@ -261,18 +261,25 @@ public RetentionLease addRetentionLease( */ public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) { assert primaryMode; - if (retentionLeases.containsKey(id) == false) { + if (retentionLeases.contains(id) == false) { throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist"); } final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source); - final RetentionLease existingRetentionLease = retentionLeases.put(id, retentionLease); + final RetentionLease existingRetentionLease = retentionLeases.get(id); assert existingRetentionLease != null; assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() : "retention lease renewal for [" + id + "]" + " from [" + source + "]" + " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]" + " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]"; + retentionLeases = new RetentionLeases( + operationPrimaryTerm, + retentionLeases.version() + 1, + Stream.concat( + retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false), + Stream.of(retentionLease)) + .collect(Collectors.toList())); return retentionLease; } @@ -281,10 +288,11 @@ public synchronized RetentionLease renewRetentionLease(final String id, final lo * * @param retentionLeases the retention leases */ - public synchronized void updateRetentionLeasesOnReplica(final Collection retentionLeases) { + public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases) { assert primaryMode == false; - this.retentionLeases.clear(); - this.retentionLeases.putAll(retentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()))); + if (retentionLeases.supersedes(this.retentionLeases)) { + this.retentionLeases = retentionLeases; + } } public static class CheckpointState implements Writeable { @@ -565,7 +573,7 @@ public ReplicationTracker( final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, - final BiConsumer, ActionListener> onSyncRetentionLeases) { + final BiConsumer> onSyncRetentionLeases) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index 362d068f45e2d..e1d362d98764a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -25,13 +25,8 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.Locale; -import java.util.Map; import java.util.Objects; -import java.util.function.Function; -import java.util.stream.Collectors; /** * A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such @@ -162,22 +157,10 @@ static String encodeRetentionLease(final RetentionLease retentionLease) { return String.format( Locale.ROOT, "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s", - retentionLease.id(), - retentionLease.retainingSequenceNumber(), - retentionLease.timestamp(), - retentionLease.source()); - } - - /** - * Encodes a collection of retention leases as a string. This encoding can be decoed by {@link #decodeRetentionLeases(String)}. The - * encoding is a comma-separated encoding of each retention lease as encoded by {@link #encodeRetentionLease(RetentionLease)}. - * - * @param retentionLeases the retention leases - * @return the encoding of the retention leases - */ - public static String encodeRetentionLeases(final Collection retentionLeases) { - Objects.requireNonNull(retentionLeases); - return retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")); + retentionLease.id, + retentionLease.retainingSequenceNumber, + retentionLease.timestamp, + retentionLease.source); } /** @@ -201,23 +184,6 @@ static RetentionLease decodeRetentionLease(final String encodedRetentionLease) { return new RetentionLease(id, retainingSequenceNumber, timestamp, source); } - /** - * Decodes retention leases encoded by {@link #encodeRetentionLeases(Collection)}. - * - * @param encodedRetentionLeases an encoded collection of retention leases - * @return the decoded retention leases - */ - public static Collection decodeRetentionLeases(final String encodedRetentionLeases) { - Objects.requireNonNull(encodedRetentionLeases); - if (encodedRetentionLeases.isEmpty()) { - return Collections.emptyList(); - } - assert Arrays.stream(encodedRetentionLeases.split(",")) - .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+")) - : encodedRetentionLeases; - return Arrays.stream(encodedRetentionLeases.split(",")).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList()); - } - @Override public boolean equals(final Object o) { if (this == o) return true; @@ -244,14 +210,4 @@ public String toString() { '}'; } - /** - * A utility method to convert a collection of retention leases to a map from retention lease ID to retention lease. - * - * @param leases the leases - * @return the map from retention lease ID to retention lease - */ - static Map toMap(final Collection leases) { - return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())); - } - } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java index b8f1454a12c2b..14f485d314928 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Collection; import java.util.Objects; /** @@ -34,24 +33,24 @@ */ public final class RetentionLeaseStats implements ToXContentFragment, Writeable { - private final Collection leases; + private final RetentionLeases retentionLeases; /** - * The underlying retention leases backing this stats object. + * The underlying retention lease collection backing this stats object. * - * @return the leases + * @return the retention lease collection */ - public Collection leases() { - return leases; + public RetentionLeases retentionLeases() { + return retentionLeases; } /** - * Constructs a new retention lease stats object from the specified leases. + * Constructs a new retention lease stats object from the specified retention lease collection. * - * @param leases the leases + * @param retentionLeases the retention lease collection */ - public RetentionLeaseStats(final Collection leases) { - this.leases = Objects.requireNonNull(leases); + public RetentionLeaseStats(final RetentionLeases retentionLeases) { + this.retentionLeases = Objects.requireNonNull(retentionLeases); } /** @@ -62,7 +61,7 @@ public RetentionLeaseStats(final Collection leases) { * @throws IOException if an I/O exception occurs reading from the stream */ public RetentionLeaseStats(final StreamInput in) throws IOException { - leases = in.readList(RetentionLease::new); + retentionLeases = new RetentionLeases(in); } /** @@ -74,7 +73,7 @@ public RetentionLeaseStats(final StreamInput in) throws IOException { */ @Override public void writeTo(final StreamOutput out) throws IOException { - out.writeCollection(leases); + retentionLeases.writeTo(out); } /** @@ -82,16 +81,18 @@ public void writeTo(final StreamOutput out) throws IOException { * * @param builder the builder * @param params the params - * @return the builder that these retention leases were converted to {@link org.elasticsearch.common.xcontent.XContent} into + * @return the builder that this retention lease collection was converted to {@link org.elasticsearch.common.xcontent.XContent} into * @throws IOException if an I/O exception occurs writing to the builder */ @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject("retention_leases"); { + builder.field("primary_term", retentionLeases.primaryTerm()); + builder.field("version", retentionLeases.version()); builder.startArray("leases"); { - for (final RetentionLease retentionLease : leases) { + for (final RetentionLease retentionLease : retentionLeases.leases()) { builder.startObject(); { builder.field("id", retentionLease.id()); @@ -113,12 +114,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final RetentionLeaseStats that = (RetentionLeaseStats) o; - return Objects.equals(leases, that.leases); + return Objects.equals(retentionLeases, that.retentionLeases); } @Override public int hashCode() { - return Objects.hash(leases); + return Objects.hash(retentionLeases); } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 3b7df41f72d05..89a679abea591 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -47,7 +47,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collection; import java.util.Objects; /** @@ -99,7 +98,7 @@ public RetentionLeaseSyncAction( */ public void syncRetentionLeasesForShard( final ShardId shardId, - final Collection retentionLeases, + final RetentionLeases retentionLeases, final ActionListener listener) { Objects.requireNonNull(shardId); Objects.requireNonNull(retentionLeases); @@ -149,9 +148,9 @@ private void flush(final IndexShard indexShard) { public static final class Request extends ReplicatedWriteRequest { - private Collection retentionLeases; + private RetentionLeases retentionLeases; - public Collection getRetentionLeases() { + public RetentionLeases getRetentionLeases() { return retentionLeases; } @@ -159,7 +158,7 @@ public Request() { } - public Request(final ShardId shardId, final Collection retentionLeases) { + public Request(final ShardId shardId, final RetentionLeases retentionLeases) { super(Objects.requireNonNull(shardId)); this.retentionLeases = Objects.requireNonNull(retentionLeases); } @@ -167,13 +166,13 @@ public Request(final ShardId shardId, final Collection retention @Override public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); - retentionLeases = in.readList(RetentionLease::new); + retentionLeases = new RetentionLeases(in); } @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(Objects.requireNonNull(out)); - out.writeCollection(retentionLeases); + retentionLeases.writeTo(out); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java index 1e276eb98adaf..a19700a94da4b 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java @@ -23,8 +23,6 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.index.shard.ShardId; -import java.util.Collection; - /** * A functional interface that represents a method for syncing retention leases to replica shards after a new retention lease is added on * the primary. @@ -42,7 +40,7 @@ public interface RetentionLeaseSyncer { */ void syncRetentionLeasesForShard( ShardId shardId, - Collection retentionLeases, + RetentionLeases retentionLeases, ActionListener listener); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java new file mode 100644 index 0000000000000..5a9d9e333b27b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java @@ -0,0 +1,253 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Represents a versioned collection of retention leases. We version the collection of retention leases to ensure that sync requests that + * arrive out of order on the replica, using the version to ensure that older sync requests are rejected. + */ +public class RetentionLeases implements Writeable { + + private final long primaryTerm; + + /** + * The primary term of this retention lease collection. + * + * @return the primary term + */ + public long primaryTerm() { + return primaryTerm; + } + + private final long version; + + /** + * The version of this retention lease collection. The version is managed on the primary and incremented any time that a retention lease + * is added, renewed, or when retention leases expire. + * + * @return the version of this retention lease collection + */ + public long version() { + return version; + } + + /** + * Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection + * supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher + * + * @param that the retention leases collection to test against + * @return true if this retention leases collection supercedes the specified retention lease collection, otherwise false + */ + public boolean supersedes(final RetentionLeases that) { + return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version; + } + + private final Map leases; + + /** + * The underlying collection of retention leases + * + * @return the retention leases + */ + public Collection leases() { + return Collections.unmodifiableCollection(leases.values()); + } + + /** + * Checks if this retention lease collection contains a retention lease with the specified {@link RetentionLease#id()}. + * + * @param id the retention lease ID + * @return true if this retention lease collection contains a retention lease with the specified ID, otherwise false + */ + public boolean contains(final String id) { + return leases.containsKey(id); + } + + /** + * Returns the retention lease with the specified ID, or null if no such retention lease exists. + * + * @param id the retention lease ID + * @return the retention lease, or null if no retention lease with the specified ID exists + */ + public RetentionLease get(final String id) { + return leases.get(id); + } + + /** + * Represents an empty an un-versioned retention lease collection. This is used when no retention lease collection is found in the + * commit point + */ + public static RetentionLeases EMPTY = new RetentionLeases(1, 0, Collections.emptyList()); + + /** + * Constructs a new retention lease collection with the specified version and underlying collection of retention leases. + * + * @param primaryTerm the primary term under which this retention lease collection was created + * @param version the version of this retention lease collection + * @param leases the retention leases + */ + public RetentionLeases(final long primaryTerm, final long version, final Collection leases) { + if (primaryTerm <= 0) { + throw new IllegalArgumentException("primary term must be positive but was [" + primaryTerm + "]"); + } + if (version < 0) { + throw new IllegalArgumentException("version must be non-negative but was [" + version + "]"); + } + Objects.requireNonNull(leases); + this.primaryTerm = primaryTerm; + this.version = version; + this.leases = Collections.unmodifiableMap(toMap(leases)); + } + + /** + * Constructs a new retention lease collection from a stream. The retention lease collection should have been written via + * {@link #writeTo(StreamOutput)}. + * + * @param in the stream to construct the retention lease collection from + * @throws IOException if an I/O exception occurs reading from the stream + */ + public RetentionLeases(final StreamInput in) throws IOException { + primaryTerm = in.readVLong(); + version = in.readVLong(); + leases = Collections.unmodifiableMap(toMap(in.readList(RetentionLease::new))); + } + + /** + * Writes a retention lease collection to a stream in a manner suitable for later reconstruction via + * {@link #RetentionLeases(StreamInput)} (StreamInput)}. + * + * @param out the stream to write the retention lease collection to + * @throws IOException if an I/O exception occurs writing to the stream + */ + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeVLong(primaryTerm); + out.writeVLong(version); + out.writeCollection(leases.values()); + } + + /** + * Encodes a retention lease collection as a string. This encoding can be decoded by + * {@link RetentionLeases#decodeRetentionLeases(String)}. The encoding is a comma-separated encoding of each retention lease as encoded + * by {@link RetentionLease#encodeRetentionLease(RetentionLease)}, prefixed by the version of the retention lease collection. + * + * @param retentionLeases the retention lease collection + * @return the encoding of the retention lease collection + */ + public static String encodeRetentionLeases(final RetentionLeases retentionLeases) { + Objects.requireNonNull(retentionLeases); + return String.format( + Locale.ROOT, + "primary_term:%d;version:%d;%s", + retentionLeases.primaryTerm, + retentionLeases.version, + retentionLeases.leases.values().stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(","))); + } + + /** + * Decodes retention leases encoded by {@link #encodeRetentionLeases(RetentionLeases)}. + * + * @param encodedRetentionLeases an encoded retention lease collection + * @return the decoded retention lease collection + */ + public static RetentionLeases decodeRetentionLeases(final String encodedRetentionLeases) { + Objects.requireNonNull(encodedRetentionLeases); + if (encodedRetentionLeases.isEmpty()) { + return EMPTY; + } + assert encodedRetentionLeases.matches("primary_term:\\d+;version:\\d+;.*") : encodedRetentionLeases; + final int firstSemicolon = encodedRetentionLeases.indexOf(";"); + final long primaryTerm = Long.parseLong(encodedRetentionLeases.substring("primary_term:".length(), firstSemicolon)); + final int secondSemicolon = encodedRetentionLeases.indexOf(";", firstSemicolon + 1); + final long version = Long.parseLong(encodedRetentionLeases.substring(firstSemicolon + 1 + "version:".length(), secondSemicolon)); + final Collection leases; + if (secondSemicolon + 1 == encodedRetentionLeases.length()) { + leases = Collections.emptyList(); + } else { + assert Arrays.stream(encodedRetentionLeases.substring(secondSemicolon + 1).split(",")) + .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+")) + : encodedRetentionLeases; + leases = Arrays.stream(encodedRetentionLeases.substring(secondSemicolon + 1).split(",")) + .map(RetentionLease::decodeRetentionLease) + .collect(Collectors.toList()); + } + + return new RetentionLeases(primaryTerm, version, leases); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final RetentionLeases that = (RetentionLeases) o; + return primaryTerm == that.primaryTerm && + version == that.version && + Objects.equals(leases, that.leases); + } + + @Override + public int hashCode() { + return Objects.hash(primaryTerm, version, leases); + } + + @Override + public String toString() { + return "RetentionLeases{" + + "primaryTerm=" + primaryTerm + + ", version=" + version + + ", leases=" + leases + + '}'; + } + + /** + * A utility method to convert retention leases to a map from retention lease ID to retention lease. + * + * @param leases the retention leases + * @return the map from retention lease ID to retention lease + */ + private static Map toMap(final Collection leases) { + return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())); + } + + /** + * A utility method to convert a retention lease collection to a map from retention lease ID to retention lease. + * + * @param retentionLeases the retention lease collection + * @return the map from retention lease ID to retention lease + */ + static Map toMap(final RetentionLeases retentionLeases) { + return retentionLeases.leases; + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 42822942e3adf..261ee919f2a64 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -109,6 +109,7 @@ import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseStats; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; @@ -143,7 +144,6 @@ import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -267,7 +267,7 @@ public IndexShard( final List searchOperationListener, final List listeners, final Runnable globalCheckpointSyncer, - final BiConsumer, ActionListener> retentionLeaseSyncer, + final BiConsumer> retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -1444,12 +1444,12 @@ private void innerOpenEngineAndTranslog() throws IOException { assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } - static Collection getRetentionLeases(final SegmentInfos segmentInfos) { + static RetentionLeases getRetentionLeases(final SegmentInfos segmentInfos) { final String committedRetentionLeases = segmentInfos.getUserData().get(Engine.RETENTION_LEASES); if (committedRetentionLeases == null) { - return Collections.emptyList(); + return RetentionLeases.EMPTY; } - return RetentionLease.decodeRetentionLeases(committedRetentionLeases); + return RetentionLeases.decodeRetentionLeases(committedRetentionLeases); } private void trimUnsafeCommits() throws IOException { @@ -1892,7 +1892,7 @@ public void addGlobalCheckpointListener( * * @return the retention leases */ - public Collection getRetentionLeases() { + public RetentionLeases getRetentionLeases() { verifyNotClosed(); return replicationTracker.getRetentionLeases(); } @@ -1943,7 +1943,7 @@ public RetentionLease renewRetentionLease(final String id, final long retainingS * * @param retentionLeases the retention leases */ - public void updateRetentionLeasesOnReplica(final Collection retentionLeases) { + public void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases) { assert assertReplicationTarget(); verifyNotClosed(); replicationTracker.updateRetentionLeasesOnReplica(retentionLeases); diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 054bfb8bad695..617d23c44e1c6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.LongArrayList; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.store.Directory; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogDeletionPolicy; @@ -30,7 +31,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -55,7 +55,7 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); final int extraRetainedOps = between(0, 100); final SoftDeletesPolicy softDeletesPolicy = - new SoftDeletesPolicy(globalCheckpoint::get, NO_OPS_PERFORMED, extraRetainedOps, Collections::emptyList); + new SoftDeletesPolicy(globalCheckpoint::get, NO_OPS_PERFORMED, extraRetainedOps, () -> RetentionLeases.EMPTY); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); @@ -101,7 +101,7 @@ public void testAcquireIndexCommit() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); final int extraRetainedOps = between(0, 100); final SoftDeletesPolicy softDeletesPolicy = - new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, Collections::emptyList); + new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, () -> RetentionLeases.EMPTY); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); @@ -182,7 +182,7 @@ public void testAcquireIndexCommit() throws Exception { public void testLegacyIndex() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); - final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); @@ -217,7 +217,7 @@ public void testLegacyIndex() throws Exception { public void testDeleteInvalidCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); - final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); @@ -251,7 +251,7 @@ public void testDeleteInvalidCommits() throws Exception { public void testCheckUnreferencedCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); - final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); final UUID translogUUID = UUID.randomUUID(); final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d984d1702f257..25b0c9e00cb7d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -117,6 +117,7 @@ import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexSearcherWrapper; @@ -141,7 +142,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -3052,12 +3052,29 @@ public void testRecoverFromForeignTranslog() throws IOException { TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE); - EngineConfig brokenConfig = new EngineConfig(shardId, allocationId.getId(), - threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), - config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, - new NoneCircuitBreakerService(), () -> UNASSIGNED_SEQ_NO, Collections::emptyList, primaryTerm::get, + EngineConfig brokenConfig = new EngineConfig( + shardId, + allocationId.getId(), + threadPool, + config.getIndexSettings(), + null, + store, + newMergePolicy(), + config.getAnalyzer(), + config.getSimilarity(), + new CodecService(null, logger), + config.getEventListener(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + translogConfig, + TimeValue.timeValueMinutes(5), + config.getExternalRefreshListener(), + config.getInternalRefreshListener(), + null, + new NoneCircuitBreakerService(), + () -> UNASSIGNED_SEQ_NO, + () -> RetentionLeases.EMPTY, + primaryTerm::get, tombstoneDocSupplier()); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -5287,14 +5304,23 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - final AtomicReference> leasesHolder = new AtomicReference<>(Collections.emptyList()); + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final AtomicLong retentionLeasesVersion = new AtomicLong(); + final AtomicReference retentionLeasesHolder = new AtomicReference<>(RetentionLeases.EMPTY); final List operations = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "2"); Randomness.shuffle(operations); Set existingSeqNos = new HashSet<>(); store = createStore(); - engine = createEngine( - config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, leasesHolder::get)); + engine = createEngine(config( + indexSettings, + store, + createTempDir(), + newMergePolicy(), + null, + null, + globalCheckpoint::get, + retentionLeasesHolder::get)); assertThat(engine.getMinRetainedSeqNo(), equalTo(0L)); long lastMinRetainedSeqNo = engine.getMinRetainedSeqNo(); for (Engine.Operation op : operations) { @@ -5309,6 +5335,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint())); } if (randomBoolean()) { + retentionLeasesVersion.incrementAndGet(); final int length = randomIntBetween(0, 8); final List leases = new ArrayList<>(length); for (int i = 0; i < length; i++) { @@ -5318,7 +5345,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { final String source = randomAlphaOfLength(8); leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source)); } - leasesHolder.set(leases); + retentionLeasesHolder.set(new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), leases)); } if (rarely()) { settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10)); @@ -5332,13 +5359,15 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { engine.flush(true, true); assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)), equalTo(engine.getMinRetainedSeqNo())); - final Collection leases = leasesHolder.get(); - if (leases.isEmpty()) { - assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo("")); + final RetentionLeases leases = retentionLeasesHolder.get(); + if (leases.leases().isEmpty()) { + assertThat( + engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), + equalTo("primary_term:" + primaryTerm + ";version:" + retentionLeasesVersion.get() + ";")); } else { assertThat( engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), - equalTo(RetentionLease.encodeRetentionLeases(leases))); + equalTo(RetentionLeases.encodeRetentionLeases(leases))); } } if (rarely()) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java index 8a34b0d1b5207..8257aa99d0486 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java @@ -24,15 +24,14 @@ import org.apache.lucene.search.Query; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -54,13 +53,13 @@ public void testSoftDeletesRetentionLock() { for (int i = 0; i < retainingSequenceNumbers.length; i++) { retainingSequenceNumbers[i] = new AtomicLong(); } - final Supplier> retentionLeasesSupplier = + final Supplier retentionLeasesSupplier = () -> { - final Set leases = new HashSet<>(retainingSequenceNumbers.length); + final List leases = new ArrayList<>(retainingSequenceNumbers.length); for (int i = 0; i < retainingSequenceNumbers.length; i++) { leases.add(new RetentionLease(Integer.toString(i), retainingSequenceNumbers[i].get(), 0L, "test")); } - return leases; + return new RetentionLeases(1, 1, leases); }; long safeCommitCheckpoint = globalCheckpoint.get(); SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, between(1, 10000), retainedOps, retentionLeasesSupplier); @@ -126,16 +125,20 @@ public void testAlwaysFetchLatestRetentionLeases() { for (int i = 0; i < numLeases; i++) { leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(0, 1000), randomNonNegativeLong(), "test")); } - final Supplier> leasesSupplier = () -> Collections.unmodifiableCollection(new ArrayList<>(leases)); + final Supplier leasesSupplier = + () -> new RetentionLeases( + randomNonNegativeLong(), + randomNonNegativeLong(), + Collections.unmodifiableCollection(new ArrayList<>(leases))); final SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier); if (randomBoolean()) { policy.acquireRetentionLock(); } if (numLeases == 0) { - assertThat(policy.getRetentionPolicy().v2(), empty()); + assertThat(policy.getRetentionPolicy().v2().leases(), empty()); } else { - assertThat(policy.getRetentionPolicy().v2(), contains(leases.toArray(new RetentionLease[0]))); + assertThat(policy.getRetentionPolicy().v2().leases(), contains(leases.toArray(new RetentionLease[0]))); } } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index d7f135ffe4816..9781d893a1d53 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.IndexSettingsModule; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -40,6 +41,8 @@ import java.util.stream.Collectors; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; @@ -49,11 +52,12 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes public void testAddOrRenewRetentionLease() { final AllocationId allocationId = AllocationId.newInitializing(); + long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); final ReplicationTracker replicationTracker = new ReplicationTracker( new ShardId("test", "_na", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), - randomNonNegativeLong(), + primaryTerm, UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, @@ -70,19 +74,27 @@ public void testAddOrRenewRetentionLease() { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); replicationTracker.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); - assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true); + if (rarely() && primaryTerm < Long.MAX_VALUE) { + primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE); + replicationTracker.setOperationPrimaryTerm(primaryTerm); + } + assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true); } for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, true); + if (rarely() && primaryTerm < Long.MAX_VALUE) { + primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE); + replicationTracker.setOperationPrimaryTerm(primaryTerm); + } + assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true); } } public void testAddRetentionLeaseCausesRetentionLeaseSync() { final AllocationId allocationId = AllocationId.newInitializing(); - final Map retentionLeases = new HashMap<>(); + final Map retainingSequenceNumbers = new HashMap<>(); final AtomicBoolean invoked = new AtomicBoolean(); final AtomicReference reference = new AtomicReference<>(); final ReplicationTracker replicationTracker = new ReplicationTracker( @@ -98,8 +110,10 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() { assertFalse(Thread.holdsLock(reference.get())); invoked.set(true); assertThat( - leases.stream().collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)), - equalTo(retentionLeases)); + leases.leases() + .stream() + .collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)), + equalTo(retainingSequenceNumbers)); }); reference.set(replicationTracker); replicationTracker.updateFromMaster( @@ -113,7 +127,7 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() { for (int i = 0; i < length; i++) { final String id = randomAlphaOfLength(8); final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - retentionLeases.put(id, retainingSequenceNumber); + retainingSequenceNumbers.put(id, retainingSequenceNumber); replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {})); // assert that the new retention lease callback was invoked assertTrue(invoked.get()); @@ -141,11 +155,12 @@ private void runExpirationTest(final boolean primaryMode) { .builder() .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis)) .build(); + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); final ReplicationTracker replicationTracker = new ReplicationTracker( new ShardId("test", "_na", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", settings), - randomNonNegativeLong(), + primaryTerm, UNASSIGNED_SEQ_NO, value -> {}, currentTimeMillis::get, @@ -163,16 +178,20 @@ private void runExpirationTest(final boolean primaryMode) { if (primaryMode) { replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); } else { - replicationTracker.updateRetentionLeasesOnReplica( + final RetentionLeases retentionLeases = new RetentionLeases( + primaryTerm, + 1, Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + replicationTracker.updateRetentionLeasesOnReplica(retentionLeases); } { - final Collection retentionLeases = replicationTracker.getRetentionLeases(); - assertThat(retentionLeases, hasSize(1)); - final RetentionLease retentionLease = retentionLeases.iterator().next(); + final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); + assertThat(retentionLeases.version(), equalTo(1L)); + assertThat(retentionLeases.leases(), hasSize(1)); + final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primaryMode); } // renew the lease @@ -181,25 +200,29 @@ private void runExpirationTest(final boolean primaryMode) { if (primaryMode) { replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); } else { - replicationTracker.updateRetentionLeasesOnReplica( + final RetentionLeases retentionLeases = new RetentionLeases( + primaryTerm, + 2, Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + replicationTracker.updateRetentionLeasesOnReplica(retentionLeases); } { - final Collection retentionLeases = replicationTracker.getRetentionLeases(); - assertThat(retentionLeases, hasSize(1)); - final RetentionLease retentionLease = retentionLeases.iterator().next(); + final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); + assertThat(retentionLeases.version(), equalTo(2L)); + assertThat(retentionLeases.leases(), hasSize(1)); + final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primaryMode); } // now force the lease to expire currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); if (primaryMode) { - assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, true); + assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true); } else { // leases do not expire on replicas until synced from the primary - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, false); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false); } } @@ -227,7 +250,9 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { assertFalse(Thread.holdsLock(reference.get())); invoked.set(true); assertThat( - leases.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), + leases.leases() + .stream() + .collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), equalTo(retentionLeases)); }); reference.set(replicationTracker); @@ -239,11 +264,14 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); final int length = randomIntBetween(0, 8); + long version = 0; for (int i = 0; i < length; i++) { final String id = randomAlphaOfLength(8); final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get())); replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {})); + version++; + assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version)); // assert that the new retention lease callback was invoked assertTrue(invoked.get()); @@ -252,6 +280,8 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { currentTimeMillis.set(1 + currentTimeMillis.get()); retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get())); replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test"); + version++; + assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version)); // reset the invocation marker so that we can assert the callback was invoked if any leases are expired assertFalse(invoked.get()); @@ -264,16 +294,76 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { .map(Map.Entry::getKey) .collect(Collectors.toList()); expiredIds.forEach(retentionLeases::remove); + if (expiredIds.isEmpty() == false) { + version++; + } currentTimeMillis.set(currentTimeMillis.get() + currentTimeMillisIncrement); // getting the leases has the side effect of calculating which leases are expired and invoking the sync callback - final Collection current = replicationTracker.getRetentionLeases(); + final RetentionLeases current = replicationTracker.getRetentionLeases(); + assertThat(current.version(), equalTo(version)); // the current leases should equal our tracking map assertThat( - current.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), + current.leases() + .stream() + .collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), equalTo(retentionLeases)); // the callback should only be invoked if there were expired leases assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false)); } + assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version)); + } + + public void testReplicaIgnoresOlderRetentionLeasesVersion() { + final AllocationId allocationId = AllocationId.newInitializing(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomNonNegativeLong(), + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, listener) -> {}); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), + Collections.emptySet()); + final int length = randomIntBetween(0, 8); + final List retentionLeasesCollection = new ArrayList<>(length); + long primaryTerm = 1; + long version = 0; + for (int i = 0; i < length; i++) { + final int innerLength = randomIntBetween(0, 8); + final Collection leases = new ArrayList<>(); + for (int j = 0; j < innerLength; j++) { + leases.add( + new RetentionLease(i + "-" + j, randomNonNegativeLong(), randomNonNegativeLong(), randomAlphaOfLength(8))); + version++; + } + if (rarely()) { + primaryTerm++; + } + retentionLeasesCollection.add(new RetentionLeases(primaryTerm, version, leases)); + } + final Collection expectedLeases; + if (length == 0 || retentionLeasesCollection.get(length - 1).leases().isEmpty()) { + expectedLeases = Collections.emptyList(); + } else { + expectedLeases = retentionLeasesCollection.get(length - 1).leases(); + } + Collections.shuffle(retentionLeasesCollection, random()); + for (final RetentionLeases retentionLeases : retentionLeasesCollection) { + replicationTracker.updateRetentionLeasesOnReplica(retentionLeases); + } + assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version)); + if (expectedLeases.isEmpty()) { + assertThat(replicationTracker.getRetentionLeases().leases(), empty()); + } else { + assertThat( + replicationTracker.getRetentionLeases().leases(), + contains(expectedLeases.toArray(new RetentionLease[0]))); + } } private static Tuple toTuple(final RetentionLease retentionLease) { @@ -285,10 +375,14 @@ private void assertRetentionLeases( final int size, final long[] minimumRetainingSequenceNumbers, final LongSupplier currentTimeMillisSupplier, + final long primaryTerm, + final long version, final boolean primaryMode) { - final Collection retentionLeases = replicationTracker.getRetentionLeases(); + final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); + assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); + assertThat(retentionLeases.version(), equalTo(version)); final Map idToRetentionLease = new HashMap<>(); - for (final RetentionLease retentionLease : retentionLeases) { + for (final RetentionLease retentionLease : retentionLeases.leases()) { idToRetentionLease.put(retentionLease.id(), retentionLease); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index 7731f3cbf1d5f..037d2130b5c7b 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -687,7 +687,7 @@ public void testPrimaryContextHandoff() throws IOException { final LongConsumer onUpdate = updatedGlobalCheckpoint -> {}; final long primaryTerm = randomNonNegativeLong(); final long globalCheckpoint = UNASSIGNED_SEQ_NO; - final BiConsumer, ActionListener> onNewRetentionLease = + final BiConsumer> onNewRetentionLease = (leases, listener) -> {}; ReplicationTracker oldPrimary = new ReplicationTracker( shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java index d77acc53b247e..8721450073531 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java @@ -61,7 +61,7 @@ public void testRetentionLeaseStats() throws InterruptedException { final IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("index").execute().actionGet(); assertThat(indicesStats.getShards(), arrayWithSize(1)); final RetentionLeaseStats retentionLeaseStats = indicesStats.getShards()[0].getRetentionLeaseStats(); - assertThat(RetentionLease.toMap(retentionLeaseStats.leases()), equalTo(currentRetentionLeases)); + assertThat(RetentionLeases.toMap(retentionLeaseStats.retentionLeases()), equalTo(currentRetentionLeases)); } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java index fe5dee782c4fe..9c7aee5191ac8 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java @@ -30,6 +30,8 @@ public class RetentionLeaseStatsWireSerializingTests extends AbstractWireSeriali @Override protected RetentionLeaseStats createTestInstance() { + final long primaryTerm = randomNonNegativeLong(); + final long version = randomNonNegativeLong(); final int length = randomIntBetween(0, 8); final Collection leases; if (length == 0) { @@ -44,7 +46,7 @@ protected RetentionLeaseStats createTestInstance() { leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source)); } } - return new RetentionLeaseStats(leases); + return new RetentionLeaseStats(new RetentionLeases(primaryTerm, version, leases)); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 0cd85ef60f21a..ab92d5ad2326d 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -43,7 +43,6 @@ import org.elasticsearch.transport.TransportService; import org.mockito.ArgumentCaptor; -import java.util.Collection; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; @@ -114,10 +113,8 @@ public void testRetentionLeaseSyncActionOnPrimary() { shardStateAction, new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver()); - @SuppressWarnings("unchecked") final Collection retentionLeases = - (Collection) mock(Collection.class); - final RetentionLeaseSyncAction.Request request = - new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); + final RetentionLeases retentionLeases = mock(RetentionLeases.class); + final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); final TransportWriteAction.WritePrimaryResult result = action.shardOperationOnPrimary(request, indexShard); @@ -155,10 +152,8 @@ public void testRetentionLeaseSyncActionOnReplica() { shardStateAction, new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver()); - @SuppressWarnings("unchecked") final Collection retentionLeases = - (Collection) mock(Collection.class); - final RetentionLeaseSyncAction.Request request = - new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); + final RetentionLeases retentionLeases = mock(RetentionLeases.class); + final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); final TransportWriteAction.WriteReplicaResult result = action.shardOperationOnReplica(request, indexShard); // the retention leases on the shard should be updated @@ -190,8 +185,7 @@ public void testRetentionLeaseSyncExecution() { final Logger retentionLeaseSyncActionLogger = mock(Logger.class); - @SuppressWarnings("unchecked") final Collection retentionLeases = - (Collection) mock(Collection.class); + final RetentionLeases retentionLeases = mock(RetentionLeases.class); final AtomicBoolean invoked = new AtomicBoolean(); final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( Settings.EMPTY, diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index d009486778d89..3e69c84e3cde3 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -33,7 +33,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -78,9 +77,9 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { retentionLock.close(); // check retention leases have been committed on the primary - final Collection primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases( + final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( primary.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(RetentionLease.toMap(primaryCommittedRetentionLeases))); + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases))); // check current retention leases have been synced to all replicas for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { @@ -89,13 +88,13 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - final Map retentionLeasesOnReplica = RetentionLease.toMap(replica.getRetentionLeases()); + final Map retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); // check retention leases have been committed on the replica - final Collection replicaCommittedRetentionLeases = RetentionLease.decodeRetentionLeases( + final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(RetentionLease.toMap(replicaCommittedRetentionLeases))); + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases))); } } } @@ -138,14 +137,14 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - assertThat(replica.getRetentionLeases(), hasItem(currentRetentionLease)); + assertThat(replica.getRetentionLeases().leases(), hasItem(currentRetentionLease)); } // sleep long enough that *possibly* the current retention lease has expired, and certainly that any previous have final long later = System.nanoTime(); Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - TimeUnit.NANOSECONDS.toMillis(later - now))); - final Collection currentRetentionLeases = primary.getRetentionLeases(); - assertThat(currentRetentionLeases, anyOf(empty(), contains(currentRetentionLease))); + final RetentionLeases currentRetentionLeases = primary.getRetentionLeases(); + assertThat(currentRetentionLeases.leases(), anyOf(empty(), contains(currentRetentionLease))); /* * Check that expiration of retention leases has been synced to all replicas. We have to assert busy since syncing happens in @@ -158,10 +157,12 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - if (currentRetentionLeases.isEmpty()) { - assertThat(replica.getRetentionLeases(), empty()); + if (currentRetentionLeases.leases().isEmpty()) { + assertThat(replica.getRetentionLeases().leases(), empty()); } else { - assertThat(replica.getRetentionLeases(), contains(currentRetentionLeases.toArray(new RetentionLease[0]))); + assertThat( + replica.getRetentionLeases().leases(), + contains(currentRetentionLeases.leases().toArray(new RetentionLease[0]))); } } }); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java index 1a8d159c18757..bd2dee78b05ed 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java @@ -24,13 +24,8 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; @@ -107,24 +102,4 @@ public void testRetentionLeaseEncoding() { assertThat(RetentionLease.decodeRetentionLease(RetentionLease.encodeRetentionLease(retentionLease)), equalTo(retentionLease)); } - public void testRetentionLeasesEncoding() { - final int length = randomIntBetween(0, 8); - final List retentionLeases = new ArrayList<>(length); - for (int i = 0; i < length; i++) { - final String id = randomAlphaOfLength(8); - final long retainingSequenceNumber = randomNonNegativeLong(); - final long timestamp = randomNonNegativeLong(); - final String source = randomAlphaOfLength(8); - final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source); - retentionLeases.add(retentionLease); - } - final Collection decodedRetentionLeases = - RetentionLease.decodeRetentionLeases(RetentionLease.encodeRetentionLeases(retentionLeases)); - if (length == 0) { - assertThat(decodedRetentionLeases, empty()); - } else { - assertThat(decodedRetentionLeases, contains(retentionLeases.toArray(new RetentionLease[0]))); - } - } - } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java new file mode 100644 index 0000000000000..33cc83f602860 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; + +public class RetentionLeasesTests extends ESTestCase { + + public void testPrimaryTermOutOfRange() { + final long primaryTerm = randomLongBetween(Long.MIN_VALUE, 0); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new RetentionLeases(primaryTerm, randomNonNegativeLong(), Collections.emptyList())); + assertThat(e, hasToString(containsString("primary term must be positive but was [" + primaryTerm + "]"))); + } + + public void testVersionOutOfRange() { + final long version = randomLongBetween(Long.MIN_VALUE, -1); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new RetentionLeases(randomLongBetween(1, Long.MAX_VALUE), version, Collections.emptyList())); + assertThat(e, hasToString(containsString("version must be non-negative but was [" + version + "]"))); + } + + public void testRetentionLeasesEncoding() { + final long primaryTerm = randomNonNegativeLong(); + final long version = randomNonNegativeLong(); + final int length = randomIntBetween(0, 8); + final List retentionLeases = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomNonNegativeLong(); + final long timestamp = randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source); + retentionLeases.add(retentionLease); + } + final RetentionLeases decodedRetentionLeases = + RetentionLeases.decodeRetentionLeases( + RetentionLeases.encodeRetentionLeases(new RetentionLeases(primaryTerm, version, retentionLeases))); + assertThat(decodedRetentionLeases.version(), equalTo(version)); + if (length == 0) { + assertThat(decodedRetentionLeases.leases(), empty()); + } else { + assertThat(decodedRetentionLeases.leases(), containsInAnyOrder(retentionLeases.toArray(new RetentionLease[0]))); + } + } + + public void testSupersedesByPrimaryTerm() { + final long lowerPrimaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final RetentionLeases left = new RetentionLeases(lowerPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList()); + final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE); + final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList()); + assertTrue(right.supersedes(left)); + assertFalse(left.supersedes(right)); + } + + public void testSupersedesByVersion() { + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final long lowerVersion = randomLongBetween(1, Long.MAX_VALUE); + final long higherVersion = randomLongBetween(lowerVersion + 1, Long.MAX_VALUE); + final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList()); + final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList()); + assertTrue(right.supersedes(left)); + assertFalse(left.supersedes(right)); + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index f66b383c2799c..76ca9f5b02458 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -31,11 +31,11 @@ import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseStats; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -76,21 +76,22 @@ protected void tearDownThreadPool() { public void testAddOrRenewRetentionLease() throws IOException { final IndexShard indexShard = newStartedShard(true); + final long primaryTerm = indexShard.getOperationPrimaryTerm(); try { final int length = randomIntBetween(0, 8); final long[] minimumRetainingSequenceNumbers = new long[length]; for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); indexShard.addRetentionLease( - Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> { - })); - assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true); + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); + assertRetentionLeases( + indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true); } for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); indexShard.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, true); + assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true); } } finally { closeShards(indexShard); @@ -113,6 +114,7 @@ private void runExpirationTest(final boolean primary) throws IOException { .build(); // current time is mocked through the thread pool final IndexShard indexShard = newStartedShard(primary, settings, new InternalEngineFactory()); + final long primaryTerm = indexShard.getOperationPrimaryTerm(); try { final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(0, Long.MAX_VALUE); @@ -120,16 +122,20 @@ private void runExpirationTest(final boolean primary) throws IOException { indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> { })); } else { - indexShard.updateRetentionLeasesOnReplica( + final RetentionLeases retentionLeases = new RetentionLeases( + primaryTerm, + 1, Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + indexShard.updateRetentionLeasesOnReplica(retentionLeases); } { - final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); - assertThat(retentionLeases, hasSize(1)); - final RetentionLease retentionLease = retentionLeases.iterator().next(); + final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); + assertThat(retentionLeases.version(), equalTo(1L)); + assertThat(retentionLeases.leases(), hasSize(1)); + final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primary); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primary); } // renew the lease @@ -138,25 +144,29 @@ private void runExpirationTest(final boolean primary) throws IOException { if (primary) { indexShard.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); } else { - indexShard.updateRetentionLeasesOnReplica( + final RetentionLeases retentionLeases = new RetentionLeases( + primaryTerm, + 2, Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + indexShard.updateRetentionLeasesOnReplica(retentionLeases); } { - final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); - assertThat(retentionLeases, hasSize(1)); - final RetentionLease retentionLease = retentionLeases.iterator().next(); + final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); + assertThat(retentionLeases.version(), equalTo(2L)); + assertThat(retentionLeases.leases(), hasSize(1)); + final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primary); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primary); } // now force the lease to expire currentTimeMillis.set( currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); if (primary) { - assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, true); + assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true); } else { - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, false); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false); } } finally { closeShards(indexShard); @@ -191,11 +201,14 @@ public void testCommit() throws IOException { // the committed retention leases should equal our current retention leases final SegmentInfos segmentCommitInfos = indexShard.store().readLastCommittedSegmentsInfo(); assertTrue(segmentCommitInfos.getUserData().containsKey(Engine.RETENTION_LEASES)); - final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); - if (retentionLeases.isEmpty()) { - assertThat(IndexShard.getRetentionLeases(segmentCommitInfos), empty()); + final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); + final RetentionLeases committedRetentionLeases = IndexShard.getRetentionLeases(segmentCommitInfos); + if (retentionLeases.leases().isEmpty()) { + assertThat(committedRetentionLeases.version(), equalTo(0L)); + assertThat(committedRetentionLeases.leases(), empty()); } else { - assertThat(IndexShard.getRetentionLeases(segmentCommitInfos), contains(retentionLeases.toArray(new RetentionLease[0]))); + assertThat(committedRetentionLeases.version(), equalTo((long) length)); + assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0]))); } // when we recover, we should recover the retention leases @@ -204,12 +217,15 @@ public void testCommit() throws IOException { ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), RecoverySource.ExistingStoreRecoverySource.INSTANCE)); try { recoverShardFromStore(recoveredShard); - if (retentionLeases.isEmpty()) { - assertThat(recoveredShard.getEngine().config().retentionLeasesSupplier().get(), empty()); + final RetentionLeases recoveredRetentionLeases = recoveredShard.getEngine().config().retentionLeasesSupplier().get(); + if (retentionLeases.leases().isEmpty()) { + assertThat(recoveredRetentionLeases.version(), equalTo(0L)); + assertThat(recoveredRetentionLeases.leases(), empty()); } else { + assertThat(recoveredRetentionLeases.version(), equalTo((long) length)); assertThat( - recoveredShard.getEngine().config().retentionLeasesSupplier().get(), - contains(retentionLeases.toArray(new RetentionLease[0]))); + recoveredRetentionLeases.leases(), + contains(retentionLeases.leases().toArray(new RetentionLease[0]))); } } finally { closeShards(recoveredShard); @@ -227,16 +243,17 @@ public void testRetentionLeaseStats() throws IOException { for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); indexShard.addRetentionLease( - Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> { - })); + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); } final RetentionLeaseStats stats = indexShard.getRetentionLeaseStats(); assertRetentionLeases( - stats.leases(), + stats.retentionLeases(), indexShard.indexSettings().getRetentionLeaseMillis(), length, minimumRetainingSequenceNumbers, () -> 0L, + length == 0 ? RetentionLeases.EMPTY.primaryTerm() : indexShard.getOperationPrimaryTerm(), + length, true); } finally { closeShards(indexShard); @@ -248,6 +265,8 @@ private void assertRetentionLeases( final int size, final long[] minimumRetainingSequenceNumbers, final LongSupplier currentTimeMillisSupplier, + final long primaryTerm, + final long version, final boolean primary) { assertRetentionLeases( indexShard.getEngine().config().retentionLeasesSupplier().get(), @@ -255,18 +274,24 @@ private void assertRetentionLeases( size, minimumRetainingSequenceNumbers, currentTimeMillisSupplier, + primaryTerm, + version, primary); } private void assertRetentionLeases( - final Collection retentionLeases, + final RetentionLeases retentionLeases, final long retentionLeaseMillis, final int size, final long[] minimumRetainingSequenceNumbers, final LongSupplier currentTimeMillisSupplier, + final long primaryTerm, + final long version, final boolean primary) { + assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); + assertThat(retentionLeases.version(), equalTo(version)); final Map idToRetentionLease = new HashMap<>(); - for (final RetentionLease retentionLease : retentionLeases) { + for (final RetentionLease retentionLease : retentionLeases.leases()) { idToRetentionLease.put(retentionLease.id(), retentionLease); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 53c3e86ee01fb..c80b3b5074921 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -122,12 +123,30 @@ public void onFailedEngine(String reason, @Nullable Exception e) { final String translogUUID = Translog.createEmptyTranslog(translogConfig.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm); store.associateIndexWithNewTranslog(translogUUID); - EngineConfig config = new EngineConfig(shardId, allocationId, threadPool, - indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), - eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, - new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, Collections::emptyList, - () -> primaryTerm, EngineTestCase.tombstoneDocSupplier()); + EngineConfig config = new EngineConfig( + shardId, + allocationId, + threadPool, + indexSettings, + null, + store, + newMergePolicy(), + iwc.getAnalyzer(), + iwc.getSimilarity(), + new CodecService(null, logger), + eventListener, + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + translogConfig, + TimeValue.timeValueMinutes(5), + Collections.singletonList(listeners), + Collections.emptyList(), + null, + new NoneCircuitBreakerService(), + () -> SequenceNumbers.NO_OPS_PERFORMED, + () -> RetentionLeases.EMPTY, + () -> primaryTerm, + EngineTestCase.tombstoneDocSupplier()); engine = new InternalEngine(config); engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index d893168b08205..e09455b55bd52 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -84,7 +84,7 @@ import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; -import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -105,7 +105,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -586,7 +585,7 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path transl refreshListener, indexSort, globalCheckpointSupplier, - globalCheckpointSupplier == null ? null : Collections::emptyList); + globalCheckpointSupplier == null ? null : () -> RetentionLeases.EMPTY); } public EngineConfig config( @@ -597,7 +596,7 @@ public EngineConfig config( final ReferenceManager.RefreshListener refreshListener, final Sort indexSort, final LongSupplier globalCheckpointSupplier, - final Supplier> retentionLeasesSupplier) { + final Supplier retentionLeasesSupplier) { return config( indexSettings, store, @@ -625,7 +624,7 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path transl internalRefreshListener, indexSort, maybeGlobalCheckpointSupplier, - maybeGlobalCheckpointSupplier == null ? null : Collections::emptyList, + maybeGlobalCheckpointSupplier == null ? null : () -> RetentionLeases.EMPTY, breakerService); } @@ -638,7 +637,7 @@ public EngineConfig config( final ReferenceManager.RefreshListener internalRefreshListener, final Sort indexSort, final @Nullable LongSupplier maybeGlobalCheckpointSupplier, - final @Nullable Supplier> maybeRetentionLeasesSupplier, + final @Nullable Supplier maybeRetentionLeasesSupplier, final CircuitBreakerService breakerService) { final IndexWriterConfig iwc = newIndexWriterConfig(); final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); @@ -648,7 +647,7 @@ public EngineConfig config( final List intRefreshListenerList = internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener); final LongSupplier globalCheckpointSupplier; - final Supplier> retentionLeasesSupplier; + final Supplier retentionLeasesSupplier; if (maybeGlobalCheckpointSupplier == null) { assert maybeRetentionLeasesSupplier == null; final ReplicationTracker replicationTracker = new ReplicationTracker( diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index bccc5fed8364e..df406a4c09a68 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.engine.TranslogHandler; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -270,7 +271,7 @@ public void onFailedEngine(String reason, Exception e) { null, new NoneCircuitBreakerService(), globalCheckpoint::longValue, - Collections::emptyList, + () -> RetentionLeases.EMPTY, () -> primaryTerm.get(), EngineTestCase.tombstoneDocSupplier()); }