Skip to content

Commit

Permalink
add unit test for cluster state update
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Nov 30, 2023
1 parent 08d58b7 commit 6c93efb
Showing 1 changed file with 54 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,26 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -51,6 +59,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -91,6 +100,8 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
private SegmentReplicationState state;
private ReplicationCheckpoint initialCheckpoint;

private ClusterState clusterState;

private static final long TRANSPORT_TIMEOUT = 30000;// 30sec

@Override
Expand Down Expand Up @@ -129,7 +140,7 @@ public void setUp() throws Exception {

indicesService = mock(IndicesService.class);
ClusterService clusterService = mock(ClusterService.class);
ClusterState clusterState = mock(ClusterState.class);
clusterState = mock(ClusterState.class);
RoutingTable mockRoutingTable = mock(RoutingTable.class);
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.routingTable()).thenReturn(mockRoutingTable);
Expand Down Expand Up @@ -630,4 +641,46 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
target.cancel("test");
sut.startReplication(target);
}

public void testProcessCheckpointOnClusterStateUpdate() {
// set up mocks on indicies & index service to return our replica's index & shard.
IndexService indexService = mock(IndexService.class);
when(indexService.iterator()).thenReturn(Set.of(replicaShard).iterator());
when(indexService.getIndexSettings()).thenReturn(replicaShard.indexSettings());
when(indexService.index()).thenReturn(replicaShard.routingEntry().index());
when(indicesService.iterator()).thenReturn(Set.of(indexService).iterator());

// create old & new cluster states
final String targetNodeId = "targetNodeId";
ShardRouting initialRouting = primaryShard.routingEntry().relocate(targetNodeId, 0L);
assertEquals(ShardRoutingState.RELOCATING, initialRouting.state());

ShardRouting targetRouting = ShardRouting.newUnassigned(
primaryShard.shardId(),
true,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "test")
).initialize(targetNodeId, initialRouting.allocationId().getId(), 0L).moveToStarted();
assertEquals(targetNodeId, targetRouting.currentNodeId());
assertEquals(ShardRoutingState.STARTED, targetRouting.state());
ClusterState oldState = ClusterState.builder(ClusterName.DEFAULT)
.routingTable(
RoutingTable.builder()
.add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(initialRouting).build())
.build()
)
.build();
ClusterState newState = ClusterState.builder(ClusterName.DEFAULT)
.routingTable(
RoutingTable.builder()
.add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(targetRouting).build())
.build()
)
.build();

// spy so we can verify process is invoked
SegmentReplicationTargetService spy = spy(sut);
spy.clusterChanged(new ClusterChangedEvent("ignored", oldState, newState));
verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any());
}
}

0 comments on commit 6c93efb

Please sign in to comment.