Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft PR for Segment Replication Checkpoint Replay on Replica Shard #3606

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -2289,7 +2289,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
}

@Override
public SegmentInfos getLatestSegmentInfos() {
protected SegmentInfos getLatestSegmentInfos() {
OpenSearchDirectoryReader reader = null;
try {
reader = internalReaderManager.acquire();
Expand Down
46 changes: 39 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -1377,19 +1376,52 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}
}

/**
* Returns the lastest segmentInfos
*/
public SegmentInfos getLatestSegmentInfos() {
return getEngine().getSegmentInfosSnapshot().get();
}

/**
* Returns the lastest Replication Checkpoint that shard received
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0);
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
return new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
latestSegmentInfos.getGeneration(),
getProcessedLocalCheckpoint(),
latestSegmentInfos.getVersion()
);
}

/**
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
*/
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) {
assert shardRouting.primary() == false;
// TODO
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
*/
public boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (state().equals(IndexShardState.STARTED) == false) {
logger.trace("Ignoring new replication checkpoint - shard is not started {}", state());
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
"Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}",
localCheckpoint,
requestCheckpoint
);
return false;
}
if (localCheckpoint.equals(requestCheckpoint)) {
logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint);
return false;
}
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
Expand All @@ -31,14 +32,16 @@
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportService;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Service class that orchestrates replication events on replicas.
*
* @opensearch.internal
*/
public final class SegmentReplicationTargetService implements IndexEventListener {
public class SegmentReplicationTargetService implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class);

Expand All @@ -49,6 +52,8 @@ public final class SegmentReplicationTargetService implements IndexEventListener

private final SegmentReplicationSourceFactory sourceFactory;

private static final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = new HashMap<>();

/**
* The internal actions
*
Expand All @@ -58,6 +63,7 @@ public static class Actions {
public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk";
}

@Inject
public SegmentReplicationTargetService(
final ThreadPool threadPool,
final RecoverySettings recoverySettings,
Expand All @@ -84,6 +90,55 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
}
}

ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) {
if (latestReceivedCheckpoint.containsKey(shardId)) {
return latestReceivedCheckpoint.get(shardId);
}
return null;
}

/**
* Invoked when a new checkpoint is received from a primary shard.
* It checks if a new checkpoint should be processed or not and starts replication if needed.
* @param requestCheckpoint received checkpoint that is checked for processing
* @param indexShard replica shard on which checkpoint is received
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) {

if (getLatestReceivedCheckpoint(indexShard.shardId()) != null) {
if (requestCheckpoint.isAheadOf(latestReceivedCheckpoint.get(indexShard.shardId()))) {
latestReceivedCheckpoint.replace(indexShard.shardId(), requestCheckpoint);
}
} else {
latestReceivedCheckpoint.put(indexShard.shardId(), requestCheckpoint);
}
if (onGoingReplications.isShardReplicating(indexShard.shardId())) {
logger.trace("Ignoring new replication checkpoint - shard is currently replicating to a checkpoint");
return;
}
if (indexShard.shouldProcessCheckpoint(requestCheckpoint)) {
startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
// if we received a checkpoint during the copy event that is ahead of this
// try and process it.
if (getLatestReceivedCheckpoint(indexShard.shardId()).isAheadOf(indexShard.getLatestReplicationCheckpoint())) {
onNewCheckpoint(getLatestReceivedCheckpoint(indexShard.shardId()), indexShard);
}
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
if (sendShardFailure == true) {
logger.error("replication failure", e);
indexShard.failShard("replication failure", e);
}
}
});

}
}

public void startReplication(
final ReplicationCheckpoint checkpoint,
final IndexShard indexShard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -52,6 +53,8 @@ public class PublishCheckpointAction extends TransportReplicationAction<
public static final String ACTION_NAME = "indices:admin/publishCheckpoint";
protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class);

private final SegmentReplicationTargetService replicationService;

@Inject
public PublishCheckpointAction(
Settings settings,
Expand All @@ -60,7 +63,8 @@ public PublishCheckpointAction(
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters
ActionFilters actionFilters,
SegmentReplicationTargetService targetService
) {
super(
settings,
Expand All @@ -75,6 +79,7 @@ public PublishCheckpointAction(
PublishCheckpointRequest::new,
ThreadPool.Names.REFRESH
);
this.replicationService = targetService;
}

@Override
Expand Down Expand Up @@ -165,7 +170,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh
ActionListener.completeWith(listener, () -> {
logger.trace("Checkpoint received on replica {}", request);
if (request.getCheckpoint().getShardId().equals(replica.shardId())) {
replica.onNewCheckpoint(request);
replicationService.onNewCheckpoint(request.getCheckpoint(), replica);
}
return new ReplicaResult();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public int hashCode() {
* Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null
*/
public boolean isAheadOf(@Nullable ReplicationCheckpoint other) {
return other == null || segmentInfosVersion > other.getSegmentInfosVersion();
return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ public boolean cancelForShard(ShardId shardId, String reason) {
return cancelled;
}

/**
* check if a shard is currently replicating
*
* @param shardId shardId for which to check if replicating
* @return true if shard is currently replicating
*/
public boolean isShardReplicating(ShardId shardId) {
return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId));
}

/**
* a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference
* causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.SetOnce;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.Assertions;
import org.opensearch.Build;
Expand Down Expand Up @@ -932,6 +934,15 @@ protected Node(
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class)
.toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService));
b.bind(SegmentReplicationTargetService.class)
.toInstance(
new SegmentReplicationTargetService(
threadPool,
recoverySettings,
transportService,
new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService)
)
);
}
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
Expand Down
Loading