Skip to content

Commit

Permalink
Add more unit tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Aug 1, 2023
1 parent 38f04f4 commit 703785e
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3752,7 +3752,7 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
*/
public void testCheckpointRefreshListener() throws IOException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(p -> newShard(mock), true);
IndexShard shard = newStartedShard(p -> newShard(true, mock), true);
List<ReferenceManager.RefreshListener> refreshListeners = shard.getEngine().config().getInternalRefreshListener();
assertTrue(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener));
closeShards(shard);
Expand All @@ -3768,52 +3768,6 @@ public void testCheckpointRefreshListenerWithNull() throws IOException {
closeShards(shard);
}

/**
* creates a new initializing shard. The shard will be put in its proper path under the
* current node id the shard is assigned to.
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
*/
private IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(10),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);

Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT")
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000))
.put(Settings.EMPTY)
.build();
IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName())
.settings(indexSettings)
.primaryTerm(0, primaryTerm)
.putMapping("{ \"properties\": {} }")
.build();
return newShard(
shardRouting,
shardPath,
metadata,
null,
null,
new InternalEngineFactory(),
new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())),
() -> {},
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
checkpointPublisher,
null
);
}

public void testIndexCheckOnStartup() throws Exception {
final IndexShard indexShard = newStartedShard(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingHelper;
import org.opensearch.common.collect.Tuple;
Expand Down Expand Up @@ -488,7 +489,38 @@ public void testShardIdleWithNoReplicas() throws Exception {
/**
* here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh.
*/
public void testPublishCheckpointOnPrimaryMode() throws IOException {
public void testPublishCheckpointOnPrimaryMode() throws IOException, InterruptedException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(p -> newShard(false, mock), false);

final ShardRouting shardRouting = shard.routingEntry();
promoteReplica(
shard,
Collections.singleton(shardRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(shardRouting.shardId()).addShard(shardRouting).build()
);

final CountDownLatch latch = new CountDownLatch(1);
shard.acquirePrimaryOperationPermit(new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
releasable.close();
latch.countDown();
}

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
}, ThreadPool.Names.GENERIC, "");

latch.await();
// verify checkpoint is published
verify(mock, times(1)).publish(any(), any());
closeShards(shard);
}

public void testPublishCheckpointPostFailover() throws IOException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(true);
CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
private IndicesService indicesService;

private SegmentReplicationState state;
private ReplicationCheckpoint initialCheckpoint;

private static final long TRANSPORT_TIMEOUT = 30000;// 30sec

Expand Down Expand Up @@ -276,24 +277,22 @@ public void getSegmentFiles(
}
};
final SegmentReplicationTarget target = spy(
new SegmentReplicationTarget(replicaShard, source, mock(SegmentReplicationTargetService.SegmentReplicationListener.class))
new SegmentReplicationTarget(
replicaShard,
primaryShard.getLatestReplicationCheckpoint(),
source,
mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
)
);

final SegmentReplicationTargetService spy = spy(sut);
doReturn(false).when(spy).processLatestReceivedCheckpoint(eq(replicaShard), any());
// Start first round of segment replication.
sut.startReplication(target);
spy.startReplication(target);

// Start second round of segment replication, this should fail to start as first round is still in-progress
sut.startReplication(replicaShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
Assert.fail("Should not succeed");
}

@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
assertEquals("Shard " + replicaShard.shardId() + " is already replicating", e.getMessage());
assertFalse(sendShardFailure);
}
});
spy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard);
verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any());
blockGetCheckpointMetadata.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.EngineTestCase;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
Expand Down Expand Up @@ -526,6 +527,52 @@ protected IndexShard newShard(
);
}

/**
* creates a new initializing shard. The shard will be put in its proper path under the
* current node id the shard is assigned to.
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
*/
protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(10),
primary,
ShardRoutingState.INITIALIZING,
primary ? RecoverySource.EmptyStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
);
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);

Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT")
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000))
.put(Settings.EMPTY)
.build();
IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName())
.settings(indexSettings)
.primaryTerm(0, primaryTerm)
.putMapping("{ \"properties\": {} }")
.build();
return newShard(
shardRouting,
shardPath,
metadata,
null,
null,
new NRTReplicationEngineFactory(),
new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())),
() -> {},
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
checkpointPublisher,
null
);
}

/**
* creates a new initializing shard.
* @param routing shard routing to use
Expand Down

0 comments on commit 703785e

Please sign in to comment.