From 85b1cdbf8e99c9149910900a379d178b8e40cc80 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 17:51:32 +0100 Subject: [PATCH 01/17] Use ChannelActionListener in OperationTransportHandler --- .../action/support/ChannelActionListener.java | 1 + .../TransportReplicationAction.java | 22 ++----------------- 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java index b23758758e24d..b93298eefe871 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java @@ -55,6 +55,7 @@ public void onFailure(Exception e) { try { channel.sendResponse(e); } catch (Exception e1) { + e1.addSuppressed(e); logger.warn(() -> new ParameterizedMessage( "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2ff1ff3e93a9f..57845322f875c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.client.transport.NoNodeAvailableException; @@ -279,26 +280,7 @@ public OperationTransportHandler() { @Override public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { - execute(task, request, new ActionListener() { - @Override - public void onResponse(Response result) { - try { - channel.sendResponse(result); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage("Failed to send response for {}", actionName), inner); - } - } - }); + execute(task, request, new ChannelActionListener<>(channel, actionName, request)); } } From 430232097b99418d05b9a26af871302017d85fc8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 17:57:48 +0100 Subject: [PATCH 02/17] Generic PrimaryResult --- .../admin/indices/refresh/TransportShardRefreshAction.java | 5 +++-- .../support/replication/TransportReplicationAction.java | 4 ++-- .../support/replication/TransportReplicationActionTests.java | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 0b5975cf025af..df3ff16ff8800 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -53,10 +53,11 @@ protected ReplicationResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) { + protected PrimaryResult shardOperationOnPrimary( + BasicReplicationRequest shardRequest, IndexShard primary) { primary.refresh("api"); logger.trace("{} refresh request executed on primary", primary.shardId()); - return new PrimaryResult(shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 57845322f875c..9e3366ee5298f 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1010,8 +1010,8 @@ public void failShard(String reason, Exception e) { } @Override - public PrimaryResult perform(Request request) throws Exception { - PrimaryResult result = shardOperationOnPrimary(request, indexShard); + public PrimaryResult perform(Request request) throws Exception { + PrimaryResult result = shardOperationOnPrimary(request, indexShard); assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest() + "] with a primary failure [" + result.finalFailure + "]"; return result; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index ffc9c2bf70a8a..7941e81512477 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1221,10 +1221,10 @@ protected TestResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception { + protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; - return new PrimaryResult(shardRequest, new TestResponse()); + return new PrimaryResult<>(shardRequest, new TestResponse()); } @Override From 16350a68f1c4bdef114810c178df7e3a9fbf45da Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 18:01:37 +0100 Subject: [PATCH 03/17] No need for ShardReference superclass --- .../replication/TransportReplicationAction.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 9e3366ee5298f..333676d195c83 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -964,12 +964,13 @@ protected void acquireReplicaOperationPermit(final IndexShard replica, replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request); } - class ShardReference implements Releasable { + class PrimaryShardReference implements Releasable, + ReplicationOperation.Primary> { protected final IndexShard indexShard; private final Releasable operationLock; - ShardReference(IndexShard indexShard, Releasable operationLock) { + PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { this.indexShard = indexShard; this.operationLock = operationLock; } @@ -987,15 +988,6 @@ public ShardRouting routingEntry() { return indexShard.routingEntry(); } - } - - class PrimaryShardReference extends ShardReference - implements ReplicationOperation.Primary> { - - PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { - super(indexShard, operationLock); - } - public boolean isRelocated() { return indexShard.isRelocatedPrimary(); } From efdab0cb20ae8f4ee4d7e5bad6cddcbaf508dacc Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 18:09:27 +0100 Subject: [PATCH 04/17] Use ActionListener instead of Channel for primary response handler --- .../TransportReplicationAction.java | 32 ++++++------------- .../TransportReplicationActionTests.java | 8 ++--- ...ReplicationAllPermitsAcquisitionTests.java | 32 ++----------------- 3 files changed, 15 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 333676d195c83..675268125cff8 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -292,7 +292,8 @@ public PrimaryOperationTransportHandler() { @Override public void messageReceived(ConcreteShardRequest request, TransportChannel channel, Task task) { - new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run(); + new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, + new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run(); } } @@ -303,15 +304,15 @@ class AsyncPrimaryAction extends AbstractRunnable { private final String targetAllocationID; // primary term of the shard this request is meant for private final long primaryTerm; - private final TransportChannel channel; + private final ActionListener listener; private final ReplicationTask replicationTask; - AsyncPrimaryAction(Request request, String targetAllocationID, long primaryTerm, TransportChannel channel, + AsyncPrimaryAction(Request request, String targetAllocationID, long primaryTerm, ActionListener listener, ReplicationTask replicationTask) { this.request = request; this.targetAllocationID = targetAllocationID; this.primaryTerm = primaryTerm; - this.channel = channel; + this.listener = listener; this.replicationTask = replicationTask; } @@ -371,9 +372,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere transportService.sendRequest(relocatingNode, transportPrimaryAction, new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), transportOptions, - new TransportChannelResponseHandler(logger, channel, "rerouting indexing to target primary " + primary, - reader) { - + new ActionListenerResponseHandler(listener, reader) { @Override public void handleResponse(Response response) { setPhase(replicationTask, "finished"); @@ -403,12 +402,7 @@ public void handleException(TransportException exp) { @Override public void onFailure(Exception e) { setPhase(replicationTask, "finished"); - try { - channel.sendResponse(e); - } catch (IOException inner) { - inner.addSuppressed(e); - logger.warn("failed to send response", inner); - } + listener.onFailure(e); } private ActionListener createResponseListener(final PrimaryShardReference primaryShardReference) { @@ -433,22 +427,14 @@ public void onResponse(Response response) { } primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); - try { - channel.sendResponse(response); - } catch (IOException e) { - onFailure(e); - } + listener.onResponse(response); } @Override public void onFailure(Exception e) { primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); - try { - channel.sendResponse(e); - } catch (IOException e1) { - logger.warn("failed to send response", e); - } + listener.onFailure(e); } }; } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 7941e81512477..2007621e61e2f 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -332,7 +332,7 @@ public ClusterBlockLevel indexBlockLevel() { final PlainActionFuture listener = new PlainActionFuture<>(); final TransportReplicationAction.AsyncPrimaryAction asyncPrimaryActionWithBlocks = - actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, createTransportChannel(listener), task); + actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, listener, task); asyncPrimaryActionWithBlocks.run(); final ExecutionException exception = expectThrows(ExecutionException.class, listener::get); @@ -589,7 +589,7 @@ public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throw isRelocated.set(true); executeOnPrimary = false; } - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, createTransportChannel(listener), task) { + action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -646,7 +646,7 @@ public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), primaryTerm, - createTransportChannel(listener), task) { + listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -817,7 +817,7 @@ public void testCounterOnPrimary() throws Exception { final boolean throwExceptionOnCreation = i == 1; final boolean throwExceptionOnRun = i == 2; final boolean respondWithError = i == 3; - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, createTransportChannel(listener), task) { + action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 1cb1bfde34ea8..13d375785ac21 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -204,7 +204,7 @@ public void testTransportReplicationActionWithAllPermits() throws Exception { Thread thread = new Thread(() -> { TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = - singlePermitAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(listener), null) { + singlePermitAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), listener, null) { @Override protected void doRun() throws Exception { if (delayed) { @@ -255,7 +255,7 @@ private void assertBlockIsPresentForDelayedOp() { final PlainActionFuture allPermitFuture = new PlainActionFuture<>(); Thread thread = new Thread(() -> { TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = - allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(allPermitFuture), null) { + allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), allPermitFuture, null) { @Override void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) { assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount()); @@ -530,32 +530,4 @@ public String toString() { static class Response extends ReplicationResponse { } - - /** - * Transport channel that is needed for replica operation testing. - */ - public TransportChannel transportChannel(final PlainActionFuture listener) { - return new TransportChannel() { - - @Override - public String getProfileName() { - return ""; - } - - @Override - public void sendResponse(TransportResponse response) throws IOException { - listener.onResponse(((Response) response)); - } - - @Override - public void sendResponse(Exception exception) throws IOException { - listener.onFailure(exception); - } - - @Override - public String getChannelType() { - return "replica_test"; - } - }; - } } From e1fef4115414ad76c9fb5dd4a6009d10b7b5ea9f Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 18:12:46 +0100 Subject: [PATCH 05/17] Use ActionListener instead of Channel for replica response handler --- .../TransportReplicationAction.java | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 675268125cff8..c0f1931a315e9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -70,7 +70,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportChannelResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; @@ -526,7 +525,7 @@ public void messageReceived( replicaRequest.getPrimaryTerm(), replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(), - channel, + new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run(); } @@ -551,7 +550,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio private final long primaryTerm; private final long globalCheckpoint; private final long maxSeqNoOfUpdatesOrDeletes; - private final TransportChannel channel; + private final ActionListener listener; private final IndexShard replica; /** * The task on the node with the replica shard. @@ -567,10 +566,10 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio long primaryTerm, long globalCheckpoint, long maxSeqNoOfUpdatesOrDeletes, - TransportChannel channel, + ActionListener listener, ReplicationTask task) { this.request = request; - this.channel = channel; + this.listener = listener; this.task = task; this.targetAllocationID = targetAllocationID; this.primaryTerm = primaryTerm; @@ -610,14 +609,10 @@ public void onFailure(Exception e) { public void onNewClusterState(ClusterState state) { // Forking a thread on local node via transport service so that custom transport service have an // opportunity to execute custom logic before the replica operation begins - String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; - TransportChannelResponseHandler handler = - new TransportChannelResponseHandler<>(logger, channel, extraMessage, - (in) -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes), - handler); + new ActionListenerResponseHandler<>(listener, in -> new ReplicaResponse())); } @Override @@ -636,14 +631,8 @@ public void onTimeout(TimeValue timeout) { } protected void responseWithFailure(Exception e) { - try { - setPhase(task, "finished"); - channel.sendResponse(e); - } catch (IOException responseException) { - responseException.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage( - "failed to send error message back to client for action [{}]", transportReplicaAction), responseException); - } + setPhase(task, "finished"); + listener.onFailure(e); } @Override @@ -675,7 +664,7 @@ public void onResponse(Empty response) { } setPhase(task, "finished"); try { - channel.sendResponse(replicaResponse); + listener.onResponse(replicaResponse); } catch (Exception e) { onFailure(e); } From 4844f9acbd4071601f524baa2db364d70068b7ca Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 18:24:57 +0100 Subject: [PATCH 06/17] Use ConcreteShardRequest throughout AsyncPrimaryAction --- .../TransportReplicationAction.java | 40 ++++++++----------- .../TransportReplicationActionTests.java | 17 +++++--- ...ReplicationAllPermitsAcquisitionTests.java | 8 +++- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index c0f1931a315e9..2d1f59ac79e25 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -291,33 +291,26 @@ public PrimaryOperationTransportHandler() { @Override public void messageReceived(ConcreteShardRequest request, TransportChannel channel, Task task) { - new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, - new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run(); + new AsyncPrimaryAction( + request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run(); } } class AsyncPrimaryAction extends AbstractRunnable { - - private final Request request; - // targetAllocationID of the shard this request is meant for - private final String targetAllocationID; - // primary term of the shard this request is meant for - private final long primaryTerm; private final ActionListener listener; private final ReplicationTask replicationTask; + private final ConcreteShardRequest primaryRequest; - AsyncPrimaryAction(Request request, String targetAllocationID, long primaryTerm, ActionListener listener, + AsyncPrimaryAction(ConcreteShardRequest primaryRequest, ActionListener listener, ReplicationTask replicationTask) { - this.request = request; - this.targetAllocationID = targetAllocationID; - this.primaryTerm = primaryTerm; + this.primaryRequest = primaryRequest; this.listener = listener; this.replicationTask = replicationTask; } @Override protected void doRun() throws Exception { - final ShardId shardId = request.shardId(); + final ShardId shardId = primaryRequest.getRequest().shardId(); final IndexShard indexShard = getIndexShard(shardId); final ShardRouting shardRouting = indexShard.routingEntry(); // we may end up here if the cluster state used to route the primary is so stale that the underlying @@ -327,17 +320,17 @@ protected void doRun() throws Exception { throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting); } final String actualAllocationId = shardRouting.allocationId().getId(); - if (actualAllocationId.equals(targetAllocationID) == false) { - throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", targetAllocationID, - actualAllocationId); + if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) { + throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", + primaryRequest.getTargetAllocationID(), actualAllocationId); } final long actualTerm = indexShard.getPendingPrimaryTerm(); - if (actualTerm != primaryTerm) { - throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", targetAllocationID, - primaryTerm, actualTerm); + if (actualTerm != primaryRequest.getPrimaryTerm()) { + throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", + primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm); } - acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap( + acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap( releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)), this::onFailure )); @@ -369,7 +362,8 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere }; DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, - new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), + new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(), + primaryRequest.getPrimaryTerm()), transportOptions, new ActionListenerResponseHandler(listener, reader) { @Override @@ -387,7 +381,7 @@ public void handleException(TransportException exp) { } else { setPhase(replicationTask, "primary"); final ActionListener listener = createResponseListener(primaryShardReference); - createReplicatedOperation(request, + createReplicatedOperation(primaryRequest.getRequest(), ActionListener.wrap(result -> result.respond(listener), listener::onFailure), primaryShardReference) .execute(); @@ -442,7 +436,7 @@ protected ReplicationOperation> listener, PrimaryShardReference primaryShardReference) { return new ReplicationOperation<>(request, primaryShardReference, listener, - newReplicasProxy(primaryTerm), logger, actionName); + newReplicasProxy(primaryRequest.getPrimaryTerm()), logger, actionName); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 2007621e61e2f..de15e8ee37e1c 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -331,8 +331,10 @@ public ClusterBlockLevel indexBlockLevel() { final ReplicationTask task = maybeTask(); final PlainActionFuture listener = new PlainActionFuture<>(); + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, targetAllocationID, primaryTerm); final TransportReplicationAction.AsyncPrimaryAction asyncPrimaryActionWithBlocks = - actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, listener, task); + actionWithBlocks.new AsyncPrimaryAction(primaryRequest, listener, task); asyncPrimaryActionWithBlocks.run(); final ExecutionException exception = expectThrows(ExecutionException.class, listener::get); @@ -589,7 +591,9 @@ public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throw isRelocated.set(true); executeOnPrimary = false; } - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, listener, task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -645,8 +649,9 @@ public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), primaryTerm, - listener, task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getRelocationId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -817,7 +822,9 @@ public void testCounterOnPrimary() throws Exception { final boolean throwExceptionOnCreation = i == 1; final boolean throwExceptionOnRun = i == 2; final boolean respondWithError = i == 3; - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, listener, task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 13d375785ac21..0e78a2dc99958 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -203,8 +203,10 @@ public void testTransportReplicationActionWithAllPermits() throws Exception { actions[threadId] = singlePermitAction; Thread thread = new Thread(() -> { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request(), allocationId(), primaryTerm()); TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = - singlePermitAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), listener, null) { + singlePermitAction.new AsyncPrimaryAction(primaryRequest, listener, null) { @Override protected void doRun() throws Exception { if (delayed) { @@ -254,8 +256,10 @@ private void assertBlockIsPresentForDelayedOp() { final PlainActionFuture allPermitFuture = new PlainActionFuture<>(); Thread thread = new Thread(() -> { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request(), allocationId(), primaryTerm()); TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = - allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), allPermitFuture, null) { + allPermitsAction.new AsyncPrimaryAction(primaryRequest, allPermitFuture, null) { @Override void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) { assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount()); From 2135ff9a0d240466581bcb6e65b0f49a4041716f Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 18:28:30 +0100 Subject: [PATCH 07/17] Use ConcreteReplicaRequest throughout AbstractReplicaAction --- .../TransportReplicationAction.java | 55 ++++++------------- 1 file changed, 18 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2d1f59ac79e25..2c309dc719141 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -513,12 +513,7 @@ public void messageReceived( final TransportChannel channel, final Task task) throws Exception { - new AsyncReplicaAction( - replicaRequest.getRequest(), - replicaRequest.getTargetAllocationID(), - replicaRequest.getPrimaryTerm(), - replicaRequest.getGlobalCheckpoint(), - replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(), + new AsyncReplicaAction(replicaRequest, new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run(); } @@ -538,12 +533,6 @@ public RetryOnReplicaException(StreamInput in) throws IOException { } private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener { - private final ReplicaRequest request; - // allocation id of the replica this request is meant for - private final String targetAllocationID; - private final long primaryTerm; - private final long globalCheckpoint; - private final long maxSeqNoOfUpdatesOrDeletes; private final ActionListener listener; private final IndexShard replica; /** @@ -553,23 +542,14 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio // important: we pass null as a timeout as failing a replica is // something we want to avoid at all costs private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); + private final ConcreteReplicaRequest replicaRequest; - AsyncReplicaAction( - ReplicaRequest request, - String targetAllocationID, - long primaryTerm, - long globalCheckpoint, - long maxSeqNoOfUpdatesOrDeletes, - ActionListener listener, - ReplicationTask task) { - this.request = request; + AsyncReplicaAction(ConcreteReplicaRequest replicaRequest, ActionListener listener, + ReplicationTask task) { + this.replicaRequest = replicaRequest; this.listener = listener; this.task = task; - this.targetAllocationID = targetAllocationID; - this.primaryTerm = primaryTerm; - this.globalCheckpoint = globalCheckpoint; - this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; - final ShardId shardId = request.shardId(); + final ShardId shardId = replicaRequest.getRequest().shardId(); assert shardId != null : "request shardId must be set"; this.replica = getIndexShard(shardId); } @@ -577,7 +557,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio @Override public void onResponse(Releasable releasable) { try { - final ReplicaResult replicaResult = shardOperationOnReplica(request, replica); + final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica); releasable.close(); // release shard operation lock before responding to caller final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint()); @@ -595,17 +575,16 @@ public void onFailure(Exception e) { () -> new ParameterizedMessage( "Retrying operation on replica, action [{}], request [{}]", transportReplicaAction, - request), + replicaRequest.getRequest()), e); - request.onRetry(); + replicaRequest.getRequest().onRetry(); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { // Forking a thread on local node via transport service so that custom transport service have an // opportunity to execute custom logic before the replica operation begins transportService.sendRequest(clusterService.localNode(), transportReplicaAction, - new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, - globalCheckpoint, maxSeqNoOfUpdatesOrDeletes), + replicaRequest, new ActionListenerResponseHandler<>(listener, in -> new ReplicaResponse())); } @@ -633,11 +612,12 @@ protected void responseWithFailure(Exception e) { protected void doRun() throws Exception { setPhase(task, "replica"); final String actualAllocationId = this.replica.routingEntry().allocationId().getId(); - if (actualAllocationId.equals(targetAllocationID) == false) { - throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID, - actualAllocationId); + if (actualAllocationId.equals(replicaRequest.getTargetAllocationID()) == false) { + throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", + replicaRequest.getTargetAllocationID(), actualAllocationId); } - acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); + acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(), + replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes()); } /** @@ -653,8 +633,9 @@ private class ResponseListener implements ActionListener Date: Mon, 1 Apr 2019 21:35:53 +0100 Subject: [PATCH 08/17] Replace OperationTransportHandler with lambda --- .../resync/TransportResyncReplicationAction.java | 2 +- .../replication/TransportReplicationAction.java | 14 +++----------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index c3aa39009ce8f..acdea214dcc2e 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -66,7 +66,7 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran @Override protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); // we should never reject resync because of thread pool capacity on primary transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2c309dc719141..5bb6805ecbcb4 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -154,7 +154,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, new PrimaryOperationTransportHandler()); // we must never reject on because of thread pool capacity on replicas @@ -271,16 +271,8 @@ boolean isRetryableClusterBlockException(final Throwable e) { return false; } - protected class OperationTransportHandler implements TransportRequestHandler { - - public OperationTransportHandler() { - - } - - @Override - public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { - execute(task, request, new ChannelActionListener<>(channel, actionName, request)); - } + protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) { + execute(task, request, new ChannelActionListener<>(channel, actionName, request)); } protected class PrimaryOperationTransportHandler implements TransportRequestHandler> { From a076dec23aa70713a5ce40e8f2d741b99b7170bc Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 21:39:44 +0100 Subject: [PATCH 09/17] Replace PrimaryOperationTransportHandler with lambda --- .../resync/TransportResyncReplicationAction.java | 2 +- .../replication/TransportReplicationAction.java | 16 ++++------------ .../TransportReplicationActionTests.java | 6 ++---- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index acdea214dcc2e..204321d174a0c 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -71,7 +71,7 @@ protected void registerRequestHandlers(String actionName, TransportService trans transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, true, true, - new PrimaryOperationTransportHandler()); + this::handlePrimaryRequest); transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 5bb6805ecbcb4..41ddf4f972310 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -156,7 +156,7 @@ protected void registerRequestHandlers(String actionName, TransportService trans Supplier replicaRequest, String executor) { transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, - new PrimaryOperationTransportHandler()); + this::handlePrimaryRequest); // we must never reject on because of thread pool capacity on replicas transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), @@ -275,17 +275,9 @@ protected void handleOperationRequest(final Request request, final TransportChan execute(task, request, new ChannelActionListener<>(channel, actionName, request)); } - protected class PrimaryOperationTransportHandler implements TransportRequestHandler> { - - public PrimaryOperationTransportHandler() { - - } - - @Override - public void messageReceived(ConcreteShardRequest request, TransportChannel channel, Task task) { - new AsyncPrimaryAction( - request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run(); - } + protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { + new AsyncPrimaryAction( + request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run(); } class AsyncPrimaryAction extends AbstractRunnable { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index de15e8ee37e1c..62d0826e64fce 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -797,9 +797,7 @@ protected IndexShard getIndexShard(ShardId shardId) { } }; - TransportReplicationAction.PrimaryOperationTransportHandler primaryPhase = - action.new PrimaryOperationTransportHandler(); - primaryPhase.messageReceived(concreteShardRequest, createTransportChannel(listener), null); + action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null); CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests(); assertThat(requestsToReplicas, arrayWithSize(1)); assertThat(((TransportReplicationAction.ConcreteShardRequest) requestsToReplicas[0].request).getPrimaryTerm(), @@ -945,7 +943,7 @@ public void testPrimaryActionRejectsWrongAidOrWrongTerm() throws Exception { final boolean wrongAllocationId = randomBoolean(); final long requestTerm = wrongAllocationId && randomBoolean() ? primaryTerm : primaryTerm + randomIntBetween(1, 10); Request request = new Request(shardId).timeout("1ms"); - action.new PrimaryOperationTransportHandler().messageReceived( + action.handlePrimaryRequest( new TransportReplicationAction.ConcreteShardRequest<>(request, wrongAllocationId ? "_not_a_valid_aid_" : primary.allocationId().getId(), requestTerm), From 83c6c677d73b4ce7c6ee97cc352d600ae63e3b0a Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 21:48:32 +0100 Subject: [PATCH 10/17] Replace ReplicaOperationTransportHandler with lambda --- .../TransportResyncReplicationAction.java | 2 +- .../TransportReplicationAction.java | 23 +++++-------------- .../TransportReplicationActionTests.java | 11 ++++----- ...ReplicationAllPermitsAcquisitionTests.java | 3 +-- 4 files changed, 12 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 204321d174a0c..80a55e8ab2d41 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -75,7 +75,7 @@ protected void registerRequestHandlers(String actionName, TransportService trans transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, - new ReplicaOperationTransportHandler()); + this::handleReplicaRequest); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 41ddf4f972310..f0cfd69bf3418 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -158,10 +158,8 @@ protected void registerRequestHandlers(String actionName, TransportService trans transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, this::handlePrimaryRequest); // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler(transportReplicaAction, - () -> new ConcreteReplicaRequest<>(replicaRequest), - executor, true, true, - new ReplicaOperationTransportHandler()); + transportService.registerRequestHandler( + transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); } @Override @@ -489,19 +487,10 @@ public void respond(ActionListener listener) { } } - public class ReplicaOperationTransportHandler implements TransportRequestHandler> { - - @Override - public void messageReceived( - final ConcreteReplicaRequest replicaRequest, - final TransportChannel channel, - final Task task) - throws Exception { - new AsyncReplicaAction(replicaRequest, - new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), - (ReplicationTask) task).run(); - } - + protected void handleReplicaRequest(final ConcreteReplicaRequest replicaRequest, + final TransportChannel channel, final Task task) { + new AsyncReplicaAction( + replicaRequest, new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run(); } public static class RetryOnReplicaException extends ElasticsearchException { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 62d0826e64fce..9164d9e4184eb 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -885,9 +885,8 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); try { - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>( new Request().setShardId(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), @@ -978,7 +977,7 @@ public void testReplicaActionRejectsWrongAid() throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); - action.new ReplicaOperationTransportHandler().messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, "_not_a_valid_aid_", randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), createTransportChannel(listener), maybeTask() @@ -1020,12 +1019,11 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); final long checkpoint = randomNonNegativeLong(); final long maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong(); - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint, maxSeqNoOfUpdatesOrDeletes), createTransportChannel(listener), task); @@ -1089,12 +1087,11 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); final long checkpoint = randomNonNegativeLong(); final long maxSeqNoOfUpdates = randomNonNegativeLong(); - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint, maxSeqNoOfUpdates), createTransportChannel(listener), task); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 0e78a2dc99958..8fe204cee2c34 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -411,9 +411,8 @@ protected void sendReplicaRequest(final ConcreteReplicaRequest replicaR final DiscoveryNode node, final ActionListener listener) { assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node); - ReplicaOperationTransportHandler replicaOperationTransportHandler = new ReplicaOperationTransportHandler(); try { - replicaOperationTransportHandler.messageReceived(replicaRequest, new TransportChannel() { + handleReplicaRequest(replicaRequest, new TransportChannel() { @Override public String getProfileName() { return null; From 3d7e753d33e12edaffc05837f45e88ece9079d8e Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 21:51:37 +0100 Subject: [PATCH 11/17] Allow subclasses to control whether primary action is forced Today this requires overriding and reimplementing registerRequestHandlers() but it's clearer to highlight the difference like this --- .../TransportResyncReplicationAction.java | 13 ++----------- .../TransportReplicationAction.java | 19 +++++++++---------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 80a55e8ab2d41..18f14bae4d514 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -64,18 +64,9 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran } @Override - protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, - Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); + protected boolean forcePrimaryActionExecution() { // we should never reject resync because of thread pool capacity on primary - transportService.registerRequestHandler(transportPrimaryAction, - () -> new ConcreteShardRequest<>(request), - executor, true, true, - this::handlePrimaryRequest); - transportService.registerRequestHandler(transportReplicaAction, - () -> new ConcreteReplicaRequest<>(replicaRequest), - executor, true, true, - this::handleReplicaRequest); + return true; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index f0cfd69bf3418..72a6d604e7c27 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -72,7 +72,6 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse.Empty; @@ -145,21 +144,21 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; - registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); + + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); + transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, true, + forcePrimaryActionExecution(), this::handlePrimaryRequest); + // we must never reject on because of thread pool capacity on replicas + transportService.registerRequestHandler( + transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); this.transportOptions = transportOptions(settings); this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; } - protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, - Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); - transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, - this::handlePrimaryRequest); - // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler( - transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); + protected boolean forcePrimaryActionExecution() { + return false; } @Override From df7437cd27361b4f375c0c852bea53a2086fd2c2 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Apr 2019 23:16:23 +0100 Subject: [PATCH 12/17] Imports --- .../action/resync/TransportResyncReplicationAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 18f14bae4d514..1a9d3617d4538 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -47,7 +47,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction implements PrimaryReplicaSyncer.SyncAction { From 9f89af76def0f9206edf1acaab4afdb5a13c72f3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Apr 2019 00:36:17 +0100 Subject: [PATCH 13/17] Revert "Allow subclasses to control whether primary action is forced" This reverts commit 3d7e753d33e12edaffc05837f45e88ece9079d8e. --- .../TransportResyncReplicationAction.java | 14 ++++++++++++-- .../TransportReplicationAction.java | 18 +++++++++--------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 1a9d3617d4538..80a55e8ab2d41 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction implements PrimaryReplicaSyncer.SyncAction { @@ -63,9 +64,18 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran } @Override - protected boolean forcePrimaryActionExecution() { + protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, + Supplier replicaRequest, String executor) { + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); // we should never reject resync because of thread pool capacity on primary - return true; + transportService.registerRequestHandler(transportPrimaryAction, + () -> new ConcreteShardRequest<>(request), + executor, true, true, + this::handlePrimaryRequest); + transportService.registerRequestHandler(transportReplicaAction, + () -> new ConcreteReplicaRequest<>(replicaRequest), + executor, true, true, + this::handleReplicaRequest); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 72a6d604e7c27..639e389cdbca4 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -144,21 +144,21 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; - - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); - transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, true, - forcePrimaryActionExecution(), this::handlePrimaryRequest); - // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler( - transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); + registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); this.transportOptions = transportOptions(settings); this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; } - protected boolean forcePrimaryActionExecution() { - return false; + protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, + Supplier replicaRequest, String executor) { + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); + transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, + this::handlePrimaryRequest); + // we must never reject on because of thread pool capacity on replicas + transportService.registerRequestHandler( + transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); } @Override From a5bfc846d86cd8f18637fab2017534d5a839166f Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Apr 2019 00:42:58 +0100 Subject: [PATCH 14/17] Revert "Revert "Allow subclasses to control whether primary action is forced"" This reverts commit 9f89af76def0f9206edf1acaab4afdb5a13c72f3. --- .../TransportResyncReplicationAction.java | 14 ++------------ .../TransportReplicationAction.java | 18 +++++++++--------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 80a55e8ab2d41..1a9d3617d4538 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -47,7 +47,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction implements PrimaryReplicaSyncer.SyncAction { @@ -64,18 +63,9 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran } @Override - protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, - Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); + protected boolean forcePrimaryActionExecution() { // we should never reject resync because of thread pool capacity on primary - transportService.registerRequestHandler(transportPrimaryAction, - () -> new ConcreteShardRequest<>(request), - executor, true, true, - this::handlePrimaryRequest); - transportService.registerRequestHandler(transportReplicaAction, - () -> new ConcreteReplicaRequest<>(replicaRequest), - executor, true, true, - this::handleReplicaRequest); + return true; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 639e389cdbca4..72a6d604e7c27 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -144,21 +144,21 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; - registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); + + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); + transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, true, + forcePrimaryActionExecution(), this::handlePrimaryRequest); + // we must never reject on because of thread pool capacity on replicas + transportService.registerRequestHandler( + transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); this.transportOptions = transportOptions(settings); this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; } - protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, - Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); - transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, - this::handlePrimaryRequest); - // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler( - transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); + protected boolean forcePrimaryActionExecution() { + return false; } @Override From fb06813b165f799952112919cd6fa8e3cb99504e Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Apr 2019 15:46:59 +0100 Subject: [PATCH 15/17] Rename listener -> onCompletionListener --- .../TransportReplicationAction.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 72a6d604e7c27..a66f654600f06 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -278,14 +278,14 @@ protected void handlePrimaryRequest(final ConcreteShardRequest request, } class AsyncPrimaryAction extends AbstractRunnable { - private final ActionListener listener; + private final ActionListener onCompletionListener; private final ReplicationTask replicationTask; private final ConcreteShardRequest primaryRequest; - AsyncPrimaryAction(ConcreteShardRequest primaryRequest, ActionListener listener, + AsyncPrimaryAction(ConcreteShardRequest primaryRequest, ActionListener onCompletionListener, ReplicationTask replicationTask) { this.primaryRequest = primaryRequest; - this.listener = listener; + this.onCompletionListener = onCompletionListener; this.replicationTask = replicationTask; } @@ -346,7 +346,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(), primaryRequest.getPrimaryTerm()), transportOptions, - new ActionListenerResponseHandler(listener, reader) { + new ActionListenerResponseHandler(onCompletionListener, reader) { @Override public void handleResponse(Response response) { setPhase(replicationTask, "finished"); @@ -376,7 +376,7 @@ public void handleException(TransportException exp) { @Override public void onFailure(Exception e) { setPhase(replicationTask, "finished"); - listener.onFailure(e); + onCompletionListener.onFailure(e); } private ActionListener createResponseListener(final PrimaryShardReference primaryShardReference) { @@ -401,14 +401,14 @@ public void onResponse(Response response) { } primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); - listener.onResponse(response); + onCompletionListener.onResponse(response); } @Override public void onFailure(Exception e) { primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); - listener.onFailure(e); + onCompletionListener.onFailure(e); } }; } @@ -505,7 +505,7 @@ public RetryOnReplicaException(StreamInput in) throws IOException { } private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener { - private final ActionListener listener; + private final ActionListener onCompletionListener; private final IndexShard replica; /** * The task on the node with the replica shard. @@ -516,10 +516,10 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); private final ConcreteReplicaRequest replicaRequest; - AsyncReplicaAction(ConcreteReplicaRequest replicaRequest, ActionListener listener, + AsyncReplicaAction(ConcreteReplicaRequest replicaRequest, ActionListener onCompletionListener, ReplicationTask task) { this.replicaRequest = replicaRequest; - this.listener = listener; + this.onCompletionListener = onCompletionListener; this.task = task; final ShardId shardId = replicaRequest.getRequest().shardId(); assert shardId != null : "request shardId must be set"; @@ -557,7 +557,7 @@ public void onNewClusterState(ClusterState state) { // opportunity to execute custom logic before the replica operation begins transportService.sendRequest(clusterService.localNode(), transportReplicaAction, replicaRequest, - new ActionListenerResponseHandler<>(listener, in -> new ReplicaResponse())); + new ActionListenerResponseHandler<>(onCompletionListener, in -> new ReplicaResponse())); } @Override @@ -577,7 +577,7 @@ public void onTimeout(TimeValue timeout) { protected void responseWithFailure(Exception e) { setPhase(task, "finished"); - listener.onFailure(e); + onCompletionListener.onFailure(e); } @Override @@ -611,7 +611,7 @@ public void onResponse(Empty response) { } setPhase(task, "finished"); try { - listener.onResponse(replicaResponse); + onCompletionListener.onResponse(replicaResponse); } catch (Exception e) { onFailure(e); } From ffc72568e30a271beebd3699f0f4201da8dce956 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Apr 2019 15:51:57 +0100 Subject: [PATCH 16/17] onCompletionListener.onResponse cannot fail --- .../support/replication/TransportReplicationAction.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index a66f654600f06..8b15222e68885 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -610,11 +610,7 @@ public void onResponse(Empty response) { replicaRequest.getRequest()); } setPhase(task, "finished"); - try { - onCompletionListener.onResponse(replicaResponse); - } catch (Exception e) { - onFailure(e); - } + onCompletionListener.onResponse(replicaResponse); } @Override From c4d9c408beff8e0ee74f42e7723e359b42683305 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Apr 2019 17:06:22 +0100 Subject: [PATCH 17/17] Revert "Revert "Revert "Allow subclasses to control whether primary action is forced""" This reverts commit a5bfc846d86cd8f18637fab2017534d5a839166f. --- .../TransportResyncReplicationAction.java | 14 ++++++++++++-- .../TransportReplicationAction.java | 18 +++++++++--------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 1a9d3617d4538..80a55e8ab2d41 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction implements PrimaryReplicaSyncer.SyncAction { @@ -63,9 +64,18 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran } @Override - protected boolean forcePrimaryActionExecution() { + protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, + Supplier replicaRequest, String executor) { + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); // we should never reject resync because of thread pool capacity on primary - return true; + transportService.registerRequestHandler(transportPrimaryAction, + () -> new ConcreteShardRequest<>(request), + executor, true, true, + this::handlePrimaryRequest); + transportService.registerRequestHandler(transportReplicaAction, + () -> new ConcreteReplicaRequest<>(replicaRequest), + executor, true, true, + this::handleReplicaRequest); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 8b15222e68885..ef294b52803de 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -144,21 +144,21 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; - - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); - transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, true, - forcePrimaryActionExecution(), this::handlePrimaryRequest); - // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler( - transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); + registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); this.transportOptions = transportOptions(settings); this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; } - protected boolean forcePrimaryActionExecution() { - return false; + protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, + Supplier replicaRequest, String executor) { + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); + transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, + this::handlePrimaryRequest); + // we must never reject on because of thread pool capacity on replicas + transportService.registerRequestHandler( + transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); } @Override