Skip to content

Commit

Permalink
Change startNode to startDataOnlyNode to avoid cluster specific issues
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Jun 19, 2023
1 parent c25c572 commit 8bc89f4
Showing 1 changed file with 40 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.junit.Before;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
Expand Down Expand Up @@ -95,11 +96,16 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends SegmentReplicationBaseIT {

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
Expand All @@ -125,7 +131,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

// start another node, index another doc and replicate.
String nodeC = internalCluster().startNode();
String nodeC = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
Expand All @@ -134,10 +140,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
}

public void testRestartPrimary() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), primary);
Expand All @@ -160,10 +166,10 @@ public void testRestartPrimary() throws Exception {

public void testCancelPrimaryAllocation() throws Exception {
// this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int initialDocCount = 1;
Expand All @@ -190,8 +196,8 @@ public void testCancelPrimaryAllocation() throws Exception {
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
final Settings settings = Settings.builder()
.put(indexSettings())
.put(
Expand Down Expand Up @@ -233,8 +239,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}

public void testIndexReopenClose() throws Exception {
final String primary = internalCluster().startNode();
final String replica = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
final String replica = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -274,8 +280,8 @@ public void testMultipleShards() throws Exception {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -310,8 +316,8 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -351,14 +357,13 @@ public void testReplicationAfterForceMerge() throws Exception {
* This test verifies that segment replication does not fail for closed indices
*/
public void testClosedIndices() {
internalCluster().startClusterManagerOnlyNode();
List<String> nodes = new ArrayList<>();
// start 1st node so that it contains the primary
nodes.add(internalCluster().startNode());
nodes.add(internalCluster().startDataOnlyNode());
createIndex(INDEX_NAME, super.indexSettings());
ensureYellowAndNoInitializingShards(INDEX_NAME);
// start 2nd node so that it contains the replica
nodes.add(internalCluster().startNode());
nodes.add(internalCluster().startDataOnlyNode());
ensureGreen(INDEX_NAME);

logger.info("--> Close index");
Expand All @@ -373,8 +378,7 @@ public void testClosedIndices() {
* @throws Exception when issue is encountered
*/
public void testNodeDropWithOngoingReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
Expand All @@ -385,7 +389,7 @@ public void testNodeDropWithOngoingReplication() throws Exception {
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
// Get replica allocation id
Expand Down Expand Up @@ -447,11 +451,11 @@ public void testNodeDropWithOngoingReplication() throws Exception {
}

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();

final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
SegmentReplicationSourceService.class,
Expand Down Expand Up @@ -506,7 +510,7 @@ public void testCancellation() throws Exception {
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

Expand All @@ -529,7 +533,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
Expand All @@ -544,8 +548,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -591,7 +595,6 @@ public void testDeleteOperations() throws Exception {
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
Expand Down Expand Up @@ -649,7 +652,6 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception {
}

public void testUpdateOperations() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellow(INDEX_NAME);
Expand Down Expand Up @@ -703,7 +705,6 @@ public void testDropPrimaryDuringReplication() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, settings);
final List<String> dataNodes = internalCluster().startDataOnlyNodes(6);
Expand Down Expand Up @@ -743,7 +744,6 @@ public void testDropPrimaryDuringReplication() throws Exception {
}

public void testReplicaHasDiffFilesThanPrimary() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);
Expand Down Expand Up @@ -797,7 +797,6 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}

public void testPressureServiceStats() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
Expand Down Expand Up @@ -850,7 +849,7 @@ public void testPressureServiceStats() throws Exception {
assertEquals(0, replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats().size());

// start another replica.
String replicaNode_2 = internalCluster().startNode();
String replicaNode_2 = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
String docId = String.valueOf(initialDocCount + 1);
client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get();
Expand Down Expand Up @@ -889,10 +888,10 @@ public void testPressureServiceStats() throws Exception {
public void testScrollCreatedOnReplica() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// index 100 docs
Expand Down Expand Up @@ -983,15 +982,15 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to control refreshes
.put("index.refresh_interval", -1)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int initialDocCount = 10;
Expand Down Expand Up @@ -1106,10 +1105,10 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
}

public void testPitCreatedOnReplica() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME)
.setId("1")
Expand Down Expand Up @@ -1236,13 +1235,13 @@ public void testPitCreatedOnReplica() throws Exception {
*/
public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
final List<String> nodes = new ArrayList<>();
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
nodes.add(primaryNode);
final Settings settings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
createIndex(INDEX_NAME, settings);
ensureGreen(INDEX_NAME);
// start a replica node, initially will be empty with no shard assignment.
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
nodes.add(replicaNode);

// index a doc.
Expand Down

0 comments on commit 8bc89f4

Please sign in to comment.