From 2f76c48ea71d5b45ef531ab4c579022e82757438 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 5 Aug 2020 13:36:19 -0600 Subject: [PATCH] Propagate forceExecution when acquiring permit (#60634) Currently the transport replication action does not propagate the force execution parameter when acquiring the indexing permit. The logic to acquire the index permit supports force execution, so this parameter should be propagate. Fixes #60359. --- .../replication/TransportReplicationAction.java | 4 +++- .../support/replication/TransportWriteAction.java | 10 ++++------ .../java/org/elasticsearch/index/shard/IndexShard.java | 8 +++++++- .../resync/TransportResyncReplicationActionTests.java | 2 +- .../replication/TransportReplicationActionTests.java | 9 ++++++--- 5 files changed, 21 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index d71887eaa2489..6ce706246b489 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -120,6 +120,7 @@ public abstract class TransportReplicationAction< protected final IndicesService indicesService; protected final TransportRequestOptions transportOptions; protected final String executor; + protected final boolean forceExecutionOnPrimary; // package private for testing protected final String transportReplicaAction; @@ -158,6 +159,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings); this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings); + this.forceExecutionOnPrimary = forceExecutionOnPrimary; transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); @@ -906,7 +908,7 @@ void retryBecauseUnavailable(ShardId shardId, String message) { protected void acquirePrimaryOperationPermit(final IndexShard primary, final Request request, final ActionListener onAcquired) { - primary.acquirePrimaryOperationPermit(onAcquired, executor, request); + primary.acquirePrimaryOperationPermit(onAcquired, executor, request, forceExecutionOnPrimary); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 362f73f0b8aff..71e76de3b7493 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -60,7 +60,6 @@ public abstract class TransportWriteAction< Response extends ReplicationResponse & WriteResponse > extends TransportReplicationAction { - private final boolean forceExecution; private final IndexingPressure indexingPressure; private final String executor; @@ -74,13 +73,12 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); this.executor = executor; - this.forceExecution = forceExecutionOnPrimary; this.indexingPressure = indexingPressure; } @Override protected Releasable checkOperationLimits(Request request) { - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary); } @Override @@ -97,7 +95,7 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal // If this primary request was received directly from the network, we must mark a new primary // operation. This happens if the write action skips the reroute step (ex: rsync) or during // primary delegation, after the primary relocation hand-off. - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary); } } @@ -107,7 +105,7 @@ protected long primaryOperationSize(Request request) { @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution); + return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecutionOnPrimary); } protected long replicaOperationSize(ReplicaRequest request) { @@ -163,7 +161,7 @@ protected void doRun() { @Override public boolean isForceExecution() { - return forceExecution; + return forceExecutionOnPrimary; } }); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5218cda7124a0..7cc8813392962 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2791,10 +2791,16 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { * isn't used */ public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, Object debugInfo) { + acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo, false); + } + + public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, Object debugInfo, + boolean forceExecution) { verifyNotClosed(); assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting; - indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo); + indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution, + debugInfo); } /** diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index e3b16b43fe33d..fb365ed967860 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -131,7 +131,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { acquiredPermits.incrementAndGet(); callback.onResponse(acquiredPermits::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(true)); when(indexShard.getReplicationGroup()).thenReturn( new ReplicationGroup(shardRoutingTable, clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()), diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 25f27d2abfb3b..56bd370397ad2 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -126,6 +126,7 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -152,6 +153,7 @@ public static R resolveRequest(TransportRequest r private static ThreadPool threadPool; + private boolean forceExecute; private ClusterService clusterService; private TransportService transportService; private CapturingTransport transport; @@ -172,6 +174,7 @@ public static void beforeClass() { @Before public void setUp() throws Exception { super.setUp(); + forceExecute = randomBoolean(); transport = new CapturingTransport(); clusterService = createClusterService(threadPool); transportService = transport.createTransportService(clusterService.getSettings(), threadPool, @@ -839,7 +842,7 @@ public void testSeqNoIsSetOnPrimary() { //noinspection unchecked ((ActionListener)invocation.getArguments()[0]).onResponse(count::decrementAndGet); return null; - }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject(), eq(forceExecute)); when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get()); final IndexService indexService = mock(IndexService.class); @@ -1272,7 +1275,7 @@ private class TestAction extends TransportReplicationAction()), - Request::new, Request::new, ThreadPool.Names.SAME); + Request::new, Request::new, ThreadPool.Names.SAME, false, forceExecute); } @Override @@ -1343,7 +1346,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED)); } return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(forceExecute)); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[3];