Skip to content

Commit

Permalink
self review
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Oct 19, 2023
1 parent accf058 commit 97a8e0c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ protected IndexShard getIndexShard(String node, ShardId shardId, String indexNam
/**
* Fetch IndexShard, assumes only a single shard per node.
*/
public static IndexShard getIndexShard(String node, String indexName) {
protected IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,7 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExcep

internalCluster().startClusterManagerOnlyNode(settings.build());
String dataNodeName = internalCluster().startDataOnlyNode(settings.build());
for (int i = 0; i < replicaCount; i++) {
internalCluster().startDataOnlyNode(settings.build());
}
internalCluster().startDataOnlyNodes(replicaCount);
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.replication.SegmentReplicationBaseIT;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTarget;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
Expand All @@ -20,6 +22,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -49,7 +52,7 @@ public void testCancelReplicationWhileSyncingSegments() throws Exception {
SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode);
ensureGreen(INDEX_NAME);
blockNodeOnAnySegmentFile(REPOSITORY_NAME, replicaNode);
final IndexShard indexShard = SegmentReplicationBaseIT.getIndexShard(replicaNode, INDEX_NAME);
final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
indexSingleDoc();
refresh(INDEX_NAME);
waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
Expand Down Expand Up @@ -84,7 +87,7 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception {
SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode);
ensureGreen(INDEX_NAME);
blockNodeOnAnyFiles(REPOSITORY_NAME, replicaNode);
final IndexShard indexShard = SegmentReplicationBaseIT.getIndexShard(replicaNode, INDEX_NAME);
final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
indexSingleDoc();
refresh(INDEX_NAME);
waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
Expand All @@ -111,11 +114,20 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception {
private String getNode(Set<String> dataNodeNames, boolean primary) {
assertEquals(2, dataNodeNames.size());
for (String name : dataNodeNames) {
final IndexShard indexShard = SegmentReplicationBaseIT.getIndexShard(name, INDEX_NAME);
final IndexShard indexShard = getIndexShard(name, INDEX_NAME);
if (indexShard.routingEntry().primary() == primary) {
return name;
}
}
return null;
}

private IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(index);
assertNotNull(indexService);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return shardId.map(indexService::getShard).orElse(null);
}
}

0 comments on commit 97a8e0c

Please sign in to comment.