From b41e3fc09f2584626c5fe8c9889ba70d9cd4cbb2 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 11 Apr 2019 20:26:18 +0100 Subject: [PATCH] Move primary term from replicas proxy to repl op (#41119) A small refactoring that removes the primaryTerm field from ReplicasProxy and instead passes it directly in to the methods that need it. Relates #40706. --- ...TransportVerifyShardBeforeCloseAction.java | 12 +-- .../TransportResyncReplicationAction.java | 16 ++- .../replication/ReplicationOperation.java | 102 ++++++++++-------- .../TransportReplicationAction.java | 18 ++-- .../replication/TransportWriteAction.java | 13 +-- ...portVerifyShardBeforeCloseActionTests.java | 99 +++++++++-------- .../ReplicationOperationTests.java | 48 ++++----- .../TransportReplicationActionTests.java | 19 ++-- .../TransportWriteActionTests.java | 9 +- .../ESIndexLevelReplicationTestCase.java | 11 +- 10 files changed, 173 insertions(+), 174 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index f29bf6987a085..7d691717de1f2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -115,8 +115,8 @@ private void executeShardOperation(final ShardRequest request, final IndexShard } @Override - protected ReplicationOperation.Replicas newReplicasProxy(final long primaryTerm) { - return new VerifyShardBeforeCloseActionReplicasProxy(primaryTerm); + protected ReplicationOperation.Replicas newReplicasProxy() { + return new VerifyShardBeforeCloseActionReplicasProxy(); } /** @@ -125,13 +125,9 @@ protected ReplicationOperation.Replicas newReplicasProxy(final lon * or reopened in an unverified state with potential non flushed translog operations. */ class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy { - - VerifyShardBeforeCloseActionReplicasProxy(final long primaryTerm) { - super(primaryTerm); - } - @Override - public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final ActionListener listener) { + public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final long primaryTerm, + final ActionListener listener) { shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener); } } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index bfe3274996160..464cd3168bf7e 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -68,8 +68,8 @@ protected ResyncReplicationResponse newResponseInstance() { } @Override - protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { - return new ResyncActionReplicasProxy(primaryTerm); + protected ReplicationOperation.Replicas newReplicasProxy() { + return new ResyncActionReplicasProxy(); } @Override @@ -96,9 +96,10 @@ public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest } @Override - protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception { + protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, + IndexShard replica) throws Exception { Translog.Location location = performOnReplica(request, replica); - return new WriteReplicaResult(request, location, null, replica, logger); + return new WriteReplicaResult<>(request, location, null, replica, logger); } public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception { @@ -174,12 +175,9 @@ public void handleException(TransportException exp) { */ class ResyncActionReplicasProxy extends ReplicasProxy { - ResyncActionReplicasProxy(long primaryTerm) { - super(primaryTerm); - } - @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + ActionListener listener) { shardStateAction.remoteShardFailed( replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, listener); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index c8c102dfd85e7..e5c2136aae56d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -71,7 +71,10 @@ public class ReplicationOperation< private final Primary primary; private final Replicas replicasProxy; private final AtomicBoolean finished = new AtomicBoolean(); - protected final ActionListener resultListener; + private final long primaryTerm; + + // exposed for tests + final ActionListener resultListener; private volatile PrimaryResultT primaryResult = null; @@ -80,13 +83,14 @@ public class ReplicationOperation< public ReplicationOperation(Request request, Primary primary, ActionListener listener, Replicas replicas, - Logger logger, String opType) { + Logger logger, String opType, long primaryTerm) { this.replicasProxy = replicas; this.primary = primary; this.resultListener = listener; this.logger = logger; this.request = request; this.opType = opType; + this.primaryTerm = primaryTerm; } public void execute() throws Exception { @@ -137,7 +141,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica // if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale for (String allocationId : replicationGroup.getUnavailableInSyncShards()) { pendingActions.incrementAndGet(); - replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, + replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, primaryTerm, ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); } } @@ -165,44 +169,45 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep totalShards.incrementAndGet(); pendingActions.incrementAndGet(); - replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener() { - @Override - public void onResponse(ReplicaResponse response) { - successfulShards.incrementAndGet(); - try { - primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint()); - primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint()); - } catch (final AlreadyClosedException e) { - // okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally - } catch (final Exception e) { - // fail the primary but fall through and let the rest of operation processing complete - final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard); - primary.failShard(message, e); + replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, + new ActionListener<>() { + @Override + public void onResponse(ReplicaResponse response) { + successfulShards.incrementAndGet(); + try { + primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint()); + primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint()); + } catch (final AlreadyClosedException e) { + // the index was deleted or this shard was never activated after a relocation; fall through and finish normally + } catch (final Exception e) { + // fail the primary but fall through and let the rest of operation processing complete + final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard); + primary.failShard(message, e); + } + decPendingAndFinishIfNeeded(); } - decPendingAndFinishIfNeeded(); - } - @Override - public void onFailure(Exception replicaException) { - logger.trace(() -> new ParameterizedMessage( - "[{}] failure while performing [{}] on replica {}, request [{}]", - shard.shardId(), opType, shard, replicaRequest), replicaException); - // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report. - if (TransportActions.isShardNotAvailableException(replicaException) == false) { - RestStatus restStatus = ExceptionsHelper.status(replicaException); - shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( - shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); + @Override + public void onFailure(Exception replicaException) { + logger.trace(() -> new ParameterizedMessage( + "[{}] failure while performing [{}] on replica {}, request [{}]", + shard.shardId(), opType, shard, replicaRequest), replicaException); + // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report. + if (TransportActions.isShardNotAvailableException(replicaException) == false) { + RestStatus restStatus = ExceptionsHelper.status(replicaException); + shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( + shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); + } + String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); + replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException, + ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); } - String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - replicasProxy.failShardIfNeeded(shard, message, replicaException, - ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); - } - @Override - public String toString() { - return "[" + replicaRequest + "][" + shard + "]"; - } - }); + @Override + public String toString() { + return "[" + replicaRequest + "][" + shard + "]"; + } + }); } private void onNoLongerPrimary(Exception failure) { @@ -373,25 +378,27 @@ public interface Replicas> { * * @param replica the shard this request should be executed on * @param replicaRequest the operation to perform + * @param primaryTerm the primary term * @param globalCheckpoint the global checkpoint on the primary * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on primary * after this replication was executed on it. * @param listener callback for handling the response or failure */ - void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener); + void performOn(ShardRouting replica, RequestT replicaRequest, + long primaryTerm, long globalCheckpoint, long maxSeqNoOfUpdatesOrDeletes, ActionListener listener); /** * Fail the specified shard if needed, removing it from the current set * of active shards. Whether a failure is needed is left up to the * implementation. * - * @param replica shard to fail - * @param message a (short) description of the reason - * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed - * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set + * @param replica shard to fail + * @param primaryTerm the primary term + * @param message a (short) description of the reason + * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed + * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set */ - void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener); + void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, ActionListener listener); /** * Marks shard copy as stale if needed, removing its allocation id from @@ -400,9 +407,10 @@ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpo * * @param shardId shard id * @param allocationId allocation id to remove from the set of in-sync allocation ids + * @param primaryTerm the primary term * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set */ - void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener); + void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener listener); } /** @@ -427,11 +435,11 @@ public interface ReplicaResponse { } public static class RetryOnPrimaryException extends ElasticsearchException { - public RetryOnPrimaryException(ShardId shardId, String msg) { + RetryOnPrimaryException(ShardId shardId, String msg) { this(shardId, msg, null); } - public RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) { + RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) { super(msg, cause); setShard(shardId); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 96c6cc3afa04c..e9c071e5a0ed1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -163,8 +163,8 @@ protected void doExecute(Task task, Request request, ActionListener li new ReroutePhase((ReplicationTask) task, request, listener).run(); } - protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { - return new ReplicasProxy(primaryTerm); + protected ReplicationOperation.Replicas newReplicasProxy() { + return new ReplicasProxy(); } protected abstract Response newResponseInstance(); @@ -409,7 +409,7 @@ protected ReplicationOperation> listener, PrimaryShardReference primaryShardReference) { return new ReplicationOperation<>(request, primaryShardReference, listener, - newReplicasProxy(primaryRequest.getPrimaryTerm()), logger, actionName); + newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()); } } @@ -1021,16 +1021,11 @@ public int hashCode() { */ protected class ReplicasProxy implements ReplicationOperation.Replicas { - protected final long primaryTerm; - - public ReplicasProxy(long primaryTerm) { - this.primaryTerm = primaryTerm; - } - @Override public void performOn( final ShardRouting replica, final ReplicaRequest request, + final long primaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener listener) { @@ -1051,7 +1046,8 @@ public void performOn( } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + ActionListener listener) { // This does not need to fail the shard. The idea is that this // is a non-write operation (something like a refresh or a global // checkpoint sync) and therefore the replica should still be @@ -1060,7 +1056,7 @@ public void failShardIfNeeded(ShardRouting replica, String message, Exception ex } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener listener) { // This does not need to make the shard stale. The idea is that this // is a non-write operation (something like a refresh or a global // checkpoint sync) and therefore the replica should still be diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 15c49d1030374..86e2760c9012d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -96,8 +96,8 @@ public static Location locationToSync(Location current, Location next) { } @Override - protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { - return new WriteActionReplicasProxy(primaryTerm); + protected ReplicationOperation.Replicas newReplicasProxy() { + return new WriteActionReplicasProxy(); } /** @@ -371,12 +371,9 @@ void run() { */ class WriteActionReplicasProxy extends ReplicasProxy { - WriteActionReplicasProxy(long primaryTerm) { - super(primaryTerm); - } - @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + ActionListener listener) { if (TransportActions.isShardNotAvailableException(exception) == false) { logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); } @@ -385,7 +382,7 @@ public void failShardIfNeeded(ShardRouting replica, String message, Exception ex } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener listener) { shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index f90f40311c1b8..75f70ad02df64 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -139,7 +139,7 @@ private void executeOnPrimaryOrReplica() throws Throwable { final TaskId taskId = new TaskId("_node_id", randomNonNegativeLong()); final TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, taskId); - final PlainActionFuture res = PlainActionFuture.newFuture(); + final PlainActionFuture res = PlainActionFuture.newFuture(); action.shardOperationOnPrimary(request, indexShard, ActionListener.wrap( r -> { assertNotNull(r); @@ -228,10 +228,10 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { TaskId taskId = new TaskId(clusterService.localNode().getId(), 0L); TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, taskId); - ReplicationOperation.Replicas proxy = action.newReplicasProxy(primaryTerm); + ReplicationOperation.Replicas proxy = action.newReplicasProxy(); ReplicationOperation operation = - new ReplicationOperation<>(request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test"); + TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation = new ReplicationOperation<>( + request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test", primaryTerm); operation.execute(); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); @@ -268,53 +268,50 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> createPrimary(final ShardRouting primary, final ReplicationGroup replicationGroup) { - return new ReplicationOperation.Primary< - TransportVerifyShardBeforeCloseAction.ShardRequest, - TransportVerifyShardBeforeCloseAction.ShardRequest, - PrimaryResult>() { - @Override - public ShardRouting routingEntry() { - return primary; - } - - @Override - public ReplicationGroup getReplicationGroup() { - return replicationGroup; - } - - @Override - public void perform( - TransportVerifyShardBeforeCloseAction.ShardRequest request, ActionListener listener) { - listener.onResponse(new PrimaryResult(request)); - } - - @Override - public void failShard(String message, Exception exception) { - - } - - @Override - public void updateLocalCheckpointForShard(String allocationId, long checkpoint) { - } - - @Override - public void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint) { - } - - @Override - public long localCheckpoint() { - return 0; - } - - @Override - public long globalCheckpoint() { - return 0; - } - - @Override - public long maxSeqNoOfUpdatesOrDeletes() { - return 0; - } + return new ReplicationOperation.Primary<>() { + @Override + public ShardRouting routingEntry() { + return primary; + } + + @Override + public ReplicationGroup getReplicationGroup() { + return replicationGroup; + } + + @Override + public void perform( + TransportVerifyShardBeforeCloseAction.ShardRequest request, ActionListener listener) { + listener.onResponse(new PrimaryResult(request)); + } + + @Override + public void failShard(String message, Exception exception) { + + } + + @Override + public void updateLocalCheckpointForShard(String allocationId, long checkpoint) { + } + + @Override + public void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint) { + } + + @Override + public long localCheckpoint() { + return 0; + } + + @Override + public long globalCheckpoint() { + return 0; + } + + @Override + public long maxSeqNoOfUpdatesOrDeletes() { + return 0; + } }; } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index d493b70337208..c959e3ed45d1a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -116,10 +116,10 @@ public void testReplication() throws Exception { Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); - final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, simulatedFailures); + final TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures); final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup); - final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy); + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, primaryTerm); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); @@ -213,12 +213,12 @@ public void testNoLongerPrimary() throws Exception { } else { shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead"); } - final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures) { + final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) { @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, ActionListener shardActionListener) { if (testPrimaryDemotedOnStaleShardCopies) { - super.failShardIfNeeded(replica, message, exception, shardActionListener); + super.failShardIfNeeded(replica, primaryTerm, message, exception, shardActionListener); } else { assertThat(replica, equalTo(failedReplica)); shardActionListener.onFailure(shardActionFailure); @@ -226,11 +226,12 @@ public void failShardIfNeeded(ShardRouting replica, String message, Exception ex } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener shardActionListener) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, + ActionListener shardActionListener) { if (testPrimaryDemotedOnStaleShardCopies) { shardActionListener.onFailure(shardActionFailure); } else { - super.markShardCopyAsStaleIfNeeded(shardId, allocationId, shardActionListener); + super.markShardCopyAsStaleIfNeeded(shardId, allocationId, primaryTerm, shardActionListener); } } }; @@ -242,7 +243,7 @@ public void failShard(String message, Exception exception) { assertTrue(primaryFailed.compareAndSet(false, true)); } }; - final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy); + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, primaryTerm); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); @@ -299,7 +300,7 @@ public void perform(Request request, ActionListener listener) { Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, - new TestReplicaProxy(primaryTerm)); + new TestReplicaProxy(), primaryTerm); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); @@ -343,7 +344,7 @@ public void testWaitForActiveShards() throws Exception { final ShardRouting primaryShard = shardRoutingTable.primaryShard(); final TestReplicationOperation op = new TestReplicationOperation(request, new TestPrimary(primaryShard, () -> initialReplicationGroup), - listener, new TestReplicaProxy(primaryTerm), logger, "test"); + listener, new TestReplicaProxy(), logger, "test", primaryTerm); if (passesActiveShardCheck) { assertThat(op.checkActiveShardCount(), nullValue()); @@ -401,8 +402,8 @@ public void updateLocalCheckpointForShard(String allocationId, long checkpoint) }; final PlainActionFuture listener = new PlainActionFuture<>(); - final ReplicationOperation.Replicas replicas = new TestReplicaProxy(primaryTerm, Collections.emptyMap()); - TestReplicationOperation operation = new TestReplicationOperation(request, primary, listener, replicas); + final ReplicationOperation.Replicas replicas = new TestReplicaProxy(Collections.emptyMap()); + TestReplicationOperation operation = new TestReplicationOperation(request, primary, listener, replicas, primaryTerm); operation.execute(); assertThat(primaryFailed.get(), equalTo(fatal)); @@ -577,14 +578,11 @@ static class TestReplicaProxy implements ReplicationOperation.Replicas final Set markedAsStaleCopies = ConcurrentCollections.newConcurrentSet(); - final long primaryTerm; - - TestReplicaProxy(long primaryTerm) { - this(primaryTerm, Collections.emptyMap()); + TestReplicaProxy() { + this(Collections.emptyMap()); } - TestReplicaProxy(long primaryTerm, Map opFailures) { - this.primaryTerm = primaryTerm; + TestReplicaProxy(Map opFailures) { this.opFailures = opFailures; } @@ -592,6 +590,7 @@ static class TestReplicaProxy implements ReplicationOperation.Replicas public void performOn( final ShardRouting replica, final Request request, + final long primaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener listener) { @@ -609,7 +608,8 @@ public void performOn( } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + ActionListener listener) { if (failedReplicas.add(replica) == false) { fail("replica [" + replica + "] was failed twice"); } @@ -621,7 +621,7 @@ public void failShardIfNeeded(ShardRouting replica, String message, Exception ex } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, ActionListener listener) { if (markedAsStaleCopies.add(allocationId) == false) { fail("replica [" + allocationId + "] was marked as stale twice"); } @@ -631,14 +631,14 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, A class TestReplicationOperation extends ReplicationOperation { TestReplicationOperation(Request request, Primary primary, - ActionListener listener, Replicas replicas) { - this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, "test"); + ActionListener listener, Replicas replicas, long primaryTerm) { + this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, "test", primaryTerm); } TestReplicationOperation(Request request, Primary primary, ActionListener listener, - Replicas replicas, Logger logger, String opType) { - super(request, primary, listener, replicas, logger, opType); + Replicas replicas, Logger logger, String opType, long primaryTerm) { + super(request, primary, listener, replicas, logger, opType, primaryTerm); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index e256ec4e92679..2cdd3ad2fe480 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -603,7 +603,7 @@ action.new AsyncPrimaryAction(primaryRequest, listener, task) { Request request, ActionListener> actionListener, TransportReplicationAction.PrimaryShardReference primaryShardReference) { - return new NoopReplicationOperation(request, actionListener) { + return new NoopReplicationOperation(request, actionListener, primaryTerm) { @Override public void execute() throws Exception { assertPhase(task, "primary"); @@ -661,7 +661,7 @@ action.new AsyncPrimaryAction(primaryRequest, listener, task) { Request request, ActionListener> actionListener, TransportReplicationAction.PrimaryShardReference primaryShardReference) { - return new NoopReplicationOperation(request, actionListener) { + return new NoopReplicationOperation(request, actionListener, primaryTerm) { @Override public void execute() throws Exception { assertPhase(task, "primary"); @@ -710,7 +710,8 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2)); logger.info("using state: {}", state); setState(clusterService, state); - ReplicationOperation.Replicas proxy = action.newReplicasProxy(state.metaData().index(index).primaryTerm(0)); + final long primaryTerm = state.metaData().index(index).primaryTerm(0); + ReplicationOperation.Replicas proxy = action.newReplicasProxy(); // check that at unknown node fails PlainActionFuture listener = new PlainActionFuture<>(); @@ -720,6 +721,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { TestShardRouting.newShardRouting(shardId, "NOT THERE", routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState), new Request(NO_SHARD_ID), + primaryTerm, randomNonNegativeLong(), randomNonNegativeLong(), listener); @@ -730,7 +732,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream() .filter(ShardRouting::assignedToNode).collect(Collectors.toList())); listener = new PlainActionFuture<>(); - proxy.performOn(replica, new Request(NO_SHARD_ID), randomNonNegativeLong(), randomNonNegativeLong(), listener); + proxy.performOn(replica, new Request(NO_SHARD_ID), primaryTerm, randomNonNegativeLong(), randomNonNegativeLong(), listener); assertFalse(listener.isDone()); CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); @@ -753,7 +755,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { AtomicReference failure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); - proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"), + proxy.failShardIfNeeded(replica, primaryTerm, "test", new ElasticsearchException("simulated"), ActionListener.wrap(r -> success.set(true), failure::set)); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); // A replication action doesn't not fail the request @@ -836,7 +838,7 @@ action.new AsyncPrimaryAction(primaryRequest, listener, task) { if (throwExceptionOnCreation) { throw new ElasticsearchException("simulated exception, during createReplicatedOperation"); } - return new NoopReplicationOperation(request, actionListener) { + return new NoopReplicationOperation(request, actionListener, primaryTerm) { @Override public void execute() throws Exception { assertIndexShardCounter(1); @@ -1322,8 +1324,9 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService class NoopReplicationOperation extends ReplicationOperation> { - NoopReplicationOperation(Request request, ActionListener> listener) { - super(request, null, listener, null, TransportReplicationActionTests.this.logger, "noop"); + NoopReplicationOperation(Request request, ActionListener> listener, + long primaryTerm) { + super(request, null, listener, null, TransportReplicationActionTests.this.logger, "noop", primaryTerm); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 98c4f215fca8a..1a7e5a73e7523 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -278,7 +278,8 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2)); logger.info("using state: {}", state); ClusterServiceUtils.setState(clusterService, state); - ReplicationOperation.Replicas proxy = action.newReplicasProxy(state.metaData().index(index).primaryTerm(0)); + final long primaryTerm = state.metaData().index(index).primaryTerm(0); + ReplicationOperation.Replicas proxy = action.newReplicasProxy(); // check that at unknown node fails PlainActionFuture listener = new PlainActionFuture<>(); @@ -288,7 +289,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { TestShardRouting.newShardRouting(shardId, "NOT THERE", routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState), new TestRequest(), - randomNonNegativeLong(), randomNonNegativeLong(), listener); + primaryTerm, randomNonNegativeLong(), randomNonNegativeLong(), listener); assertTrue(listener.isDone()); assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class); @@ -296,7 +297,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream() .filter(ShardRouting::assignedToNode).collect(Collectors.toList())); listener = new PlainActionFuture<>(); - proxy.performOn(replica, new TestRequest(), randomNonNegativeLong(), randomNonNegativeLong(), listener); + proxy.performOn(replica, new TestRequest(), primaryTerm, randomNonNegativeLong(), randomNonNegativeLong(), listener); assertFalse(listener.isDone()); CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); @@ -319,7 +320,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { AtomicReference failure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); - proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"), + proxy.failShardIfNeeded(replica, primaryTerm, "test", new ElasticsearchException("simulated"), ActionListener.wrap(r -> success.set(true), failure::set)); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); // A write replication action proxy should fail the shard diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 64e31059c5fab..9691677f643b9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -608,8 +608,8 @@ protected ReplicationAction(Request request, ActionListener listener, public void execute() { try { new ReplicationOperation<>(request, new PrimaryRef(), - ActionListener.wrap(result -> result.respond(listener), listener::onFailure), new ReplicasRef(), logger, opType - ).execute(); + ActionListener.wrap(result -> result.respond(listener), listener::onFailure), new ReplicasRef(), logger, opType, + primaryTerm).execute(); } catch (Exception e) { listener.onFailure(e); } @@ -678,6 +678,7 @@ class ReplicasRef implements ReplicationOperation.Replicas { public void performOn( final ShardRouting replicaRouting, final ReplicaRequest request, + final long primaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener listener) { @@ -700,12 +701,14 @@ public void performOn( } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + ActionListener listener) { throw new UnsupportedOperationException("failing shard " + replica + " isn't supported. failure: " + message, exception); } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, + ActionListener listener) { throw new UnsupportedOperationException("can't mark " + shardId + ", aid [" + allocationId + "] as stale"); } }