From 0919259abcb2b3aa9c85d0b69cbeca2818c36b1e Mon Sep 17 00:00:00 2001 From: sudarshan baliga Date: Thu, 29 Jun 2023 16:41:42 +0530 Subject: [PATCH] Add batch async shard fetch transport action for replica Modify async replica shard fetch to use helper functions Signed-off-by: sudarshan baliga --- .../opensearch/gateway/GatewayAllocator.java | 123 ++++++- .../gateway/ReplicaShardAllocator.java | 34 +- ...sportNodesBatchListShardStoreMetadata.java | 320 ++++++++++++++++++ .../TransportNodesListShardStoreMetadata.java | 205 +---------- ...portNodesListShardStoreMetadataHelper.java | 234 +++++++++++++ .../gateway/ReplicaShardAllocatorTests.java | 36 +- .../opensearch/index/store/StoreTests.java | 62 +--- 7 files changed, 713 insertions(+), 301 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java create mode 100644 server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index da77799c9f37e..459b0c5450aa7 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -55,13 +55,11 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.store.TransportNodesBatchListShardStoreMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -90,24 +88,34 @@ public class GatewayAllocator implements ExistingShardsAllocator { ConcurrentCollections.newConcurrentMap(); + private Map shardsPerNode= ConcurrentCollections.newConcurrentMap(); + private Map shardStoresPerNode= ConcurrentCollections.newConcurrentMap(); + private AsyncShardsFetchPerNode fetchShardsFromNodes=null; + private AsyncShardsFetchPerNode fetchShardStoreFromNodes=null; + + private Set lastSeenEphemeralIds = Collections.emptySet(); TransportNodesBulkListGatewayStartedShards testAction; + TransportNodesBatchListShardStoreMetadata testStoreAction; @Inject public GatewayAllocator( RerouteService rerouteService, TransportNodesListGatewayStartedShards startedAction, TransportNodesListShardStoreMetadata storeAction, - TransportNodesBulkListGatewayStartedShards testAction + TransportNodesBulkListGatewayStartedShards testAction, + TransportNodesBatchListShardStoreMetadata testStoreAction ) { this.rerouteService = rerouteService; this.primaryShardAllocator = new TestInternalPrimaryShardAllocator(testAction); - this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); +// this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); + this.replicaShardAllocator = new TestInternalReplicaShardAllocator(testStoreAction); this.testAction=testAction; + this.testStoreAction = testStoreAction; } @Override @@ -117,7 +125,9 @@ public void cleanCaches() { Releasables.close(asyncFetchStore.values()); asyncFetchStore.clear(); Releasables.close(fetchShardsFromNodes); + Releasables.close(fetchShardStoreFromNodes); shardsPerNode.clear(); + shardStoresPerNode.clear(); } // for tests @@ -148,9 +158,12 @@ public void applyStartedShards(final List startedShards, final Rou // clean async object and cache for per DiscoverNode if all shards are assigned and none are ignore list if (allocation.routingNodes().unassigned().isEmpty() && allocation.routingNodes().unassigned().isIgnoredEmpty()){ + Releasables.close(fetchShardsFromNodes); Releasables.close(fetchShardsFromNodes); shardsPerNode.clear(); + shardStoresPerNode.clear(); fetchShardsFromNodes =null; + fetchShardStoreFromNodes = null; } } @@ -165,6 +178,7 @@ public void applyFailedShards(final List failedShards, final Routin if (allocation.routingNodes().unassigned().isEmpty() && allocation.routingNodes().unassigned().isIgnoredEmpty()){ Releasables.close(fetchShardsFromNodes); shardsPerNode.clear(); + shardStoresPerNode.clear(); } } @@ -176,6 +190,7 @@ public void beforeAllocation(final RoutingAllocation allocation) { //build the view of shards per node here by doing transport calls on nodes and populate shardsPerNode collectShardsPerNode(allocation); + collectShardsStorePerNode(allocation); } @@ -201,7 +216,7 @@ public void allocateUnassigned( private synchronized Map collectShardsPerNode(RoutingAllocation allocation) { - Map batchOfUnassignedShardsWithCustomDataPath = getBatchOfUnassignedShardsWithCustomDataPath(allocation); + Map batchOfUnassignedShardsWithCustomDataPath = getBatchOfUnassignedShardsWithCustomDataPath(allocation, true); if (fetchShardsFromNodes == null) { if (batchOfUnassignedShardsWithCustomDataPath.isEmpty()){ return null; @@ -236,11 +251,49 @@ private synchronized Map getBatchOfUnassignedShardsWithCustomDataPath(RoutingAllocation allocation){ + private synchronized Map collectShardsStorePerNode(RoutingAllocation allocation) { + logger.info("sdarbStore Collecting of total shards ={}, over transport"); + Map batchOfUnassignedShardsWithCustomDataPath = getBatchOfUnassignedShardsWithCustomDataPath(allocation, false); + if (fetchShardStoreFromNodes == null) { + if (batchOfUnassignedShardsWithCustomDataPath.isEmpty()){ + return null; + } + fetchShardStoreFromNodes = new TestAsyncShardFetch<>(logger, "collect_store", batchOfUnassignedShardsWithCustomDataPath, testStoreAction); + } else { + //verify if any new shards need to be batched? + + // even if one shard is not in the map, we now update the batch with all unassigned shards + if (batchOfUnassignedShardsWithCustomDataPath.keySet().stream().allMatch(shard -> fetchShardStoreFromNodes.shardsToCustomDataPathMap.containsKey(shard)) == false) { + // right now update the complete map, but this can be optimized with only the diff + logger.info("Shards Batch not equal, updating it"); + if (fetchShardStoreFromNodes.shardsToCustomDataPathMap.keySet().equals(batchOfUnassignedShardsWithCustomDataPath.keySet()) == false) { + fetchShardStoreFromNodes.updateBatchOfShards(batchOfUnassignedShardsWithCustomDataPath); + } + } + } + + AsyncShardsFetchPerNode.TestFetchResult shardStoreTestFetchResult = fetchShardStoreFromNodes.testFetchData(allocation.nodes()); + + if (shardStoreTestFetchResult.getNodesToShards()==null) + { + logger.info("sdarbStore Fetching probably still going on some nodes for number of shards={}, current fetch = {}",fetchShardsFromNodes.shardsToCustomDataPathMap.size(),fetchShardsFromNodes.cache.size()); + return null; + } + else { + logger.info("sdarbStore Fetching from nodes done with size of nodes fetched= {}", shardStoreTestFetchResult.getNodesToShards().size()); + // update the view for GatewayAllocator + shardStoresPerNode = shardStoreTestFetchResult.getNodesToShards(); + return shardStoresPerNode; + } + } + + private Map getBatchOfUnassignedShardsWithCustomDataPath(RoutingAllocation allocation, boolean primary){ Map map = new HashMap<>(); RoutingNodes.UnassignedShards allUnassignedShards = allocation.routingNodes().unassigned(); for (ShardRouting shardIterator : allUnassignedShards) { - if (shardIterator.primary()) + if(primary && shardIterator.primary()) + map.put(shardIterator.shardId(), IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shardIterator.index()).getSettings())); + else if(!primary && !shardIterator.primary()) map.put(shardIterator.shardId(), IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shardIterator.index()).getSettings())); } return map; @@ -364,7 +417,7 @@ class TestAsyncShardFetch extends AsyncShardsFetchPe @Override protected void reroute( String reason) { - logger.trace("TEST--scheduling reroute for {}", reason); + logger.info("sdarbReroute TEST--scheduling reroute for {}", reason); assert rerouteService != null; rerouteService.reroute( "TEST_async_shard_fetch", @@ -444,6 +497,47 @@ protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + ShardId shardId = shard.shardId(); + Map discoveryNodeListOfNodeGatewayStartedShardsMap = shardStoresPerNode; + + if (shardsPerNode.isEmpty()) { + return new AsyncShardFetch.FetchResult<>(shardId, null, Collections.emptySet()); + } + HashMap dataToAdapt = new HashMap<>(); + for (DiscoveryNode node : discoveryNodeListOfNodeGatewayStartedShardsMap.keySet()) { + + TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch shardsStoreOnThatNode = discoveryNodeListOfNodeGatewayStartedShardsMap.get(node); + if (shardsStoreOnThatNode.getNodeStoreFilesMetadataBatch().containsKey(shardId)) { + TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadata nodeGatewayStartedShardsFromAdapt = shardsStoreOnThatNode.getNodeStoreFilesMetadataBatch().get(shardId); + // construct a object to adapt + TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata nodeGatewayStartedShardsToAdapt = new TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata(node, new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( + nodeGatewayStartedShardsFromAdapt.storeFilesMetadata().shardId(), + nodeGatewayStartedShardsFromAdapt.storeFilesMetadata().getMetadataSnapshot(), + nodeGatewayStartedShardsFromAdapt.storeFilesMetadata().peerRecoveryRetentionLeases() + )); + dataToAdapt.put(node, nodeGatewayStartedShardsToAdapt); + } + } + logger.info("sdarbStore replica data {}", dataToAdapt); + return new AsyncShardFetch.FetchResult<>(shardId, dataToAdapt, Collections.emptySet()); + } + + @Override + protected boolean hasInitiatedFetching(ShardRouting shard) { + return false; + } + } + class InternalReplicaShardAllocator extends ReplicaShardAllocator { @@ -480,7 +574,12 @@ protected AsyncShardFetch.FetchResult> asyncFetchStore : fetchShardStoreFromNodes.cache.entrySet()) { + fetchingDone = fetchingDone | asyncFetchStore.getValue().isFetching(); + } + logger.info("sdarbStore fetchingDone {}", fetchingDone); + return fetchingDone; } } } diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index c0b831b6fe4d0..d15e8e5522f08 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -52,16 +52,10 @@ import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; @@ -107,7 +101,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) { assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; assert primaryShard.currentNodeId() != null; final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); - final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); + final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // just let the recovery find it out, no need to do anything about it for the initializing shard @@ -224,7 +218,7 @@ public AllocateUnassignedDecision makeAllocationDecision( } assert primaryShard.currentNodeId() != null; final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); - final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); + final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // we want to let the replica be allocated in order to expose the actual problem with the primary that the replica @@ -358,7 +352,7 @@ private static List augmentExplanationsWithStoreInfo( /** * Finds the store for the assigned shard in the fetched data, returns null if none is found. */ - private static TransportNodesListShardStoreMetadata.StoreFilesMetadata findStore( + private static TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata findStore( DiscoveryNode node, AsyncShardFetch.FetchResult data ) { @@ -374,7 +368,7 @@ private MatchingNodes findMatchingNodes( RoutingAllocation allocation, boolean noMatchFailedNodes, DiscoveryNode primaryNode, - TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, AsyncShardFetch.FetchResult data, boolean explain ) { @@ -387,7 +381,7 @@ private MatchingNodes findMatchingNodes( && shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) { continue; } - TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata(); + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata(); // we don't have any files at all, it is an empty index if (storeFilesMetadata.isEmpty()) { continue; @@ -443,8 +437,8 @@ private MatchingNodes findMatchingNodes( } private static long computeMatchingBytes( - TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, - TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata ) { long sizeMatched = 0; for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) { @@ -457,8 +451,8 @@ private static long computeMatchingBytes( } private static boolean hasMatchingSyncId( - TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, - TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore ) { String primarySyncId = primaryStore.syncId(); return primarySyncId != null && primarySyncId.equals(replicaStore.syncId()); @@ -466,9 +460,9 @@ private static boolean hasMatchingSyncId( private static MatchingNode computeMatchingNode( DiscoveryNode primaryNode, - TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, DiscoveryNode replicaNode, - TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore ) { final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode); final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode); @@ -479,7 +473,7 @@ private static MatchingNode computeMatchingNode( } private static boolean canPerformOperationBasedRecovery( - TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, AsyncShardFetch.FetchResult shardStores, DiscoveryNode targetNode ) { diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java new file mode 100644 index 0000000000000..ea46cbc16c463 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java @@ -0,0 +1,320 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.store; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.*; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.settings.Settings; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.gateway.AsyncShardsFetchPerNode; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Transport action for fetching the batch of shard stores Metadata from a list of transport nodes + * + * @opensearch.internal + */ +public class TransportNodesBatchListShardStoreMetadata extends TransportNodesAction< + TransportNodesBatchListShardStoreMetadata.Request, + TransportNodesBatchListShardStoreMetadata.NodesStoreFilesMetadata, + TransportNodesBatchListShardStoreMetadata.NodeRequest, + TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> implements + AsyncShardsFetchPerNode.Lister< + TransportNodesBatchListShardStoreMetadata.NodesStoreFilesMetadata, + TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> { + + public static final String ACTION_NAME = "internal:cluster/nodes/indices/shard/store/batch"; + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, TransportNodesListShardStoreMetadata.NodesStoreFilesMetadata::new); + + private final Settings settings; + private final IndicesService indicesService; + private final NodeEnvironment nodeEnv; + + @Inject + public TransportNodesBatchListShardStoreMetadata( + Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + IndicesService indicesService, + NodeEnvironment nodeEnv, + ActionFilters actionFilters + ) { + super( + ACTION_NAME, + threadPool, + clusterService, + transportService, + actionFilters, + Request::new, + NodeRequest::new, + ThreadPool.Names.FETCH_SHARD_STORE, + NodeStoreFilesMetadataBatch.class + ); + this.settings = settings; + this.indicesService = indicesService; + this.nodeEnv = nodeEnv; + } + + @Override + public void list(DiscoveryNode[] nodes, Map shardIdsWithCustomDataPath, ActionListener listener) { + execute(new TransportNodesBatchListShardStoreMetadata.Request(shardIdsWithCustomDataPath, nodes), listener); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request); + } + + @Override + protected NodeStoreFilesMetadataBatch newNodeResponse(StreamInput in) throws IOException { + return new NodeStoreFilesMetadataBatch(in); + } + + @Override + protected NodesStoreFilesMetadata newResponse( + Request request, + List responses, + List failures + ) { + return new NodesStoreFilesMetadata(clusterService.getClusterName(), responses, failures); + } + + @Override + protected NodeStoreFilesMetadataBatch nodeOperation(NodeRequest request) { + try { + return new NodeStoreFilesMetadataBatch(clusterService.localNode(), listStoreMetadata(request)); + } catch (IOException e) { + throw new OpenSearchException("Failed to list store metadata for shards [" + request.getShardIdsWithCustomDataPath() + "]", e); + } + } + + /** + * This method is similar to listStoreMetadata method of {@link TransportNodesListShardStoreMetadata} + * In this case we fetch the shard store files for batch of shards instead of one shard. + */ + private Map listStoreMetadata(NodeRequest request) throws IOException { + Map shardStoreMetadataMap = new HashMap(); + for(Map.Entry shardToCustomDataPathEntry: request.getShardIdsWithCustomDataPath().entrySet()) { + final ShardId shardId = shardToCustomDataPathEntry.getKey(); + try { + logger.debug("Listing store meta data for {}", shardId); + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata nodeStoreFilesMetadata = TransportNodesListShardStoreMetadataHelper.getStoreFilesMetadata( + logger, + indicesService, + clusterService, + shardId, + shardToCustomDataPathEntry.getValue(), + settings, + nodeEnv + ); + shardStoreMetadataMap.put( + shardId, + new NodeStoreFilesMetadata( + nodeStoreFilesMetadata, + null + ) + ); + } catch (Exception storeFileFetchException) { + logger.trace(new ParameterizedMessage("Unable to fetch store files for shard [{}]", shardId), storeFileFetchException); + shardStoreMetadataMap.put( + shardId, + new NodeStoreFilesMetadata( + null, + storeFileFetchException + ) + ); + } + } + logger.info("Loaded store meta data for {} shards", shardStoreMetadataMap); + return shardStoreMetadataMap; + } + + + /** + * The request + * + * @opensearch.internal + */ + public static class Request extends BaseNodesRequest { + + private final Map shardIdsWithCustomDataPath; + + + public Request(StreamInput in) throws IOException { + super(in); + shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); + } + + public Request(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes) { + super(nodes); + this.shardIdsWithCustomDataPath = Objects.requireNonNull(shardIdsWithCustomDataPath); + } + + public Map getShardIdsWithCustomDataPath() { + return shardIdsWithCustomDataPath; + } + + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); + } + } + + /** + * Metadata for the nodes store files + * + * @opensearch.internal + */ + public static class NodesStoreFilesMetadata extends BaseNodesResponse { + + public NodesStoreFilesMetadata(StreamInput in) throws IOException { + super(in); + } + + public NodesStoreFilesMetadata(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeStoreFilesMetadataBatch::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + } + + /** + * The metadata for the node store files + * + * @opensearch.internal + */ + public static class NodeStoreFilesMetadata { + + private TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata; + private Exception storeFileFetchException; + + public NodeStoreFilesMetadata(TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata) { + this.storeFilesMetadata = storeFilesMetadata; + this.storeFileFetchException = null; + } + + public NodeStoreFilesMetadata(StreamInput in) throws IOException { + storeFilesMetadata = new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(in); + this.storeFileFetchException = null; + } + + public NodeStoreFilesMetadata(TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata, Exception storeFileFetchException) { + this.storeFilesMetadata = storeFilesMetadata; + this.storeFileFetchException = storeFileFetchException; + } + + public TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata() { + return storeFilesMetadata; + } + + public static NodeStoreFilesMetadata readListShardStoreNodeOperationResponse(StreamInput in) throws IOException { + return new NodeStoreFilesMetadata(in); + } + + public void writeTo(StreamOutput out) throws IOException { + storeFilesMetadata.writeTo(out); + } + + public Exception getStoreFileFetchException() { + return storeFileFetchException; + } + + + @Override + public String toString() { + return "[[" + storeFilesMetadata + "]]"; + } + } + + /** + * The node request + * + * @opensearch.internal + */ + public static class NodeRequest extends BaseNodeRequest { + + private final Map shardIdsWithCustomDataPath; + + public NodeRequest(StreamInput in) throws IOException { + super(in); + shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); + } + + public NodeRequest(Request request) { + this.shardIdsWithCustomDataPath = Objects.requireNonNull(request.getShardIdsWithCustomDataPath()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); + } + + public Map getShardIdsWithCustomDataPath() { + return shardIdsWithCustomDataPath; + } + } + + + public static class NodeStoreFilesMetadataBatch extends BaseNodeResponse { + private final Map nodeStoreFilesMetadataBatch; + protected NodeStoreFilesMetadataBatch(StreamInput in) throws IOException { + super(in); + this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, NodeStoreFilesMetadata::new); + } + + + public NodeStoreFilesMetadataBatch(DiscoveryNode node, Map nodeStoreFilesMetadataBatch) { + super(node); + this.nodeStoreFilesMetadataBatch = nodeStoreFilesMetadataBatch; + } + + public Map getNodeStoreFilesMetadataBatch() { + return this.nodeStoreFilesMetadataBatch; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + } + } + +} diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java index b49cdcd127962..6bcfbcc5274e6 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -32,50 +32,31 @@ package org.opensearch.indices.store; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionType; import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.nodes.BaseNodeRequest; -import org.opensearch.action.support.nodes.BaseNodeResponse; -import org.opensearch.action.support.nodes.BaseNodesRequest; -import org.opensearch.action.support.nodes.BaseNodesResponse; -import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.action.support.nodes.*; import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.env.NodeEnvironment; import org.opensearch.gateway.AsyncShardFetch; -import org.opensearch.index.IndexService; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.seqno.ReplicationTracker; -import org.opensearch.index.seqno.RetentionLease; -import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.shard.ShardPath; -import org.opensearch.index.store.Store; -import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.concurrent.TimeUnit; /** * Metadata for shard stores from a list of transport nodes @@ -158,174 +139,18 @@ protected NodeStoreFilesMetadata nodeOperation(NodeRequest request) { } } - private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOException { + private TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOException { final ShardId shardId = request.getShardId(); logger.trace("listing store meta data for {}", shardId); - long startTimeNS = System.nanoTime(); - boolean exists = false; - try { - IndexService indexService = indicesService.indexService(shardId.getIndex()); - if (indexService != null) { - IndexShard indexShard = indexService.getShardOrNull(shardId.id()); - if (indexShard != null) { - try { - final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata( - shardId, - indexShard.snapshotStoreMetadata(), - indexShard.getPeerRecoveryRetentionLeases() - ); - exists = true; - return storeFilesMetadata; - } catch (org.apache.lucene.index.IndexNotFoundException e) { - logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); - } catch (IOException e) { - logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); - } - } - } - final String customDataPath; - if (request.getCustomDataPath() != null) { - customDataPath = request.getCustomDataPath(); - } else { - // TODO: Fallback for BWC with older predecessor (ES) versions. - // Remove this once request.getCustomDataPath() always returns non-null - if (indexService != null) { - customDataPath = indexService.getIndexSettings().customDataPath(); - } else { - IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); - if (metadata != null) { - customDataPath = new IndexSettings(metadata, settings).customDataPath(); - } else { - logger.trace("{} node doesn't have meta data for the requests index", shardId); - throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); - } - } - } - final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); - if (shardPath == null) { - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); - } - // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: - // 1) a shard is being constructed, which means the cluster-manager will not use a copy of this replica - // 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the cluster-manager may not - // reuse local resources. - final Store.MetadataSnapshot metadataSnapshot = Store.readMetadataSnapshot( - shardPath.resolveIndex(), - shardId, - nodeEnv::shardLock, - logger - ); - // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when - // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard. - return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList()); - } finally { - TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); - if (exists) { - logger.debug("{} loaded store meta data (took [{}])", shardId, took); - } else { - logger.trace("{} didn't find any store meta data to load (took [{}])", shardId, took); - } - } - } - - /** - * Metadata for store files - * - * @opensearch.internal - */ - public static class StoreFilesMetadata implements Iterable, Writeable { - private final ShardId shardId; - private final Store.MetadataSnapshot metadataSnapshot; - private final List peerRecoveryRetentionLeases; - - public StoreFilesMetadata( - ShardId shardId, - Store.MetadataSnapshot metadataSnapshot, - List peerRecoveryRetentionLeases - ) { - this.shardId = shardId; - this.metadataSnapshot = metadataSnapshot; - this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases; - } - - public StoreFilesMetadata(StreamInput in) throws IOException { - this.shardId = new ShardId(in); - this.metadataSnapshot = new Store.MetadataSnapshot(in); - if (in.getVersion().onOrAfter(LegacyESVersion.V_7_5_0)) { - this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new); - } else { - this.peerRecoveryRetentionLeases = Collections.emptyList(); - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - shardId.writeTo(out); - metadataSnapshot.writeTo(out); - if (out.getVersion().onOrAfter(LegacyESVersion.V_7_5_0)) { - out.writeList(peerRecoveryRetentionLeases); - } - } - - public ShardId shardId() { - return this.shardId; - } - - public boolean isEmpty() { - return metadataSnapshot.size() == 0; - } - - @Override - public Iterator iterator() { - return metadataSnapshot.iterator(); - } - - public boolean fileExists(String name) { - return metadataSnapshot.asMap().containsKey(name); - } - - public StoreFileMetadata file(String name) { - return metadataSnapshot.asMap().get(name); - } - - /** - * Returns the retaining sequence number of the peer recovery retention lease for a given node if exists; otherwise, returns -1. - */ - public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) { - assert node != null; - final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()); - return peerRecoveryRetentionLeases.stream() - .filter(lease -> lease.id().equals(retentionLeaseId)) - .mapToLong(RetentionLease::retainingSequenceNumber) - .findFirst() - .orElse(-1L); - } - - public List peerRecoveryRetentionLeases() { - return peerRecoveryRetentionLeases; - } - - /** - * @return commit sync id if exists, else null - */ - public String syncId() { - return metadataSnapshot.getSyncId(); - } - - @Override - public String toString() { - return "StoreFilesMetadata{" - + ", shardId=" - + shardId - + ", metadataSnapshot{size=" - + metadataSnapshot.size() - + ", syncId=" - + metadataSnapshot.getSyncId() - + "}" - + '}'; - } + return TransportNodesListShardStoreMetadataHelper.getStoreFilesMetadata( + logger, + indicesService, + clusterService, + shardId, + request.getCustomDataPath(), + settings, + nodeEnv + ); } /** @@ -463,19 +288,19 @@ public String getCustomDataPath() { */ public static class NodeStoreFilesMetadata extends BaseNodeResponse { - private StoreFilesMetadata storeFilesMetadata; + private TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata; public NodeStoreFilesMetadata(StreamInput in) throws IOException { super(in); - storeFilesMetadata = new StoreFilesMetadata(in); + storeFilesMetadata = new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(in); } - public NodeStoreFilesMetadata(DiscoveryNode node, StoreFilesMetadata storeFilesMetadata) { + public NodeStoreFilesMetadata(DiscoveryNode node, TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata) { super(node); this.storeFilesMetadata = storeFilesMetadata; } - public StoreFilesMetadata storeFilesMetadata() { + public TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata() { return storeFilesMetadata; } diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java new file mode 100644 index 0000000000000..b4f2257c72706 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java @@ -0,0 +1,234 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.store; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.seqno.ReplicationTracker; +import org.opensearch.index.seqno.RetentionLease; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.IndicesService; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Helper class for common code used in {@link TransportNodesListShardStoreMetadata} and + * {@link TransportNodesBatchListShardStoreMetadata} which are used to list the store metadata of a shard + * on a set of nodes. + * + * @opensearch.internal + */ +public class TransportNodesListShardStoreMetadataHelper { + /** + * Metadata for store files + * + * @opensearch.internal + */ + public static class StoreFilesMetadata implements Iterable, Writeable { + private final ShardId shardId; + private final Store.MetadataSnapshot metadataSnapshot; + private final List peerRecoveryRetentionLeases; + + public StoreFilesMetadata( + ShardId shardId, + Store.MetadataSnapshot metadataSnapshot, + List peerRecoveryRetentionLeases + ) { + this.shardId = shardId; + this.metadataSnapshot = metadataSnapshot; + this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases; + } + + public StoreFilesMetadata(StreamInput in) throws IOException { + this.shardId = new ShardId(in); + this.metadataSnapshot = new Store.MetadataSnapshot(in); + this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + metadataSnapshot.writeTo(out); + out.writeList(peerRecoveryRetentionLeases); + } + + public ShardId shardId() { + return this.shardId; + } + + public boolean isEmpty() { + return metadataSnapshot.size() == 0; + } + + @Override + public Iterator iterator() { + return metadataSnapshot.iterator(); + } + + public boolean fileExists(String name) { + return metadataSnapshot.asMap().containsKey(name); + } + + public StoreFileMetadata file(String name) { + return metadataSnapshot.asMap().get(name); + } + + /** + * Returns the retaining sequence number of the peer recovery retention lease for a given node if exists; otherwise, returns -1. + */ + public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) { + assert node != null; + final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()); + return peerRecoveryRetentionLeases.stream() + .filter(lease -> lease.id().equals(retentionLeaseId)) + .mapToLong(RetentionLease::retainingSequenceNumber) + .findFirst() + .orElse(-1L); + } + + public Store.MetadataSnapshot getMetadataSnapshot() { + return metadataSnapshot; + } + + public List peerRecoveryRetentionLeases() { + return peerRecoveryRetentionLeases; + } + + /** + * @return commit sync id if exists, else null + */ + public String syncId() { + return metadataSnapshot.getSyncId(); + } + + @Override + public String toString() { + return "StoreFilesMetadata{" + + ", shardId=" + + shardId + + ", metadataSnapshot{size=" + + metadataSnapshot.size() + + ", syncId=" + + metadataSnapshot.getSyncId() + + "}" + + '}'; + } + } + + /** + * Function to fetch metadata of the store files on the local node. + * @param logger + * @param indicesService + * @param clusterService + * @param shardId + * @param shardDataPathInRequest + * @param settings + * @param nodeEnv + * @return + * @throws IOException + */ + public static StoreFilesMetadata getStoreFilesMetadata( + Logger logger, + IndicesService indicesService, + ClusterService clusterService, + ShardId shardId, + String shardDataPathInRequest, + Settings settings, + NodeEnvironment nodeEnv + ) throws IOException { + boolean exists = false; + long startTimeNS = System.nanoTime(); + try { + IndexService indexService = indicesService.indexService(shardId.getIndex()); + if (indexService != null) { + IndexShard indexShard = indexService.getShardOrNull(shardId.id()); + if (indexShard != null) { + try { + final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata( + shardId, + indexShard.snapshotStoreMetadata(), + indexShard.getPeerRecoveryRetentionLeases() + ); + exists = true; + return storeFilesMetadata; + } catch (org.apache.lucene.index.IndexNotFoundException e) { + logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } catch (IOException e) { + logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } + } + } + final String customDataPath; + if (shardDataPathInRequest != null) { + customDataPath = shardDataPathInRequest; + } else { + // TODO: Fallback for BWC with older predecessor (ES) versions. + // Remove this once request.getCustomDataPath() always returns non-null + if (indexService != null) { + customDataPath = indexService.getIndexSettings().customDataPath(); + } else { + IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); + if (metadata != null) { + customDataPath = new IndexSettings(metadata, settings).customDataPath(); + } else { + logger.trace("{} node doesn't have meta data for the requests index", shardId); + throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); + } + } + } + final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); + if (shardPath == null) { + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } + // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: + // 1) a shard is being constructed, which means the cluster-manager will not use a copy of this replica + // 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the cluster-manager may not + // reuse local resources. + final Store.MetadataSnapshot metadataSnapshot = Store.readMetadataSnapshot( + shardPath.resolveIndex(), + shardId, + nodeEnv::shardLock, + logger + ); + // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when + // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard. + return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList()); + } finally { + TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); + if (exists) { + logger.debug("{} loaded store meta data (took [{}])", shardId, took); + } else { + logger.trace("{} didn't find any store meta data to load (took [{}])", shardId, took); + } + } + } + +} diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java index 36ac93524d6aa..e906816c56b3b 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java @@ -33,23 +33,16 @@ package org.opensearch.gateway; import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.junit.Before; import org.opensearch.Version; import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.IndexRoutingTable; -import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.RecoverySource; -import org.opensearch.cluster.routing.RoutingNode; -import org.opensearch.cluster.routing.RoutingNodes; -import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.cluster.routing.TestShardRouting; -import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.*; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -66,25 +59,14 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; -import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import org.opensearch.snapshots.SnapshotShardSizeInfo; -import org.junit.Before; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.unmodifiableMap; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.*; public class ReplicaShardAllocatorTests extends OpenSearchAllocationTestCase { private static final org.apache.lucene.util.Version MIN_SUPPORTED_LUCENE_VERSION = org.opensearch.Version.CURRENT @@ -664,7 +646,7 @@ static String randomSyncId() { class TestAllocator extends ReplicaShardAllocator { - private Map data = null; + private Map data = null; private AtomicBoolean fetchDataCalled = new AtomicBoolean(false); public void clean() { @@ -702,7 +684,7 @@ TestAllocator addData( } data.put( node, - new TransportNodesListShardStoreMetadata.StoreFilesMetadata( + new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( shardId, new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), peerRecoveryRetentionLeases @@ -720,7 +702,7 @@ protected AsyncShardFetch.FetchResult tData = null; if (data != null) { tData = new HashMap<>(); - for (Map.Entry entry : data.entrySet()) { + for (Map.Entry entry : data.entrySet()) { tData.put( entry.getKey(), new TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata(entry.getKey(), entry.getValue()) diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 89b11d604d7a1..b09e61597f06e 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -32,33 +32,9 @@ package org.opensearch.index.store; import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.SortedDocValuesField; -import org.apache.lucene.document.StringField; -import org.apache.lucene.document.TextField; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexFileNames; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.index.IndexNotFoundException; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; -import org.apache.lucene.index.NoDeletionPolicy; -import org.apache.lucene.index.NoMergePolicy; -import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.index.SnapshotDeletionPolicy; -import org.apache.lucene.index.Term; -import org.apache.lucene.store.ByteBuffersDirectory; -import org.apache.lucene.store.ChecksumIndexInput; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.NIOFSDirectory; +import org.apache.lucene.document.*; +import org.apache.lucene.index.*; +import org.apache.lucene.store.*; import org.apache.lucene.tests.analysis.MockAnalyzer; import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.tests.util.TestUtil; @@ -82,7 +58,7 @@ import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import org.opensearch.test.DummyShardLock; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; @@ -93,29 +69,11 @@ import java.io.IOException; import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.unmodifiableMap; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.endsWith; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.*; import static org.opensearch.test.VersionUtils.randomVersion; public class StoreTests extends OpenSearchTestCase { @@ -951,8 +909,8 @@ public void testStreamStoreFilesMetadata() throws Exception { ) ); } - TransportNodesListShardStoreMetadata.StoreFilesMetadata outStoreFileMetadata = - new TransportNodesListShardStoreMetadata.StoreFilesMetadata( + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata outStoreFileMetadata = + new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( new ShardId("test", "_na_", 0), metadataSnapshot, peerRecoveryRetentionLeases @@ -965,8 +923,8 @@ public void testStreamStoreFilesMetadata() throws Exception { ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); InputStreamStreamInput in = new InputStreamStreamInput(inBuffer); in.setVersion(targetNodeVersion); - TransportNodesListShardStoreMetadata.StoreFilesMetadata inStoreFileMetadata = - new TransportNodesListShardStoreMetadata.StoreFilesMetadata(in); + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata inStoreFileMetadata = + new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(in); Iterator outFiles = outStoreFileMetadata.iterator(); for (StoreFileMetadata inFile : inStoreFileMetadata) { assertThat(inFile.name(), equalTo(outFiles.next().name()));