Add batch async shard fetch transport action for replica
Modify async replica shard fetch to use helper functions

Signed-off-by: sudarshan baliga <[email protected]>
sudarshan-baliga committed Jun 29, 2023
1 parent 96a8073 commit 0919259
Showing 7 changed files with 713 additions and 301 deletions.
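This commit extends the batched async shard fetch already used for primaries to replica shard stores: GatewayAllocator now collects store metadata for all unassigned replica shards in a single transport round trip per data node, caches the per-node responses in shardStoresPerNode, and the replica allocator answers each per-shard fetchData call from that cache. A minimal sketch of that adaptation step, using only the response shape visible in the diff below (the class and method names here are illustrative, not part of the commit):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.gateway.AsyncShardFetch;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.store.TransportNodesBatchListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;

final class BatchStoreFetchAdapterSketch {

    // Turn the per-node batch (store metadata for every unassigned replica shard at once)
    // into the single-shard FetchResult that ReplicaShardAllocator.fetchData() must return.
    static AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> adaptToPerShardFormat(
        ShardId shardId,
        Map<DiscoveryNode, TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> shardStoresPerNode
    ) {
        HashMap<DiscoveryNode, TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> adapted = new HashMap<>();
        for (Map.Entry<DiscoveryNode, TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> entry : shardStoresPerNode.entrySet()) {
            TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadata forShard =
                entry.getValue().getNodeStoreFilesMetadataBatch().get(shardId);
            if (forShard == null) {
                continue; // this node returned nothing for the shard being allocated
            }
            // Rebuild the legacy per-shard object from the batched store metadata.
            adapted.put(entry.getKey(), new TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata(
                entry.getKey(),
                new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(
                    forShard.storeFilesMetadata().shardId(),
                    forShard.storeFilesMetadata().getMetadataSnapshot(),
                    forShard.storeFilesMetadata().peerRecoveryRetentionLeases()
                )
            ));
        }
        return new AsyncShardFetch.FetchResult<>(shardId, adapted, Collections.emptySet());
    }
}

Adapting back to the legacy per-shard format keeps ReplicaShardAllocator's store-matching logic untouched; the second changed file below only has to switch its StoreFilesMetadata references over to TransportNodesListShardStoreMetadataHelper.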
123 changes: 111 additions & 12 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
@@ -55,13 +55,11 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.store.TransportNodesBatchListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -90,24 +88,34 @@ public class GatewayAllocator implements ExistingShardsAllocator {
ConcurrentCollections.newConcurrentMap();



private Map<DiscoveryNode, TransportNodesBulkListGatewayStartedShards.BulkOfNodeGatewayStartedShards> shardsPerNode = ConcurrentCollections.newConcurrentMap();
private Map<DiscoveryNode, TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> shardStoresPerNode = ConcurrentCollections.newConcurrentMap();

private AsyncShardsFetchPerNode<TransportNodesBulkListGatewayStartedShards.BulkOfNodeGatewayStartedShards> fetchShardsFromNodes = null;
private AsyncShardsFetchPerNode<TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> fetchShardStoreFromNodes = null;



private Set<String> lastSeenEphemeralIds = Collections.emptySet();
TransportNodesBulkListGatewayStartedShards testAction;
TransportNodesBatchListShardStoreMetadata testStoreAction;

@Inject
public GatewayAllocator(
RerouteService rerouteService,
TransportNodesListGatewayStartedShards startedAction,
TransportNodesListShardStoreMetadata storeAction,
TransportNodesBulkListGatewayStartedShards testAction
TransportNodesBulkListGatewayStartedShards testAction,
TransportNodesBatchListShardStoreMetadata testStoreAction
) {
this.rerouteService = rerouteService;
this.primaryShardAllocator = new TestInternalPrimaryShardAllocator(testAction);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
// this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
this.replicaShardAllocator = new TestInternalReplicaShardAllocator(testStoreAction);
this.testAction = testAction;
this.testStoreAction = testStoreAction;
}

@Override
@@ -117,7 +125,9 @@ public void cleanCaches() {
Releasables.close(asyncFetchStore.values());
asyncFetchStore.clear();
Releasables.close(fetchShardsFromNodes);
Releasables.close(fetchShardStoreFromNodes);
shardsPerNode.clear();
shardStoresPerNode.clear();
}

// for tests
@@ -148,9 +158,12 @@ public void applyStartedShards(final List<ShardRouting> startedShards, final Rou

// clean up the async fetch objects and per-DiscoveryNode caches if all shards are assigned and none are in the ignore list
if (allocation.routingNodes().unassigned().isEmpty() && allocation.routingNodes().unassigned().isIgnoredEmpty()) {
Releasables.close(fetchShardsFromNodes);
Releasables.close(fetchShardStoreFromNodes);
shardsPerNode.clear();
shardStoresPerNode.clear();
fetchShardsFromNodes = null;
fetchShardStoreFromNodes = null;
}
}

@@ -165,6 +178,7 @@ public void applyFailedShards(final List<FailedShard> failedShards, final Routin
if (allocation.routingNodes().unassigned().isEmpty() && allocation.routingNodes().unassigned().isIgnoredEmpty()) {
Releasables.close(fetchShardsFromNodes);
shardsPerNode.clear();
shardStoresPerNode.clear();
}
}

@@ -176,6 +190,7 @@ public void beforeAllocation(final RoutingAllocation allocation) {

// build the per-node view of shards and shard stores by making transport calls to the data nodes and populating shardsPerNode and shardStoresPerNode
collectShardsPerNode(allocation);
collectShardsStorePerNode(allocation);

}

@@ -201,7 +216,7 @@ public void allocateUnassigned(

private synchronized Map<DiscoveryNode, TransportNodesBulkListGatewayStartedShards.BulkOfNodeGatewayStartedShards> collectShardsPerNode(RoutingAllocation allocation) {

Map<ShardId, String> batchOfUnassignedShardsWithCustomDataPath = getBatchOfUnassignedShardsWithCustomDataPath(allocation);
Map<ShardId, String> batchOfUnassignedShardsWithCustomDataPath = getBatchOfUnassignedShardsWithCustomDataPath(allocation, true);
if (fetchShardsFromNodes == null) {
if (batchOfUnassignedShardsWithCustomDataPath.isEmpty()){
return null;
@@ -236,11 +251,49 @@ private synchronized Map<DiscoveryNode, TransportNodesBulkListGatewayStartedShar
}
}

private Map<ShardId, String> getBatchOfUnassignedShardsWithCustomDataPath(RoutingAllocation allocation){
private synchronized Map<DiscoveryNode, TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> collectShardsStorePerNode(RoutingAllocation allocation) {
logger.info("sdarbStore Collecting of total shards ={}, over transport");
Map<ShardId, String> batchOfUnassignedShardsWithCustomDataPath = getBatchOfUnassignedShardsWithCustomDataPath(allocation, false);
if (fetchShardStoreFromNodes == null) {
if (batchOfUnassignedShardsWithCustomDataPath.isEmpty()){
return null;
}
fetchShardStoreFromNodes = new TestAsyncShardFetch<>(logger, "collect_store", batchOfUnassignedShardsWithCustomDataPath, testStoreAction);
} else {
// verify whether any new shards need to be added to the batch

// if even one shard is missing from the current batch, update it with all unassigned shards
if (batchOfUnassignedShardsWithCustomDataPath.keySet().stream().allMatch(shard -> fetchShardStoreFromNodes.shardsToCustomDataPathMap.containsKey(shard)) == false) {
// for now the complete map is replaced; this can be optimized to apply only the diff
logger.info("Shard store batch changed, updating it");
if (fetchShardStoreFromNodes.shardsToCustomDataPathMap.keySet().equals(batchOfUnassignedShardsWithCustomDataPath.keySet()) == false) {
fetchShardStoreFromNodes.updateBatchOfShards(batchOfUnassignedShardsWithCustomDataPath);
}
}
}

AsyncShardsFetchPerNode.TestFetchResult<TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> shardStoreTestFetchResult = fetchShardStoreFromNodes.testFetchData(allocation.nodes());

if (shardStoreTestFetchResult.getNodesToShards() == null) {
logger.info("Store fetching is probably still ongoing on some nodes for number of shards={}, current fetch={}", fetchShardStoreFromNodes.shardsToCustomDataPathMap.size(), fetchShardStoreFromNodes.cache.size());
return null;
} else {
logger.info("Store fetching from nodes done, number of nodes fetched={}", shardStoreTestFetchResult.getNodesToShards().size());
// update the view for GatewayAllocator
shardStoresPerNode = shardStoreTestFetchResult.getNodesToShards();
return shardStoresPerNode;
}
}

private Map<ShardId, String> getBatchOfUnassignedShardsWithCustomDataPath(RoutingAllocation allocation, boolean primary) {
Map<ShardId, String> map = new HashMap<>();
RoutingNodes.UnassignedShards allUnassignedShards = allocation.routingNodes().unassigned();
for (ShardRouting shardIterator : allUnassignedShards) {
if (shardIterator.primary())
if(primary && shardIterator.primary())
map.put(shardIterator.shardId(), IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shardIterator.index()).getSettings()));
else if(!primary && !shardIterator.primary())
map.put(shardIterator.shardId(), IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shardIterator.index()).getSettings()));
}
return map;
@@ -364,7 +417,7 @@ class TestAsyncShardFetch<T extends BaseNodeResponse> extends AsyncShardsFetchPe

@Override
protected void reroute( String reason) {
logger.trace("TEST--scheduling reroute for {}", reason);
logger.info("sdarbReroute TEST--scheduling reroute for {}", reason);
assert rerouteService != null;
rerouteService.reroute(
"TEST_async_shard_fetch",
@@ -444,6 +497,47 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
}
}

class TestInternalReplicaShardAllocator extends ReplicaShardAllocator {

private final TransportNodesBatchListShardStoreMetadata storeAction;

TestInternalReplicaShardAllocator(TransportNodesBatchListShardStoreMetadata storeAction) {
this.storeAction = storeAction;
}

@Override
protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation) {
ShardId shardId = shard.shardId();
Map<DiscoveryNode, TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> storeFilesMetadataBatchPerNode = shardStoresPerNode;

if (shardStoresPerNode.isEmpty()) {
return new AsyncShardFetch.FetchResult<>(shardId, null, Collections.emptySet());
}
HashMap<DiscoveryNode, TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> dataToAdapt = new HashMap<>();
for (DiscoveryNode node : storeFilesMetadataBatchPerNode.keySet()) {
TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch shardStoresOnThatNode = storeFilesMetadataBatchPerNode.get(node);
if (shardStoresOnThatNode.getNodeStoreFilesMetadataBatch().containsKey(shardId)) {
TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadata storeFilesMetadataOfShard = shardStoresOnThatNode.getNodeStoreFilesMetadataBatch().get(shardId);
// construct an object in the per-shard format that ReplicaShardAllocator expects
TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata adaptedStoreFilesMetadata = new TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata(node, new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(
storeFilesMetadataOfShard.storeFilesMetadata().shardId(),
storeFilesMetadataOfShard.storeFilesMetadata().getMetadataSnapshot(),
storeFilesMetadataOfShard.storeFilesMetadata().peerRecoveryRetentionLeases()
));
dataToAdapt.put(node, adaptedStoreFilesMetadata);
}
}
logger.info("Adapted replica shard store data {}", dataToAdapt);
return new AsyncShardFetch.FetchResult<>(shardId, dataToAdapt, Collections.emptySet());
}

@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
return false;
}
}


class InternalReplicaShardAllocator extends ReplicaShardAllocator {

@@ -480,7 +574,12 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS

@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
return asyncFetchStore.get(shard.shardId()) != null;
boolean isFetching = false;
for (Map.Entry<String, AsyncShardsFetchPerNode.NodeEntry<TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch>> nodeEntry : fetchShardStoreFromNodes.cache.entrySet()) {
isFetching = isFetching || nodeEntry.getValue().isFetching();
}
logger.info("Replica shard store fetching in progress: {}", isFetching);
return isFetching;
}
}
}
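Before the second changed file (which, from the methods shown, appears to be ReplicaShardAllocator and mostly swaps StoreFilesMetadata references over to TransportNodesListShardStoreMetadataHelper), the replica-side collection added above condenses to the outline below. This is a sketch only: the names are illustrative, the real code keeps the fetch object and caches as fields of GatewayAllocator, and AsyncShardsFetchPerNode is assumed to live in org.opensearch.gateway, as its unqualified use in the diff suggests.

import java.util.HashMap;
import java.util.Map;

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.gateway.AsyncShardsFetchPerNode;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.store.TransportNodesBatchListShardStoreMetadata;

final class ReplicaStoreBatchSketch {

    // Step 1: the "batch" is just ShardId -> custom data path for every unassigned replica shard.
    static Map<ShardId, String> unassignedReplicaBatch(RoutingAllocation allocation) {
        Map<ShardId, String> batch = new HashMap<>();
        for (ShardRouting shard : allocation.routingNodes().unassigned()) {
            if (shard.primary() == false) {
                batch.put(shard.shardId(), IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()));
            }
        }
        return batch;
    }

    // Step 2: poll the per-node fetch; it returns null while any node is still fetching,
    // otherwise the DiscoveryNode -> NodeStoreFilesMetadataBatch view cached as shardStoresPerNode.
    static Map<DiscoveryNode, TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> pollStoreFetch(
        AsyncShardsFetchPerNode<TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> fetch,
        RoutingAllocation allocation,
        Logger logger
    ) {
        AsyncShardsFetchPerNode.TestFetchResult<TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> result =
            fetch.testFetchData(allocation.nodes());
        if (result.getNodesToShards() == null) {
            logger.info("store fetch still in progress on some nodes");
            return null;
        }
        return result.getNodesToShards();
    }
}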
@@ -52,16 +52,10 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

@@ -107,7 +101,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
@@ -224,7 +218,7 @@ public AllocateUnassignedDecision makeAllocationDecision(
}
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
@@ -358,7 +352,7 @@ private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
private static TransportNodesListShardStoreMetadata.StoreFilesMetadata findStore(
private static TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata findStore(
DiscoveryNode node,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data
) {
@@ -374,7 +368,7 @@ private MatchingNodes findMatchingNodes(
RoutingAllocation allocation,
boolean noMatchFailedNodes,
DiscoveryNode primaryNode,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data,
boolean explain
) {
@@ -387,7 +381,7 @@ private MatchingNodes findMatchingNodes(
&& shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
continue;
}
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
// we don't have any files at all, it is an empty index
if (storeFilesMetadata.isEmpty()) {
continue;
@@ -443,8 +437,8 @@ private MatchingNodes findMatchingNodes(
}

private static long computeMatchingBytes(
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata
) {
long sizeMatched = 0;
for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) {
@@ -457,18 +451,18 @@ private static long computeMatchingBytes(
}

private static boolean hasMatchingSyncId(
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore
) {
String primarySyncId = primaryStore.syncId();
return primarySyncId != null && primarySyncId.equals(replicaStore.syncId());
}

private static MatchingNode computeMatchingNode(
DiscoveryNode primaryNode,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
DiscoveryNode replicaNode,
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore
) {
final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode);
final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode);
@@ -479,7 +473,7 @@ private static MatchingNode computeMatchingNode(
}

private static boolean canPerformOperationBasedRecovery(
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores,
DiscoveryNode targetNode
) {