Skip to content

Commit

Permalink
Move primary term from replicas proxy to repl op (#41119)
Browse files Browse the repository at this point in the history
A small refactoring that removes the primaryTerm field from ReplicasProxy and
instead passes it directly in to the methods that need it. Relates #40706.
  • Loading branch information
DaveCTurner authored Apr 11, 2019
1 parent 3ec0cc5 commit b41e3fc
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ private void executeShardOperation(final ShardRequest request, final IndexShard
}

@Override
protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy(final long primaryTerm) {
return new VerifyShardBeforeCloseActionReplicasProxy(primaryTerm);
protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy() {
return new VerifyShardBeforeCloseActionReplicasProxy();
}

/**
Expand All @@ -125,13 +125,9 @@ protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy(final lon
* or reopened in an unverified state with potential non flushed translog operations.
*/
class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy {

VerifyShardBeforeCloseActionReplicasProxy(final long primaryTerm) {
super(primaryTerm);
}

@Override
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final ActionListener<Void> listener) {
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final long primaryTerm,
final ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ protected ResyncReplicationResponse newResponseInstance() {
}

@Override
protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
return new ResyncActionReplicasProxy(primaryTerm);
protected ReplicationOperation.Replicas newReplicasProxy() {
return new ResyncActionReplicasProxy();
}

@Override
Expand All @@ -96,9 +96,10 @@ public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest
}

@Override
protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
protected WriteReplicaResult<ResyncReplicationRequest> shardOperationOnReplica(ResyncReplicationRequest request,
IndexShard replica) throws Exception {
Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult(request, location, null, replica, logger);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}

public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Expand Down Expand Up @@ -174,12 +175,9 @@ public void handleException(TransportException exp) {
*/
class ResyncActionReplicasProxy extends ReplicasProxy {

ResyncActionReplicasProxy(long primaryTerm) {
super(primaryTerm);
}

@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(
replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ public class ReplicationOperation<
private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
private final Replicas<ReplicaRequest> replicasProxy;
private final AtomicBoolean finished = new AtomicBoolean();
protected final ActionListener<PrimaryResultT> resultListener;
private final long primaryTerm;

// exposed for tests
final ActionListener<PrimaryResultT> resultListener;

private volatile PrimaryResultT primaryResult = null;

Expand All @@ -80,13 +83,14 @@ public class ReplicationOperation<
public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
Replicas<ReplicaRequest> replicas,
Logger logger, String opType) {
Logger logger, String opType, long primaryTerm) {
this.replicasProxy = replicas;
this.primary = primary;
this.resultListener = listener;
this.logger = logger;
this.request = request;
this.opType = opType;
this.primaryTerm = primaryTerm;
}

public void execute() throws Exception {
Expand Down Expand Up @@ -137,7 +141,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
// if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, primaryTerm,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
}
Expand Down Expand Up @@ -165,44 +169,45 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep

totalShards.incrementAndGet();
pendingActions.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
// fail the primary but fall through and let the rest of operation processing complete
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
primary.failShard(message, e);
replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
new ActionListener<>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
} catch (final AlreadyClosedException e) {
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
// fail the primary but fall through and let the rest of operation processing complete
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
primary.failShard(message, e);
}
decPendingAndFinishIfNeeded();
}
decPendingAndFinishIfNeeded();
}

@Override
public void onFailure(Exception replicaException) {
logger.trace(() -> new ParameterizedMessage(
"[{}] failure while performing [{}] on replica {}, request [{}]",
shard.shardId(), opType, shard, replicaRequest), replicaException);
// Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
if (TransportActions.isShardNotAvailableException(replicaException) == false) {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
@Override
public void onFailure(Exception replicaException) {
logger.trace(() -> new ParameterizedMessage(
"[{}] failure while performing [{}] on replica {}, request [{}]",
shard.shardId(), opType, shard, replicaRequest), replicaException);
// Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
if (TransportActions.isShardNotAvailableException(replicaException) == false) {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, message, replicaException,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}

@Override
public String toString() {
return "[" + replicaRequest + "][" + shard + "]";
}
});
@Override
public String toString() {
return "[" + replicaRequest + "][" + shard + "]";
}
});
}

private void onNoLongerPrimary(Exception failure) {
Expand Down Expand Up @@ -373,25 +378,27 @@ public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {
*
* @param replica the shard this request should be executed on
* @param replicaRequest the operation to perform
* @param primaryTerm the primary term
* @param globalCheckpoint the global checkpoint on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on primary
* after this replication was executed on it.
* @param listener callback for handling the response or failure
*/
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<ReplicaResponse> listener);
void performOn(ShardRouting replica, RequestT replicaRequest,
long primaryTerm, long globalCheckpoint, long maxSeqNoOfUpdatesOrDeletes, ActionListener<ReplicaResponse> listener);

/**
* Fail the specified shard if needed, removing it from the current set
* of active shards. Whether a failure is needed is left up to the
* implementation.
*
* @param replica shard to fail
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
* @param replica shard to fail
* @param primaryTerm the primary term
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
*/
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener);
void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, ActionListener<Void> listener);

/**
* Marks shard copy as stale if needed, removing its allocation id from
Expand All @@ -400,9 +407,10 @@ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpo
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param primaryTerm the primary term
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
*/
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener);
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener<Void> listener);
}

/**
Expand All @@ -427,11 +435,11 @@ public interface ReplicaResponse {
}

public static class RetryOnPrimaryException extends ElasticsearchException {
public RetryOnPrimaryException(ShardId shardId, String msg) {
RetryOnPrimaryException(ShardId shardId, String msg) {
this(shardId, msg, null);
}

public RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) {
RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) {
super(msg, cause);
setShard(shardId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
new ReroutePhase((ReplicationTask) task, request, listener).run();
}

protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy(long primaryTerm) {
return new ReplicasProxy(primaryTerm);
protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy() {
return new ReplicasProxy();
}

protected abstract Response newResponseInstance();
Expand Down Expand Up @@ -409,7 +409,7 @@ protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaReq
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
newReplicasProxy(primaryRequest.getPrimaryTerm()), logger, actionName);
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm());
}
}

Expand Down Expand Up @@ -1021,16 +1021,11 @@ public int hashCode() {
*/
protected class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {

protected final long primaryTerm;

public ReplicasProxy(long primaryTerm) {
this.primaryTerm = primaryTerm;
}

@Override
public void performOn(
final ShardRouting replica,
final ReplicaRequest request,
final long primaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
Expand All @@ -1051,7 +1046,8 @@ public void performOn(
}

@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
ActionListener<Void> listener) {
// This does not need to fail the shard. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
Expand All @@ -1060,7 +1056,7 @@ public void failShardIfNeeded(ShardRouting replica, String message, Exception ex
}

@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener<Void> listener) {
// This does not need to make the shard stale. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ public static Location locationToSync(Location current, Location next) {
}

@Override
protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
return new WriteActionReplicasProxy(primaryTerm);
protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy() {
return new WriteActionReplicasProxy();
}

/**
Expand Down Expand Up @@ -371,12 +371,9 @@ void run() {
*/
class WriteActionReplicasProxy extends ReplicasProxy {

WriteActionReplicasProxy(long primaryTerm) {
super(primaryTerm);
}

@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
ActionListener<Void> listener) {
if (TransportActions.isShardNotAvailableException(exception) == false) {
logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
}
Expand All @@ -385,7 +382,7 @@ public void failShardIfNeeded(ShardRouting replica, String message, Exception ex
}

@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
}
}
Expand Down
Loading

0 comments on commit b41e3fc

Please sign in to comment.