diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index a5c4adc53c42a..4cf81c24fbf1a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -101,21 +102,25 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq // only report on fully started shards CommitStats commitStats; SeqNoStats seqNoStats; + RetentionLeaseStats retentionLeaseStats; try { commitStats = indexShard.commitStats(); seqNoStats = indexShard.seqNoStats(); - } catch (AlreadyClosedException e) { + retentionLeaseStats = indexShard.getRetentionLeaseStats(); + } catch (final AlreadyClosedException e) { // shard is closed - no stats is fine commitStats = null; seqNoStats = null; + retentionLeaseStats = null; } shardsStats.add( - new ShardStats( - indexShard.routingEntry(), - indexShard.shardPath(), - new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS), - commitStats, - seqNoStats)); + new ShardStats( + indexShard.routingEntry(), + indexShard.shardPath(), + new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS), + commitStats, + seqNoStats, + retentionLeaseStats)); } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index 898f3d69456b0..9297447695a61 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -26,22 +26,36 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.ShardPath; import java.io.IOException; public class ShardStats implements Streamable, Writeable, ToXContentFragment { + private ShardRouting shardRouting; private CommonStats commonStats; @Nullable private CommitStats commitStats; @Nullable private SeqNoStats seqNoStats; + + @Nullable + private RetentionLeaseStats retentionLeaseStats; + + /** + * Gets the current retention lease stats. + * + * @return the current retention lease stats + */ + public RetentionLeaseStats getRetentionLeaseStats() { + return retentionLeaseStats; + } + private String dataPath; private String statePath; private boolean isCustomDataPath; @@ -49,7 +63,13 @@ public class ShardStats implements Streamable, Writeable, ToXContentFragment { ShardStats() { } - public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats, SeqNoStats seqNoStats) { + public ShardStats( + final ShardRouting routing, + final ShardPath shardPath, + final CommonStats commonStats, + final CommitStats commitStats, + final SeqNoStats seqNoStats, + final RetentionLeaseStats retentionLeaseStats) { this.shardRouting = routing; this.dataPath = shardPath.getRootDataPath().toString(); this.statePath = shardPath.getRootStatePath().toString(); @@ -57,6 +77,7 @@ public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonS this.commitStats = commitStats; this.commonStats = commonStats; this.seqNoStats = seqNoStats; + this.retentionLeaseStats = retentionLeaseStats; } /** @@ -109,6 +130,9 @@ public void readFrom(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { seqNoStats = in.readOptionalWriteable(SeqNoStats::new); } + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + retentionLeaseStats = in.readOptionalWriteable(RetentionLeaseStats::new); + } } @Override @@ -122,6 +146,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { out.writeOptionalWriteable(seqNoStats); } + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeOptionalWriteable(retentionLeaseStats); + } } @Override @@ -140,6 +167,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (seqNoStats != null) { seqNoStats.toXContent(builder, params); } + if (retentionLeaseStats != null) { + retentionLeaseStats.toXContent(builder, params); + } builder.startObject(Fields.SHARD_PATH); builder.field(Fields.STATE_PATH, statePath); builder.field(Fields.DATA_PATH, dataPath); @@ -159,4 +189,5 @@ static final class Fields { static final String NODE = "node"; static final String RELOCATING_NODE = "relocating_node"; } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index d339184c5f814..8371023738b3b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -106,15 +107,23 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh CommonStats commonStats = new CommonStats(indicesService.getIndicesQueryCache(), indexShard, request.flags()); CommitStats commitStats; SeqNoStats seqNoStats; + RetentionLeaseStats retentionLeaseStats; try { commitStats = indexShard.commitStats(); seqNoStats = indexShard.seqNoStats(); - } catch (AlreadyClosedException e) { + retentionLeaseStats = indexShard.getRetentionLeaseStats(); + } catch (final AlreadyClosedException e) { // shard is closed - no stats is fine commitStats = null; seqNoStats = null; + retentionLeaseStats = null; } - return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), commonStats, - commitStats, seqNoStats); + return new ShardStats( + indexShard.routingEntry(), + indexShard.shardPath(), + commonStats, + commitStats, + seqNoStats, + retentionLeaseStats); } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index 24d144d810d9c..362d068f45e2d 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -28,7 +28,9 @@ import java.util.Collection; import java.util.Collections; import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -242,4 +244,14 @@ public String toString() { '}'; } + /** + * A utility method to convert a collection of retention leases to a map from retention lease ID to retention lease. + * + * @param leases the leases + * @return the map from retention lease ID to retention lease + */ + static Map toMap(final Collection leases) { + return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())); + } + } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java new file mode 100644 index 0000000000000..b8f1454a12c2b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java @@ -0,0 +1,124 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; + +/** + * Represents retention lease stats. + */ +public final class RetentionLeaseStats implements ToXContentFragment, Writeable { + + private final Collection leases; + + /** + * The underlying retention leases backing this stats object. + * + * @return the leases + */ + public Collection leases() { + return leases; + } + + /** + * Constructs a new retention lease stats object from the specified leases. + * + * @param leases the leases + */ + public RetentionLeaseStats(final Collection leases) { + this.leases = Objects.requireNonNull(leases); + } + + /** + * Constructs a new retention lease stats object from a stream. The retention lease stats should have been written via + * {@link #writeTo(StreamOutput)}. + * + * @param in the stream to construct the retention lease stats from + * @throws IOException if an I/O exception occurs reading from the stream + */ + public RetentionLeaseStats(final StreamInput in) throws IOException { + leases = in.readList(RetentionLease::new); + } + + /** + * Writes a retention lease stats object to a stream in a manner suitable for later reconstruction via + * {@link #RetentionLeaseStats(StreamInput)} (StreamInput)}. + * + * @param out the stream to write the retention lease stats to + * @throws IOException if an I/O exception occurs writing to the stream + */ + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeCollection(leases); + } + + /** + * Converts the retention lease stats to {@link org.elasticsearch.common.xcontent.XContent} using the specified builder and pararms. + * + * @param builder the builder + * @param params the params + * @return the builder that these retention leases were converted to {@link org.elasticsearch.common.xcontent.XContent} into + * @throws IOException if an I/O exception occurs writing to the builder + */ + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject("retention_leases"); + { + builder.startArray("leases"); + { + for (final RetentionLease retentionLease : leases) { + builder.startObject(); + { + builder.field("id", retentionLease.id()); + builder.field("retaining_seq_no", retentionLease.retainingSequenceNumber()); + builder.field("timestamp", retentionLease.timestamp()); + builder.field("source", retentionLease.source()); + } + builder.endObject(); + } + } + builder.endArray(); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final RetentionLeaseStats that = (RetentionLeaseStats) o; + return Objects.equals(leases, that.leases); + } + + @Override + public int hashCode() { + return Objects.hash(leases); + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index dc43d42c94a5c..48a7d64036a5f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -108,6 +108,7 @@ import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; @@ -1895,6 +1896,11 @@ public Collection getRetentionLeases() { return replicationTracker.getRetentionLeases(); } + public RetentionLeaseStats getRetentionLeaseStats() { + verifyNotClosed(); + return new RetentionLeaseStats(getRetentionLeases()); + } + /** * Adds a new retention lease. * diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index cca63c015f1c7..9113ba3f23b14 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -95,6 +95,7 @@ import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IllegalIndexShardStateException; @@ -367,23 +368,29 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index CommitStats commitStats; SeqNoStats seqNoStats; + RetentionLeaseStats retentionLeaseStats; try { commitStats = indexShard.commitStats(); seqNoStats = indexShard.seqNoStats(); + retentionLeaseStats = indexShard.getRetentionLeaseStats(); } catch (AlreadyClosedException e) { // shard is closed - no stats is fine commitStats = null; seqNoStats = null; + retentionLeaseStats = null; } - return new IndexShardStats(indexShard.shardId(), - new ShardStats[] { - new ShardStats(indexShard.routingEntry(), - indexShard.shardPath(), - new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), - commitStats, - seqNoStats) - }); + return new IndexShardStats( + indexShard.shardId(), + new ShardStats[]{ + new ShardStats( + indexShard.routingEntry(), + indexShard.shardPath(), + new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), + commitStats, + seqNoStats, + retentionLeaseStats) + }); } /** diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index c4fcb9bdb53e2..fcccaf6c0f021 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -113,8 +113,8 @@ public void testFillShardLevelInfo() { CommonStats commonStats1 = new CommonStats(); commonStats1.store = new StoreStats(1000); ShardStats[] stats = new ShardStats[] { - new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, test_0.shardId()), commonStats0 , null, null), - new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, test_1.shardId()), commonStats1 , null, null) + new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, test_0.shardId()), commonStats0 , null, null, null), + new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, test_1.shardId()), commonStats1 , null, null, null) }; ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder routingToPath = ImmutableOpenMap.builder(); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java new file mode 100644 index 0000000000000..d77acc53b247e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; + +public class RetentionLeaseStatsTests extends ESSingleNodeTestCase { + + public void testRetentionLeaseStats() throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .build(); + createIndex("index", settings); + ensureGreen("index"); + final IndexShard primary = + node().injector().getInstance(IndicesService.class).getShardOrNull(new ShardId(resolveIndex("index"), 0)); + final int length = randomIntBetween(0, 8); + final Map currentRetentionLeases = new HashMap<>(); + for (int i = 0; i < length; i++) { + final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8)); + final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); + latch.await(); + } + + final IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("index").execute().actionGet(); + assertThat(indicesStats.getShards(), arrayWithSize(1)); + final RetentionLeaseStats retentionLeaseStats = indicesStats.getShards()[0].getRetentionLeaseStats(); + assertThat(RetentionLease.toMap(retentionLeaseStats.leases()), equalTo(currentRetentionLeases)); + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java new file mode 100644 index 0000000000000..fe5dee782c4fe --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +public class RetentionLeaseStatsWireSerializingTests extends AbstractWireSerializingTestCase { + + @Override + protected RetentionLeaseStats createTestInstance() { + final int length = randomIntBetween(0, 8); + final Collection leases; + if (length == 0) { + leases = Collections.emptyList(); + } else { + leases = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + final long timestamp = randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source)); + } + } + return new RetentionLeaseStats(leases); + } + + @Override + protected Writeable.Reader instanceReader() { + return RetentionLeaseStats::new; + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index a99d0caea8e6c..f2819bfe161eb 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -37,8 +37,6 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; @@ -46,6 +44,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) public class RetentionLeaseSyncIT extends ESIntegTestCase { public void testRetentionLeasesSyncedOnAdd() throws Exception { @@ -77,7 +76,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { // check retention leases have been committed on the primary final Collection primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases( primary.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(toMap(primaryCommittedRetentionLeases))); + assertThat(currentRetentionLeases, equalTo(RetentionLease.toMap(primaryCommittedRetentionLeases))); // check current retention leases have been synced to all replicas for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { @@ -86,13 +85,13 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - final Map retentionLeasesOnReplica = toMap(replica.getRetentionLeases()); + final Map retentionLeasesOnReplica = RetentionLease.toMap(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); // check retention leases have been committed on the replica final Collection replicaCommittedRetentionLeases = RetentionLease.decodeRetentionLeases( replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(toMap(replicaCommittedRetentionLeases))); + assertThat(currentRetentionLeases, equalTo(RetentionLease.toMap(replicaCommittedRetentionLeases))); } } } @@ -164,8 +163,4 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { } } - private static Map toMap(final Collection replicaCommittedRetentionLeases) { - return replicaCommittedRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())); - } - } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index 26e67fb6dd264..f66b383c2799c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.threadpool.ThreadPool; @@ -81,7 +82,8 @@ public void testAddOrRenewRetentionLease() throws IOException { for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); indexShard.addRetentionLease( - Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> { + })); assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true); } @@ -115,7 +117,8 @@ private void runExpirationTest(final boolean primary) throws IOException { final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(0, Long.MAX_VALUE); if (primary) { - indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); + indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> { + })); } else { indexShard.updateRetentionLeasesOnReplica( Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); @@ -176,7 +179,8 @@ public void testCommit() throws IOException { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(randomNonNegativeLong())); indexShard.addRetentionLease( - Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> { + })); } currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE)); @@ -215,13 +219,52 @@ public void testCommit() throws IOException { } } + public void testRetentionLeaseStats() throws IOException { + final IndexShard indexShard = newStartedShard(true); + try { + final int length = randomIntBetween(0, 8); + final long[] minimumRetainingSequenceNumbers = new long[length]; + for (int i = 0; i < length; i++) { + minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + indexShard.addRetentionLease( + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> { + })); + } + final RetentionLeaseStats stats = indexShard.getRetentionLeaseStats(); + assertRetentionLeases( + stats.leases(), + indexShard.indexSettings().getRetentionLeaseMillis(), + length, + minimumRetainingSequenceNumbers, + () -> 0L, + true); + } finally { + closeShards(indexShard); + } + } + private void assertRetentionLeases( final IndexShard indexShard, final int size, final long[] minimumRetainingSequenceNumbers, final LongSupplier currentTimeMillisSupplier, final boolean primary) { - final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); + assertRetentionLeases( + indexShard.getEngine().config().retentionLeasesSupplier().get(), + indexShard.indexSettings().getRetentionLeaseMillis(), + size, + minimumRetainingSequenceNumbers, + currentTimeMillisSupplier, + primary); + } + + private void assertRetentionLeases( + final Collection retentionLeases, + final long retentionLeaseMillis, + final int size, + final long[] minimumRetainingSequenceNumbers, + final LongSupplier currentTimeMillisSupplier, + final boolean primary) { final Map idToRetentionLease = new HashMap<>(); for (final RetentionLease retentionLease : retentionLeases) { idToRetentionLease.put(retentionLease.id(), retentionLease); @@ -234,9 +277,7 @@ private void assertRetentionLeases( assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); if (primary) { // retention leases can be expired on replicas, so we can only assert on primaries here - assertThat( - currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), - lessThanOrEqualTo(indexShard.indexSettings().getRetentionLeaseMillis())); + assertThat(currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), lessThanOrEqualTo(retentionLeaseMillis)); } assertThat(retentionLease.source(), equalTo("test-" + i)); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 1ea90ec6e8fbc..94a55d1e0c400 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1385,8 +1385,13 @@ public void testMinimumCompatVersion() throws IOException { public void testShardStats() throws IOException { IndexShard shard = newStartedShard(); - ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), - new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()), shard.commitStats(), shard.seqNoStats()); + ShardStats stats = new ShardStats( + shard.routingEntry(), + shard.shardPath(), + new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()), + shard.commitStats(), + shard.seqNoStats(), + shard.getRetentionLeaseStats()); assertEquals(shard.shardPath().getRootDataPath().toString(), stats.getDataPath()); assertEquals(shard.shardPath().getRootStatePath().toString(), stats.getStatePath()); assertEquals(shard.shardPath().isCustomDataPath(), stats.isCustomDataPath()); diff --git a/server/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java index 13e94f7fe5368..83bb8b309a7cb 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java @@ -163,7 +163,7 @@ private IndicesStatsResponse randomIndicesStatsResponse(final Index[] indices) { stats.get = new GetStats(); stats.flush = new FlushStats(); stats.warmer = new WarmerStats(); - shardStats.add(new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null)); + shardStats.add(new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null)); } } return IndicesStatsTests.newIndicesStatsResponse( diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java index 66b41d40943d0..be2eb529d86be 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java @@ -47,10 +47,10 @@ public void setUp() throws Exception { super.setUp(); indicesStats = Collections.singletonList(new IndexStats("index-0", "dcvO5uZATE-EhIKc3tk9Bg", new ShardStats[] { // Primaries - new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null), - new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null), + new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null, null), + new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null, null), // Replica - new ShardStats(mockShardRouting(false), mockShardPath(), mockCommonStats(), null, null) + new ShardStats(mockShardRouting(false), mockShardPath(), mockCommonStats(), null, null, null) })); }