From d10bc9fbe8cb1c0bfdade01c7d6b39269a18c4b2 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Fri, 4 Nov 2022 18:21:24 +0530 Subject: [PATCH] [Backport 2.x] Cluster Manager task throttling (#5041) * Add basic thorttler/exponential backoff policy for retry/Defination of throttling exception (#3856) * Corrected Java doc for Throttler * Changed the default behaviour of Throttler to return Optional * Removed generics from Throttler and used String as key * Ignore backport / autocut / dependabot branches for gradle checks on push * Master node changes for master task throttling (#3882) * Data node changes for master task throttling (#4204) * Onboarding of few task types to throttling (#4542) * Fix timeout exception and Add Integ test for Master task throttling (#4588) * Complete TODO for version change and removed unused classes(Throttler and Semaphore) (#4846) * Remove V1 version from throttling testcase Signed-off-by: Dhwanil Patel --- CHANGELOG.md | 7 + .../ClusterManagerTaskThrottlingIT.java | 214 ++++++++++ .../org/opensearch/OpenSearchException.java | 7 + .../TransportClusterRerouteAction.java | 10 + .../TransportClusterUpdateSettingsAction.java | 13 + .../TransportDeleteStoredScriptAction.java | 7 +- .../TransportPutStoredScriptAction.java | 7 +- .../indices/create/AutoCreateAction.java | 11 + .../TransportDeleteDanglingIndexAction.java | 10 + .../datastream/DeleteDataStreamAction.java | 10 + .../rollover/TransportRolloverAction.java | 10 + .../opensearch/action/bulk/BackoffPolicy.java | 126 ++++++ .../action/support/RetryableAction.java | 41 +- .../TransportClusterManagerNodeAction.java | 55 ++- .../cluster/ClusterStateTaskExecutor.java | 11 + .../MetadataCreateDataStreamService.java | 10 + .../metadata/MetadataCreateIndexService.java | 11 + .../metadata/MetadataDeleteIndexService.java | 12 + .../metadata/MetadataIndexAliasesService.java | 12 + .../MetadataIndexTemplateService.java | 52 +++ .../metadata/MetadataMappingService.java | 12 + 
.../MetadataUpdateSettingsService.java | 11 + .../service/ClusterManagerTaskKeys.java | 49 +++ .../service/ClusterManagerTaskThrottler.java | 200 ++++++++++ .../ClusterManagerTaskThrottlerListener.java | 16 + .../ClusterManagerThrottlingException.java | 28 ++ .../ClusterManagerThrottlingStats.java | 42 ++ .../cluster/service/ClusterService.java | 11 + .../cluster/service/MasterService.java | 36 +- .../cluster/service/TaskBatcher.java | 72 ++-- .../cluster/service/TaskBatcherListener.java | 41 ++ .../common/settings/ClusterSettings.java | 3 +- .../org/opensearch/ingest/IngestService.java | 19 +- .../PersistentTasksClusterService.java | 32 ++ .../repositories/RepositoriesService.java | 17 + .../blobstore/BlobStoreRepository.java | 6 + .../org/opensearch/script/ScriptService.java | 13 + .../opensearch/snapshots/RestoreService.java | 13 + .../snapshots/SnapshotsService.java | 36 +- .../ExceptionSerializationTests.java | 2 + .../action/bulk/BackoffPolicyTests.java | 41 ++ ...ransportClusterManagerNodeActionTests.java | 102 +++++ .../MetadataDeleteIndexServiceTests.java | 3 +- .../MetadataIndexAliasesServiceTests.java | 3 +- .../MetadataIndexTemplateServiceTests.java | 29 +- .../ClusterManagerTaskThrottlerTests.java | 366 ++++++++++++++++++ .../cluster/service/MasterServiceTests.java | 267 +++++++++++++ .../cluster/service/TaskBatcherTests.java | 28 +- .../PersistentTasksClusterServiceTests.java | 17 +- 49 files changed, 2058 insertions(+), 93 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/clustermanager/ClusterManagerTaskThrottlingIT.java create mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java create mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java create mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerListener.java create mode 100644 
server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java create mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingStats.java create mode 100644 server/src/main/java/org/opensearch/cluster/service/TaskBatcherListener.java create mode 100644 server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index d803a9028249f..ffcf8b921d240 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,13 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805)) - Added search backpressure stats API ([#4932](https://github.com/opensearch-project/OpenSearch/pull/4932)) - Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586)) +- Add Cluster manager task throttling framework. Cluster manager node will throttle task submission based on throttling thresholds. + This throttling will be at task type level. Data nodes will perform retries on these throttling exception with exponential delay. 
(PR: [#4986](https://github.com/opensearch-project/OpenSearch/pull/4986)) ( Issue : [#479](https://github.com/opensearch-project/OpenSearch/issues/479)) + - Throttling Exception / New Backoff policy([#3527](https://github.com/opensearch-project/OpenSearch/pull/3527)) + - Cluster Manager node side change([#3882](https://github.com/opensearch-project/OpenSearch/pull/3882)) + - Data node side change([#4204](https://github.com/opensearch-project/OpenSearch/pull/4204)) + - on-boarding of tasks([#4542](https://github.com/opensearch-project/OpenSearch/pull/4542)) + - Integs ([#4588](https://github.com/opensearch-project/OpenSearch/pull/4588)) ### Dependencies - Bumps `com.diffplug.spotless` from 6.9.1 to 6.10.0 diff --git a/server/src/internalClusterTest/java/org/opensearch/clustermanager/ClusterManagerTaskThrottlingIT.java b/server/src/internalClusterTest/java/org/opensearch/clustermanager/ClusterManagerTaskThrottlingIT.java new file mode 100644 index 0000000000000..2f3f0539602dd --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/clustermanager/ClusterManagerTaskThrottlingIT.java @@ -0,0 +1,214 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.clustermanager; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; +import org.opensearch.cluster.service.ClusterManagerThrottlingException; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.transport.TransportService; +import org.opensearch.transport.TransportMessageListener; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) +public class ClusterManagerTaskThrottlingIT extends OpenSearchIntegTestCase { + + /* + * This integ test will test end-end cluster manager throttling feature for + * remote cluster manager. + * + * It will check the number of request coming to cluster manager node + * should be total number of requests + throttled requests from cluster manager. + * This will ensure the end-end feature is working as cluster manager is throwing + * Throttling exception and data node is performing retries on it. 
+     *
+     */
+    public void testThrottlingForRemoteClusterManager() throws Exception {
+        try {
+            internalCluster().beforeTest(random());
+            final String managerNodeName = internalCluster().startClusterManagerOnlyNode();
+            final String dataNodeName = internalCluster().startDataOnlyNode();
+            final int limit = randomIntBetween(1, 5);
+            createIndex("test");
+            setPutMappingThrottlingLimit(limit);
+
+            final TransportService managerTransport = internalCluster().getInstance(TransportService.class, managerNodeName);
+            final AtomicInteger mappingRequestsSeen = new AtomicInteger();
+            final AtomicInteger throttledResponses = new AtomicInteger();
+            final int requestCount = randomIntBetween(limit, 5 * limit);
+            final CountDownLatch pending = new CountDownLatch(requestCount);
+
+            // Watches transport traffic on the cluster manager node: counts every received action whose
+            // name contains "mapping", and every such response that was sent with an exception.
+            final TransportMessageListener mappingTraffic = new TransportMessageListener() {
+                @Override
+                public void onRequestReceived(long requestId, String action) {
+                    if (!action.contains("mapping")) {
+                        return;
+                    }
+                    mappingRequestsSeen.incrementAndGet();
+                }
+
+                @Override
+                public void onResponseSent(long requestId, String action, Exception error) {
+                    if (!action.contains("mapping")) {
+                        return;
+                    }
+                    throttledResponses.incrementAndGet();
+                    // The only failure this test expects the cluster manager to send back is the throttling exception.
+                    assertEquals(ClusterManagerThrottlingException.class, error.getClass());
+                }
+            };
+            managerTransport.addMessageListener(mappingTraffic);
+
+            // Client-side callback: each request must finish exactly once, and never with a failure.
+            final ActionListener responseListener = new ActionListener() {
+                @Override
+                public void onResponse(Object o) {
+                    pending.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    pending.countDown();
+                    throw new AssertionError(e);
+                }
+            };
+
+            executePutMappingRequests(requestCount, dataNodeName, responseListener);
+            pending.await();
+
+            // Every throttled attempt is expected to show up as one extra request on the cluster manager.
+            final int expectedArrivals = requestCount + throttledResponses.get();
+            assertEquals(expectedArrivals, mappingRequestsSeen.get());
+            assertBusy(() -> assertEquals(clusterService().getMasterService().numberOfThrottledPendingTasks(), throttledResponses.get()));
+        } finally {
+            clusterSettingCleanUp();
+        }
+    }
+
+    /*
+     * This will test the throttling feature for single node.
+     *
+     * Here we will assert the client behaviour that client's request is not
+     * failed, i.e. Throttling exception is not passed to the client.
+     * Data node will internally do the retry and request should pass.
+     *
+     */
+    public void testThrottlingForSingleNode() throws Exception {
+        try {
+            internalCluster().beforeTest(random());
+            String node = internalCluster().startNode();
+            int throttlingLimit = randomIntBetween(1, 5);
+            createIndex("test");
+            setPutMappingThrottlingLimit(throttlingLimit);
+
+            AtomicInteger successfulRequest = new AtomicInteger();
+            int totalRequest = randomIntBetween(throttlingLimit, 3 * throttlingLimit);
+            CountDownLatch latch = new CountDownLatch(totalRequest);
+
+            ActionListener listener = new ActionListener() {
+                @Override
+                public void onResponse(Object o) {
+                    // Record the success BEFORE releasing the latch: once the last countDown() happens the
+                    // test thread may immediately read successfulRequest, and must not see a stale value.
+                    successfulRequest.incrementAndGet();
+                    latch.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    // Release the latch first so a failed request cannot leave the test blocked in await();
+                    // the missing increment then makes the final assertEquals fail.
+                    latch.countDown();
+                    throw new AssertionError(e);
+                }
+            };
+            executePutMappingRequests(totalRequest, node, listener);
+
+            latch.await();
+            assertEquals(totalRequest, successfulRequest.get());
+        } finally {
+            clusterSettingCleanUp();
+        }
+    }
+
+    /*
+     * This will test the timeout of tasks during throttling.
+     *
+     * Here we will assert the client behaviour that client's request is not
+     * failed with throttling exception but timeout exception.
+     * It also verifies that if limit is set to 0, all tasks are getting timedout.
+     */
+
+    public void testTimeoutWhileThrottling() throws Exception {
+        try {
+            internalCluster().beforeTest(random());
+            String node = internalCluster().startNode();
+            int throttlingLimit = 0; // throttle all the tasks
+            createIndex("test");
+            setPutMappingThrottlingLimit(throttlingLimit);
+
+            AtomicInteger timedoutRequest = new AtomicInteger();
+            int totalRequest = randomIntBetween(1, 5);
+            CountDownLatch latch = new CountDownLatch(totalRequest);
+
+            ActionListener listener = new ActionListener() {
+                @Override
+                public void onResponse(Object o) {
+                    latch.countDown();
+                    throw new AssertionError("Request should not succeed");
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    try {
+                        assertTrue(e instanceof ProcessClusterEventTimeoutException);
+                        // Count BEFORE the latch is released so the test thread never reads a stale total.
+                        timedoutRequest.incrementAndGet();
+                    } finally {
+                        // Always release the latch, even when the assertion above fails, so await() cannot hang.
+                        latch.countDown();
+                    }
+                }
+            };
+            executePutMappingRequests(totalRequest, node, listener);
+
+            latch.await();
+            assertEquals(totalRequest, timedoutRequest.get()); // verifying all requests were timed out with 0 throttling limit
+        } finally {
+            clusterSettingCleanUp();
+        }
+    }
+
+    /*
+     * Submits totalRequest put-mapping requests for index "test" through the client of the given node,
+     * one thread per request, each adding a distinct text field ("field0", "field1", ...).
+     * Every request reports its outcome to the supplied listener. This method returns once all
+     * submitting threads have finished; it does not wait for the responses themselves.
+     */
+    private void executePutMappingRequests(int totalRequest, String node, ActionListener listener) throws Exception {
+        Thread[] threads = new Thread[totalRequest];
+        for (int i = 0; i < totalRequest; i++) {
+            PutMappingRequest putMappingRequest = new PutMappingRequest("test").source("field" + i, "type=text");
+            threads[i] = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    internalCluster().client(node).admin().indices().putMapping(putMappingRequest, listener);
+                }
+            });
+        }
+        for (int i = 0; i < totalRequest; i++) {
+            // start(), not run(): run() would execute the body on the calling thread, one request after
+            // another, and the join() below would be a no-op on threads that were never started.
+            threads[i].start();
+        }
+        for (int i = 0; i < totalRequest; i++) {
+            threads[i].join();
+        }
+    }
+
+    private void setPutMappingThrottlingLimit(int throttlingLimit) {
+        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+        Settings settings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", throttlingLimit).build();
+        
settingsRequest.transientSettings(settings); + assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet()); + } + + private void clusterSettingCleanUp() { + // We need to remove the throttling limit from setting as part of test cleanup + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + Settings settings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", (String) null).build(); + settingsRequest.transientSettings(settings); + assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet()); + } +} diff --git a/server/src/main/java/org/opensearch/OpenSearchException.java b/server/src/main/java/org/opensearch/OpenSearchException.java index cc2b0b66e8d6f..5cf64f873e5cc 100644 --- a/server/src/main/java/org/opensearch/OpenSearchException.java +++ b/server/src/main/java/org/opensearch/OpenSearchException.java @@ -34,6 +34,7 @@ import org.opensearch.action.support.replication.ReplicationOperation; import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.common.CheckedFunction; import org.opensearch.common.Nullable; import org.opensearch.common.ParseField; @@ -1619,6 +1620,12 @@ private enum OpenSearchExceptionHandle { org.opensearch.cluster.decommission.NodeDecommissionedException::new, 164, V_2_4_0 + ), + CLUSTER_MANAGER_TASK_THROTTLED_EXCEPTION( + ClusterManagerThrottlingException.class, + ClusterManagerThrottlingException::new, + 165, + Version.V_2_4_0 ); final Class exceptionClass; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index 3e5ebdd6a17d3..e9ae23f6b9e34 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ 
b/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -54,6 +54,8 @@ import org.opensearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand; import org.opensearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.opensearch.cluster.routing.allocation.command.AllocationCommand; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.Strings; @@ -80,6 +82,7 @@ public class TransportClusterRerouteAction extends TransportClusterManagerNodeAc private static final Logger logger = LogManager.getLogger(TransportClusterRerouteAction.class); private final AllocationService allocationService; + private static ClusterManagerTaskThrottler.ThrottlingKey clusterRerouteTaskKey; @Inject public TransportClusterRerouteAction( @@ -100,6 +103,8 @@ public TransportClusterRerouteAction( indexNameExpressionResolver ); this.allocationService = allocationService; + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. 
+ clusterRerouteTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CLUSTER_REROUTE_API_KEY, true); } @Override @@ -241,6 +246,11 @@ static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClus this.allocationService = allocationService; } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return clusterRerouteTaskKey; + } + @Override protected ClusterRerouteResponse newResponse(boolean acknowledged) { return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index ef404375485a2..e9cb6a78f5269 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -47,6 +47,8 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -73,6 +75,8 @@ public class TransportClusterUpdateSettingsAction extends TransportClusterManage private final ClusterSettings clusterSettings; + private final ClusterManagerTaskThrottler.ThrottlingKey clusterUpdateSettingTaskKey; + @Inject public TransportClusterUpdateSettingsAction( TransportService transportService, @@ -95,6 +99,10 @@ public TransportClusterUpdateSettingsAction( ); this.allocationService = allocationService; this.clusterSettings = 
clusterSettings; + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + clusterUpdateSettingTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CLUSTER_UPDATE_SETTINGS_KEY, true); + } @Override @@ -136,6 +144,11 @@ protected void clusterManagerOperation( private volatile boolean changed = false; + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return clusterUpdateSettingTaskKey; + } + @Override protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) { return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java index 4bc8d836a8200..e41ec2b1f737c 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java @@ -40,6 +40,8 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; @@ -57,6 +59,7 @@ public class TransportDeleteStoredScriptAction extends TransportClusterManagerNodeAction { private final ScriptService scriptService; + private final ClusterManagerTaskThrottler.ThrottlingKey deleteScriptTaskKey; @Inject public TransportDeleteStoredScriptAction( @@ 
-77,6 +80,8 @@ public TransportDeleteStoredScriptAction( indexNameExpressionResolver ); this.scriptService = scriptService; + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + deleteScriptTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SCRIPT_KEY, true); } @Override @@ -95,7 +100,7 @@ protected void clusterManagerOperation( ClusterState state, ActionListener listener ) throws Exception { - scriptService.deleteStoredScript(clusterService, request, listener); + scriptService.deleteStoredScript(clusterService, request, deleteScriptTaskKey, listener); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java index bb259f173d470..8ffe4d2b74695 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java @@ -40,6 +40,8 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; @@ -57,6 +59,7 @@ public class TransportPutStoredScriptAction extends TransportClusterManagerNodeAction { private final ScriptService scriptService; + private final ClusterManagerTaskThrottler.ThrottlingKey putScriptTaskKey; @Inject public TransportPutStoredScriptAction( @@ -77,6 +80,8 @@ public TransportPutStoredScriptAction( indexNameExpressionResolver ); 
this.scriptService = scriptService; + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + putScriptTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SCRIPT_KEY, true); } @Override @@ -95,7 +100,7 @@ protected void clusterManagerOperation( ClusterState state, ActionListener listener ) throws Exception { - scriptService.putStoredScript(clusterService, request, listener); + scriptService.putStoredScript(clusterService, request, putScriptTaskKey, listener); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java index 73a2996945aff..77f09f02c9a9c 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java @@ -50,6 +50,8 @@ import org.opensearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest; import org.opensearch.cluster.metadata.MetadataCreateIndexService; import org.opensearch.cluster.metadata.MetadataIndexTemplateService; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; @@ -84,6 +86,7 @@ public static final class TransportAction extends TransportClusterManagerNodeAct private final ActiveShardsObserver activeShardsObserver; private final MetadataCreateIndexService createIndexService; private final MetadataCreateDataStreamService metadataCreateDataStreamService; + private final ClusterManagerTaskThrottler.ThrottlingKey autoCreateTaskKey; @Inject public TransportAction( @@ -99,6 +102,9 @@ public TransportAction( this.activeShardsObserver = new 
ActiveShardsObserver(clusterService, threadPool); this.createIndexService = createIndexService; this.metadataCreateDataStreamService = metadataCreateDataStreamService; + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + autoCreateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.AUTO_CREATE_KEY, true); } @Override @@ -142,6 +148,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return autoCreateTaskKey; + } + @Override public ClusterState execute(ClusterState currentState) throws Exception { DataStreamTemplate dataStreamTemplate = resolveAutoCreateDataStream(request, currentState.metadata()); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java b/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java index 015a0f6727ab7..e14125c21af9c 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java @@ -54,6 +54,8 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; @@ -80,6 +82,7 @@ public class TransportDeleteDanglingIndexAction extends TransportClusterManagerN private final Settings settings; 
private final NodeClient nodeClient; + private final ClusterManagerTaskThrottler.ThrottlingKey deleteDanglingIndexTaskKey; @Inject public TransportDeleteDanglingIndexAction( @@ -102,6 +105,8 @@ public TransportDeleteDanglingIndexAction( ); this.settings = settings; this.nodeClient = nodeClient; + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + deleteDanglingIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_DANGLING_INDEX_KEY, true); } @Override @@ -157,6 +162,11 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { return new AcknowledgedResponse(acknowledged); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return deleteDanglingIndexTaskKey; + } + @Override public ClusterState execute(final ClusterState currentState) { return deleteDanglingIndex(currentState, indexToDelete); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java index 74b0a84782283..9260904025df2 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -50,6 +50,8 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataDeleteIndexService; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.Strings; @@ -167,6 +169,7 @@ public IndicesRequest indices(String... 
indices) { public static class TransportAction extends TransportClusterManagerNodeAction { private final MetadataDeleteIndexService deleteIndexService; + private final ClusterManagerTaskThrottler.ThrottlingKey removeDataStreamTaskKey; @Inject public TransportAction( @@ -179,6 +182,8 @@ public TransportAction( ) { super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); this.deleteIndexService = deleteIndexService; + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + removeDataStreamTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_DATA_STREAM_KEY, true); } @Override @@ -208,6 +213,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return removeDataStreamTaskKey; + } + @Override public ClusterState execute(ClusterState currentState) { return removeDataStream(deleteIndexService, currentState, request); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java index 4e5e7ec9184fe..b4ecae2ee08ba 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -49,6 +49,8 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import 
org.opensearch.common.inject.Inject; @@ -77,6 +79,7 @@ public class TransportRolloverAction extends TransportClusterManagerNodeAction iterator() { + return new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelayForRetry); + } + } + + private static class ExponentialEqualJitterBackoffIterator implements Iterator { + /** + * Retry limit to avoids integer overflow issues. + * Post this limit, max delay will be returned with Equal Jitter. + * + * NOTE: If the value is greater than 30, there can be integer overflow + * issues during delay calculation. + **/ + private final int RETRIES_TILL_JITTER_INCREASE = 30; + + /** + * Exponential increase in delay will happen till it reaches maxDelayForRetry. + * Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only + * and not increase the delay. + */ + private final int maxDelayForRetry; + private final int baseDelay; + private int retriesAttempted; + + private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) { + this.baseDelay = baseDelay; + this.maxDelayForRetry = maxDelayForRetry; + } + + /** + * There is not any limit for this BackOff. + * This Iterator will always return back off delay. 
+         *
+         * @return true
+         */
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        /**
+         * Returns the next delay: the exponential delay (base delay doubled per retry, capped at
+         * maxDelayForRetry) is split in half, and a random amount between zero and that half
+         * (inclusive) is added to the fixed half.
+         *
+         * @return the delay to wait before the next retry
+         */
+        @Override
+        public TimeValue next() {
+            int retries = Math.min(retriesAttempted, RETRIES_TILL_JITTER_INCREASE);
+            int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelayForRetry);
+            // Only the capped value is ever used above, so stop counting once the cap is reached.
+            // This keeps the counter from overflowing on an iterator that never runs out.
+            if (retriesAttempted < RETRIES_TILL_JITTER_INCREASE) {
+                retriesAttempted++;
+            }
+            int halfDelay = exponentialDelay / 2;
+            return TimeValue.timeValueMillis(halfDelay + Randomness.get().nextInt(halfDelay + 1));
+        }
+    }
+
+    /**
+     * Back off policy whose iterators start from the given base delay; each iterator is
+     * independent and is created fresh by {@link #iterator()}.
+     */
+    private static class ExponentialFullJitterBackoff extends BackoffPolicy {
+        private final long baseDelay;
+
+        private ExponentialFullJitterBackoff(long baseDelay) {
+            this.baseDelay = baseDelay;
+        }
+
+        @Override
+        public Iterator<TimeValue> iterator() {
+            return new ExponentialFullJitterBackoffIterator(baseDelay);
+        }
+    }
+
+    private static class ExponentialFullJitterBackoffIterator implements Iterator<TimeValue> {
+        /**
+         * Current delay in exponential backoff
+         */
+        private long currentDelay;
+
+        private ExponentialFullJitterBackoffIterator(long baseDelay) {
+            this.currentDelay = baseDelay;
+        }
+
+        /**
+         * There is not any limit for this BackOff.
+         * This Iterator will always return back off delay.
+ * + * @return true + */ + @Override + public boolean hasNext() { + return true; + } + + @Override + public TimeValue next() { + TimeValue delayToReturn = TimeValue.timeValueMillis(Randomness.get().nextInt(Math.toIntExact(currentDelay)) + 1); + currentDelay = Math.min(2 * currentDelay, Integer.MAX_VALUE); + return delayToReturn; + } + } + /** * Concrete Constant Back Off Policy * diff --git a/server/src/main/java/org/opensearch/action/support/RetryableAction.java b/server/src/main/java/org/opensearch/action/support/RetryableAction.java index 38b7e6ec2a8a0..281cf728fb18c 100644 --- a/server/src/main/java/org/opensearch/action/support/RetryableAction.java +++ b/server/src/main/java/org/opensearch/action/support/RetryableAction.java @@ -36,13 +36,14 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; -import org.opensearch.common.Randomness; +import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayDeque; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -64,6 +65,7 @@ public abstract class RetryableAction { private final long startMillis; private final ActionListener finalListener; private final String executor; + private final BackoffPolicy backoffPolicy; private volatile Scheduler.ScheduledCancellable retryTask; @@ -74,7 +76,15 @@ public RetryableAction( TimeValue timeoutValue, ActionListener listener ) { - this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME); + this( + logger, + threadPool, + initialDelay, + timeoutValue, + listener, + BackoffPolicy.exponentialFullJitterBackoff(initialDelay.getMillis()), + ThreadPool.Names.SAME + ); } public RetryableAction( @@ -83,6 
+93,7 @@ public RetryableAction( TimeValue initialDelay, TimeValue timeoutValue, ActionListener listener, + BackoffPolicy backoffPolicy, String executor ) { this.logger = logger; @@ -95,10 +106,11 @@ public RetryableAction( this.startMillis = threadPool.relativeTimeInMillis(); this.finalListener = listener; this.executor = executor; + this.backoffPolicy = backoffPolicy; } public void run() { - final RetryingListener retryingListener = new RetryingListener(initialDelayMillis, null); + final RetryingListener retryingListener = new RetryingListener(backoffPolicy.iterator(), null); final Runnable runnable = createRunnable(retryingListener); threadPool.executor(executor).execute(runnable); } @@ -142,16 +154,24 @@ public void onRejection(Exception e) { public void onFinished() {} + /** + * A retryable task may want to throw a different exception on timeout; + * it can override this method for that. + */ + public Exception getTimeoutException(Exception e) { + return e; + } + private class RetryingListener implements ActionListener { private static final int MAX_EXCEPTIONS = 4; - private final long delayMillisBound; private ArrayDeque caughtExceptions; + private Iterator backoffDelayIterator; - private RetryingListener(long delayMillisBound, ArrayDeque caughtExceptions) { - this.delayMillisBound = delayMillisBound; + private RetryingListener(Iterator backoffDelayIterator, ArrayDeque caughtExceptions) { this.caughtExceptions = caughtExceptions; + this.backoffDelayIterator = backoffDelayIterator; } @Override @@ -171,16 +191,13 @@ public void onFailure(Exception e) { () -> new ParameterizedMessage("retryable action timed out after {}", TimeValue.timeValueMillis(elapsedMillis)), e ); - onFinalFailure(e); + onFinalFailure(getTimeoutException(e)); } else { addException(e); - final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE); - final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions); - final Runnable 
runnable = createRunnable(retryingListener); - final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1; + final TimeValue delay = backoffDelayIterator.next(); + final Runnable runnable = createRunnable(this); if (isDone.get() == false) { - final TimeValue delay = TimeValue.timeValueMillis(delayMillis); logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e); try { retryTask = threadPool.schedule(runnable, delay, executor); diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index a97f4ffe555b6..be075e605a21d 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -39,8 +39,10 @@ import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionRunnable; +import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.RetryableAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterManagerNodeChangePredicate; @@ -48,8 +50,10 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import 
org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.Writeable; @@ -156,12 +160,10 @@ protected boolean localExecute(Request request) { @Override protected void doExecute(Task task, final Request request, ActionListener listener) { - ClusterState state = clusterService.state(); - logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); if (task != null) { request.setParentTask(clusterService.localNode().getId(), task.getId()); } - new AsyncSingleAction(task, request, listener).doStart(state); + new AsyncSingleAction(task, request, listener).run(); } /** @@ -169,21 +171,62 @@ protected void doExecute(Task task, final Request request, ActionListener listener; + private ActionListener listener; private final Request request; private ClusterStateObserver observer; private final long startTime; private final Task task; + private static final int BASE_DELAY_MILLIS = 10; + private static final int MAX_DELAY_MILLIS = 5000; AsyncSingleAction(Task task, Request request, ActionListener listener) { + super( + logger, + threadPool, + TimeValue.timeValueMillis(BASE_DELAY_MILLIS), + request.clusterManagerNodeTimeout, + listener, + BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS), + ThreadPool.Names.SAME + ); this.task = task; this.request = request; - this.listener = listener; this.startTime = threadPool.relativeTimeInMillis(); } + @Override + public void tryAction(ActionListener retryListener) { + ClusterState state = clusterService.state(); + logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); + this.listener = retryListener; + doStart(state); + } + + @Override + public boolean shouldRetry(Exception e) { + // If remote address is null, i.e request is generated from same node and we 
would want to perform retry for it + // If remote address is not null, i.e. the request is generated from a remote node and received on this master node on the transport layer, + // in that case we would want the throttling retry to be performed on the remote node only, not on this master node. + if (request.remoteAddress() == null) { + if (e instanceof TransportException) { + return ((TransportException) e).unwrapCause() instanceof ClusterManagerThrottlingException; + } + return e instanceof ClusterManagerThrottlingException; + } + return false; + } + + /** + * If the task times out while retrying on throttling, + * it should send a cluster event timeout exception. + */ + @Override + public Exception getTimeoutException(Exception e) { + return new ProcessClusterEventTimeoutException(request.masterNodeTimeout, actionName); + } + + protected void doStart(ClusterState clusterState) { try { final DiscoveryNodes nodes = clusterState.nodes(); diff --git a/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java index 976019ae77d6c..50beeb1f03deb 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java @@ -31,6 +31,7 @@ package org.opensearch.cluster; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.common.Nullable; import java.util.IdentityHashMap; @@ -88,6 +89,16 @@ default String describeTasks(List tasks) { return String.join(", ", tasks.stream().map(t -> (CharSequence) t.toString()).filter(t -> t.length() > 0)::iterator); } + /** + * Throttling key associated with the task, on which cluster manager node will do aggregation count + * and perform throttling based on configured threshold in cluster setting. 
+ */ + default ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + // Default task is not registered with clusterService.registerClusterManagerTask, + // so the user can't configure a throttling limit on it and it will be bypassed while throttling on cluster manager + return ClusterManagerTaskThrottler.DEFAULT_THROTTLING_KEY; + } + /** * Represents the result of a batched execution of cluster state update tasks * @param the type of the cluster state update task diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java index 412d4dba628cb..7be5ea7e2c34a 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -45,6 +45,8 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ack.ClusterStateUpdateRequest; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; @@ -74,6 +76,7 @@ public class MetadataCreateDataStreamService { private final ClusterService clusterService; private final ActiveShardsObserver activeShardsObserver; private final MetadataCreateIndexService metadataCreateIndexService; + private final ClusterManagerTaskThrottler.ThrottlingKey createDataStreamTaskKey; public MetadataCreateDataStreamService( ThreadPool threadPool, @@ -83,6 +86,8 @@ public MetadataCreateDataStreamService( this.clusterService = clusterService; this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); this.metadataCreateIndexService = metadataCreateIndexService; + // Task 
is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + createDataStreamTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_DATA_STREAM_KEY, true); } public void createDataStream(CreateDataStreamClusterStateUpdateRequest request, ActionListener finalListener) { @@ -113,6 +118,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { return clusterState; } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return createDataStreamTaskKey; + } + @Override protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 46be91fff398b..879a7421251fb 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -60,6 +60,8 @@ import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -147,6 +149,7 @@ public class MetadataCreateIndexService { private final ShardLimitValidator shardLimitValidator; private final boolean forbidPrivateIndexSettings; private final Set indexSettingProviders = new HashSet<>(); + private final ClusterManagerTaskThrottler.ThrottlingKey createIndexTaskKey; private AwarenessReplicaBalance awarenessReplicaBalance; public 
MetadataCreateIndexService( @@ -177,6 +180,9 @@ public MetadataCreateIndexService( this.forbidPrivateIndexSettings = forbidPrivateIndexSettings; this.shardLimitValidator = shardLimitValidator; this.awarenessReplicaBalance = awarenessReplicaBalance; + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true); } /** @@ -326,6 +332,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return createIndexTaskKey; + } + @Override public ClusterState execute(ClusterState currentState) throws Exception { return applyCreateIndexRequest(currentState, request, false); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java index 66f5edf3da129..655b5ceb376f5 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java @@ -43,6 +43,8 @@ import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.collect.ImmutableOpenMap; @@ -73,12 +75,17 @@ public class MetadataDeleteIndexService { private final ClusterService clusterService; private final AllocationService allocationService; + private final 
ClusterManagerTaskThrottler.ThrottlingKey deleteIndexTaskKey; @Inject public MetadataDeleteIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService) { this.settings = settings; this.clusterService = clusterService; this.allocationService = allocationService; + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + deleteIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_INDEX_KEY, true); + } public void deleteIndices( @@ -98,6 +105,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return deleteIndexTaskKey; + } + @Override public ClusterState execute(final ClusterState currentState) { return deleteIndices(currentState, Sets.newHashSet(request.indices())); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java index 8d6939a57240c..7f5a5e876d373 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java @@ -39,6 +39,8 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; import org.opensearch.cluster.metadata.AliasAction.NewAliasValidator; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.Strings; @@ -78,6 +80,7 @@ public class MetadataIndexAliasesService { private final MetadataDeleteIndexService deleteIndexService; private final 
NamedXContentRegistry xContentRegistry; + private final ClusterManagerTaskThrottler.ThrottlingKey indexAliasTaskKey; @Inject public MetadataIndexAliasesService( @@ -92,6 +95,10 @@ public MetadataIndexAliasesService( this.aliasValidator = aliasValidator; this.deleteIndexService = deleteIndexService; this.xContentRegistry = xContentRegistry; + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + indexAliasTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.INDEX_ALIASES_KEY, true); + } public void indicesAliases( @@ -106,6 +113,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return indexAliasTaskKey; + } + @Override public ClusterState execute(ClusterState currentState) { return applyAliasActions(currentState, request.actions()); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java index 7e91b491a234c..c2160b37f2722 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java @@ -45,6 +45,8 @@ import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -109,6 +111,12 @@ public class MetadataIndexTemplateService { private final MetadataCreateIndexService 
metadataCreateIndexService; private final IndexScopedSettings indexScopedSettings; private final NamedXContentRegistry xContentRegistry; + private final ClusterManagerTaskThrottler.ThrottlingKey createIndexTemplateTaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey createIndexTemplateV2TaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey removeIndexTemplateTaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey removeIndexTemplateV2TaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey createComponentTemplateTaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey removeComponentTemplateTaskKey; @Inject public MetadataIndexTemplateService( @@ -125,6 +133,20 @@ public MetadataIndexTemplateService( this.metadataCreateIndexService = metadataCreateIndexService; this.indexScopedSettings = indexScopedSettings; this.xContentRegistry = xContentRegistry; + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. 
+ createIndexTemplateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_TEMPLATE_KEY, true); + createIndexTemplateV2TaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_TEMPLATE_V2_KEY, true); + removeIndexTemplateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_INDEX_TEMPLATE_KEY, true); + removeIndexTemplateV2TaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_INDEX_TEMPLATE_V2_KEY, true); + createComponentTemplateTaskKey = clusterService.registerClusterManagerTask( + ClusterManagerTaskKeys.CREATE_COMPONENT_TEMPLATE_KEY, + true + ); + removeComponentTemplateTaskKey = clusterService.registerClusterManagerTask( + ClusterManagerTaskKeys.REMOVE_COMPONENT_TEMPLATE_KEY, + true + ); } public void removeTemplates(final RemoveRequest request, final RemoveListener listener) { @@ -140,6 +162,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return removeIndexTemplateTaskKey; + } + @Override public ClusterState execute(ClusterState currentState) { Set templateNames = new HashSet<>(); @@ -198,6 +225,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return createComponentTemplateTaskKey; + } + @Override public ClusterState execute(ClusterState currentState) throws Exception { return addComponentTemplate(currentState, create, name, template); @@ -358,6 +390,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return removeComponentTemplateTaskKey; + } + @Override public ClusterState execute(ClusterState currentState) { Set templateNames = new HashSet<>(); 
@@ -447,6 +484,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return createIndexTemplateV2TaskKey; + } + @Override public ClusterState execute(ClusterState currentState) throws Exception { return addIndexTemplateV2(currentState, create, name, template); @@ -764,6 +806,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return removeIndexTemplateV2TaskKey; + } + @Override public ClusterState execute(ClusterState currentState) { return innerRemoveIndexTemplateV2(currentState, name); @@ -868,6 +915,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return createIndexTemplateTaskKey; + } + @Override public ClusterState execute(ClusterState currentState) throws Exception { validateTemplate(request.settings, request.mappings, indicesService, xContentRegistry); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java index 7f67c45fc80e5..deb4dfd2581bf 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java @@ -43,6 +43,8 @@ import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import 
org.opensearch.common.Priority; @@ -78,6 +80,7 @@ public class MetadataMappingService { private final ClusterService clusterService; private final IndicesService indicesService; + private final ClusterManagerTaskThrottler.ThrottlingKey putMappingTaskKey; final RefreshTaskExecutor refreshExecutor = new RefreshTaskExecutor(); final PutMappingExecutor putMappingExecutor = new PutMappingExecutor(); @@ -86,6 +89,10 @@ public class MetadataMappingService { public MetadataMappingService(ClusterService clusterService, IndicesService indicesService) { this.clusterService = clusterService; this.indicesService = indicesService; + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + putMappingTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_MAPPING_KEY, true); + } static class RefreshTask { @@ -246,6 +253,11 @@ public ClusterTasksResult execute( } } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return putMappingTaskKey; + } + private ClusterState applyRequest( ClusterState currentState, PutMappingClusterStateUpdateRequest request, diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 4756e625d21cc..2b5e236bef8c9 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -47,6 +47,8 @@ import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; 
import org.opensearch.common.Priority; import org.opensearch.common.ValidationException; @@ -89,6 +91,7 @@ public class MetadataUpdateSettingsService { private final IndicesService indicesService; private final ShardLimitValidator shardLimitValidator; private final ThreadPool threadPool; + private final ClusterManagerTaskThrottler.ThrottlingKey updateSettingsTaskKey; private AwarenessReplicaBalance awarenessReplicaBalance; @@ -109,6 +112,9 @@ public MetadataUpdateSettingsService( this.indicesService = indicesService; this.shardLimitValidator = shardLimitValidator; this.awarenessReplicaBalance = awarenessReplicaBalance; + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + updateSettingsTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.UPDATE_SETTINGS_KEY, true); } public void updateSettings( @@ -162,6 +168,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return updateSettingsTaskKey; + } + @Override public ClusterState execute(ClusterState currentState) { diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java new file mode 100644 index 0000000000000..0743997c23c9a --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +/** + * Class for maintaining all cluster manager task key at one place. 
+ */ +public final class ClusterManagerTaskKeys { + + public static final String CREATE_INDEX_KEY = "create-index"; + public static final String UPDATE_SETTINGS_KEY = "update-settings"; + public static final String CLUSTER_UPDATE_SETTINGS_KEY = "cluster-update-settings"; + public static final String AUTO_CREATE_KEY = "auto-create"; + public static final String DELETE_INDEX_KEY = "delete-index"; + public static final String DELETE_DANGLING_INDEX_KEY = "delete-dangling-index"; + public static final String CREATE_DATA_STREAM_KEY = "create-data-stream"; + public static final String REMOVE_DATA_STREAM_KEY = "remove-data-stream"; + public static final String ROLLOVER_INDEX_KEY = "rollover-index"; + public static final String INDEX_ALIASES_KEY = "index-aliases"; + public static final String PUT_MAPPING_KEY = "put-mapping"; + public static final String CREATE_INDEX_TEMPLATE_KEY = "create-index-template"; + public static final String REMOVE_INDEX_TEMPLATE_KEY = "remove-index-template"; + public static final String CREATE_COMPONENT_TEMPLATE_KEY = "create-component-template"; + public static final String REMOVE_COMPONENT_TEMPLATE_KEY = "remove-component-template"; + public static final String CREATE_INDEX_TEMPLATE_V2_KEY = "create-index-template-v2"; + public static final String REMOVE_INDEX_TEMPLATE_V2_KEY = "remove-index-template-v2"; + public static final String PUT_PIPELINE_KEY = "put-pipeline"; + public static final String DELETE_PIPELINE_KEY = "delete-pipeline"; + public static final String CREATE_PERSISTENT_TASK_KEY = "create-persistent-task"; + public static final String FINISH_PERSISTENT_TASK_KEY = "finish-persistent-task"; + public static final String REMOVE_PERSISTENT_TASK_KEY = "remove-persistent-task"; + public static final String UPDATE_TASK_STATE_KEY = "update-task-state"; + public static final String PUT_SCRIPT_KEY = "put-script"; + public static final String DELETE_SCRIPT_KEY = "delete-script"; + public static final String PUT_REPOSITORY_KEY = 
"put-repository"; + public static final String DELETE_REPOSITORY_KEY = "delete-repository"; + public static final String CREATE_SNAPSHOT_KEY = "create-snapshot"; + public static final String DELETE_SNAPSHOT_KEY = "delete-snapshot"; + public static final String UPDATE_SNAPSHOT_STATE_KEY = "update-snapshot-state"; + public static final String RESTORE_SNAPSHOT_KEY = "restore-snapshot"; + public static final String CLUSTER_REROUTE_API_KEY = "cluster-reroute-api"; + +} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java new file mode 100644 index 0000000000000..0503db713258d --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -0,0 +1,200 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterStateTaskExecutor; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; + +/** + * This class throttles task submission to the cluster manager node; it uses the throttling key defined in the various executors + * as the key for throttling. Throttling is performed per task executor class; different task types have different executor classes. + * + * Set the task-specific setting to configure the throttling threshold of a particular task type.
+ * e.g : Set "cluster_manager.throttling.thresholds.put-mapping.value" to set the throttling limit of "put-mapping" tasks, + * Set it to the default value (-1) to disable throttling for this task type. + */ +public class ClusterManagerTaskThrottler implements TaskBatcherListener { + private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class); + public static final ThrottlingKey DEFAULT_THROTTLING_KEY = new ThrottlingKey("default-task-key", false); + + public static final Setting THRESHOLD_SETTINGS = Setting.groupSetting( + "cluster_manager.throttling.thresholds.", + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + protected Map THROTTLING_TASK_KEYS = new ConcurrentHashMap<>(); + + private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling + private final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener; + + private final ConcurrentMap tasksCount; + private final ConcurrentMap tasksThreshold; + private final Supplier minNodeVersionSupplier; + + public ClusterManagerTaskThrottler( + final ClusterSettings clusterSettings, + final Supplier minNodeVersionSupplier, + final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener + ) { + clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting); + this.minNodeVersionSupplier = minNodeVersionSupplier; + this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener; + tasksCount = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment + tasksThreshold = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment + } + + /** + * To configure a new task for throttling, + * * Register task to cluster service with task key, + * * override getClusterManagerThrottlingKey method with above task key in task executor.
+ * * Verify that throttled tasks would be retried from data nodes + * + * Added retry mechanism in TransportClusterManagerNodeAction, so it would be retried for customer generated tasks. + * + * If tasks are not getting retried then we can register with false flag, so user won't be able to configure threshold limits for it. + */ + protected ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) { + ThrottlingKey throttlingKey = new ThrottlingKey(taskKey, throttlingEnabled); + if (THROTTLING_TASK_KEYS.containsKey(taskKey)) { + throw new IllegalArgumentException("There is already a Throttling key registered with same name: " + taskKey); + } + THROTTLING_TASK_KEYS.put(taskKey, throttlingKey); + return throttlingKey; + } + + /** + * Class to store the throttling key for the tasks of cluster manager + */ + public static class ThrottlingKey { + private String taskThrottlingKey; + private boolean throttlingEnabled; + + /** + * Class for throttling key of tasks + * + * @param taskThrottlingKey - throttling key for task + * @param throttlingEnabled - if throttling is enabled or not i.e. data node is performing retry over throttling exception or not. 
+ */ + private ThrottlingKey(String taskThrottlingKey, boolean throttlingEnabled) { + this.taskThrottlingKey = taskThrottlingKey; + this.throttlingEnabled = throttlingEnabled; + } + + public String getTaskThrottlingKey() { + return taskThrottlingKey; + } + + public boolean isThrottlingEnabled() { + return throttlingEnabled; + } + } + + void validateSetting(final Settings settings) { + if (minNodeVersionSupplier.get().compareTo(Version.V_2_4_0) < 0) { + throw new IllegalArgumentException("All the nodes in cluster should be on version later than or equal to 2.4.0"); + } + Map groups = settings.getAsGroups(); + for (String key : groups.keySet()) { + if (!THROTTLING_TASK_KEYS.containsKey(key)) { + throw new IllegalArgumentException("Cluster manager task throttling is not configured for given task type: " + key); + } + if (!THROTTLING_TASK_KEYS.get(key).isThrottlingEnabled()) { + throw new IllegalArgumentException("Throttling is not enabled for given task type: " + key); + } + int threshold = groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE); + if (threshold < MIN_THRESHOLD_VALUE) { + throw new IllegalArgumentException("Provide positive integer for limit or -1 for disabling throttling"); + } + } + } + + void updateSetting(final Settings settings) { + Map groups = settings.getAsGroups(); + for (String key : groups.keySet()) { + updateLimit(key, groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE)); + } + } + + void updateLimit(final String taskKey, final int limit) { + assert limit >= MIN_THRESHOLD_VALUE; + if (limit == MIN_THRESHOLD_VALUE) { + tasksThreshold.remove(taskKey); + } else { + tasksThreshold.put(taskKey, (long) limit); + } + } + + Long getThrottlingLimit(final String taskKey) { + return tasksThreshold.get(taskKey); + } + + @Override + public void onBeginSubmit(List tasks) { + ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey) + .getClusterManagerThrottlingKey(); + 
tasksCount.putIfAbsent(clusterManagerThrottlingKey.getTaskThrottlingKey(), 0L); + tasksCount.computeIfPresent(clusterManagerThrottlingKey.getTaskThrottlingKey(), (key, count) -> { + int size = tasks.size(); + if (clusterManagerThrottlingKey.isThrottlingEnabled()) { + Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey()); + if (threshold != null && (count + size > threshold)) { + clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size); + logger.warn( + "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", + clusterManagerThrottlingKey.getTaskThrottlingKey(), + tasks.size(), + threshold + ); + throw new ClusterManagerThrottlingException( + "Throttling Exception : Limit exceeded for " + clusterManagerThrottlingKey.getTaskThrottlingKey() + ); + } + } + return count + size; + }); + } + + @Override + public void onSubmitFailure(List tasks) { + reduceTaskCount(tasks); + } + + /** + * Tasks will be removed from the queue before processing, so here we will reduce the count of tasks. + * + * @param tasks list of tasks which will be executed. 
+ */ + @Override + public void onBeginProcessing(List tasks) { + reduceTaskCount(tasks); + } + + @Override + public void onTimeout(List tasks) { + reduceTaskCount(tasks); + } + + private void reduceTaskCount(List tasks) { + String masterTaskKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey).getClusterManagerThrottlingKey() + .getTaskThrottlingKey(); + tasksCount.computeIfPresent(masterTaskKey, (key, count) -> count - tasks.size()); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerListener.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerListener.java new file mode 100644 index 0000000000000..9d41f4d39b09f --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerListener.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +/** + * Listener interface for cluster manager task throttling. + */ +public interface ClusterManagerTaskThrottlerListener { + void onThrottle(String type, final int counts); +} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java new file mode 100644 index 0000000000000..4e2ab2037f548 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.cluster.service; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * Exception raised from cluster manager node due to task throttling. + */ +public class ClusterManagerThrottlingException extends OpenSearchException { + + public ClusterManagerThrottlingException(String msg, Object... args) { + super(msg, args); + } + + public ClusterManagerThrottlingException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingStats.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingStats.java new file mode 100644 index 0000000000000..fe4eb20902723 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingStats.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.common.metrics.CounterMetric; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Contains stats of Cluster Manager Task Throttling. + * It stores the total cumulative count of throttled tasks per task type. + */ +public class ClusterManagerThrottlingStats implements ClusterManagerTaskThrottlerListener { + + private Map throttledTasksCount = new ConcurrentHashMap<>(); + + private void incrementThrottlingCount(String type, final int counts) { + throttledTasksCount.computeIfAbsent(type, k -> new CounterMetric()).inc(counts); + } + + public long getThrottlingCount(String type) { + return throttledTasksCount.get(type) == null ? 
0 : throttledTasksCount.get(type).count(); + } + + public long getTotalThrottledTaskCount() { + CounterMetric totalCount = new CounterMetric(); + throttledTasksCount.forEach((aClass, counterMetric) -> { totalCount.inc(counterMetric.count()); }); + return totalCount.count(); + } + + @Override + public void onThrottle(String type, int counts) { + incrementThrottlingCount(type, counts); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index d393613118af8..a605c41bdeff8 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -291,6 +291,17 @@ public final String getNodeName() { return nodeName; } + /** + * Registers a task key with the cluster manager node for throttling. + * + * @param taskKey - task key of task + * @param throttlingEnabled - whether throttling is enabled for the task, i.e. whether data nodes retry it when throttled + * @return throttling task key which needs to be passed while submitting task to cluster manager + */ + public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) { + return clusterManagerService.registerClusterManagerTask(taskKey, throttlingEnabled); + } + /** * Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig, * ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched.
diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index b78707e994855..f78e2c760ebb3 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Assertions; +import org.opensearch.Version; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.AckedClusterStateTaskListener; import org.opensearch.cluster.ClusterChangedEvent; @@ -127,6 +128,8 @@ public class MasterService extends AbstractLifecycleComponent { private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor; private volatile Batcher taskBatcher; + protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler; + private final ClusterManagerThrottlingStats throttlingStats; public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); @@ -137,6 +140,8 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP this::setSlowTaskLoggingThreshold ); + this.throttlingStats = new ClusterManagerThrottlingStats(); + this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(clusterSettings, this::getMinNodeVersion, throttlingStats); this.threadPool = threadPool; } @@ -157,7 +162,7 @@ protected synchronized void doStart() { Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting"); Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting"); threadPoolExecutor = createThreadPoolExecutor(); - taskBatcher = new Batcher(logger, threadPoolExecutor); + taskBatcher = new Batcher(logger, 
threadPoolExecutor, clusterManagerTaskThrottler); } protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { @@ -172,8 +177,8 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { @SuppressWarnings("unchecked") class Batcher extends TaskBatcher { - Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor) { - super(logger, threadExecutor); + Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) { + super(logger, threadExecutor, taskBatcherListener); } @Override @@ -589,6 +594,20 @@ public List pendingTasks() { }).collect(Collectors.toList()); } + /** + * Returns the total cumulative number of throttled tasks. + */ + public long numberOfThrottledPendingTasks() { + return throttlingStats.getTotalThrottledTaskCount(); + } + + /** + * Returns the minimum version among the nodes in the cluster. + */ + public Version getMinNodeVersion() { + return state().getNodes().getMinNodeVersion(); + } + /** * Returns the number of currently pending tasks. */ @@ -915,6 +934,17 @@ void onNoLongerClusterManager() { } } + /** + * Registers a task key with the cluster manager node for throttling. + * + * @param taskKey - task key of task + * @param throttlingEnabled - whether throttling is enabled for the task, i.e. whether data nodes retry it when throttled + * @return throttling task key which needs to be passed while submitting task to cluster manager + */ + public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) { + return clusterManagerTaskThrottler.registerClusterManagerTask(taskKey, throttlingEnabled); + } + /** * Submits a batch of cluster state update tasks; submitted updates are guaranteed to be processed together, * potentially with more tasks of the same executor.
diff --git a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java index e04c8617ecd33..b5710bab41172 100644 --- a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java @@ -62,10 +62,12 @@ public abstract class TaskBatcher { private final PrioritizedOpenSearchThreadPoolExecutor threadExecutor; // package visible for tests final Map> tasksPerBatchingKey = new HashMap<>(); + private final TaskBatcherListener taskBatcherListener; - public TaskBatcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor) { + public TaskBatcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) { this.logger = logger; this.threadExecutor = threadExecutor; + this.taskBatcherListener = taskBatcherListener; } public void submitTasks(List tasks, @Nullable TimeValue timeout) throws OpenSearchRejectedExecutionException { @@ -75,36 +77,46 @@ public void submitTasks(List tasks, @Nullable TimeValue t final BatchedTask firstTask = tasks.get(0); assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) : "tasks submitted in a batch should share the same batching key: " + tasks; - // convert to an identity map to check for dups based on task identity - final Map tasksIdentity = tasks.stream() - .collect( - Collectors.toMap( - BatchedTask::getTask, - Function.identity(), - (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); }, - IdentityHashMap::new - ) - ); - - synchronized (tasksPerBatchingKey) { - LinkedHashSet existingTasks = tasksPerBatchingKey.computeIfAbsent( - firstTask.batchingKey, - k -> new LinkedHashSet<>(tasks.size()) - ); - for (BatchedTask existing : existingTasks) { - // check that there won't be two tasks with the same identity for the same batching key - BatchedTask duplicateTask = 
tasksIdentity.get(existing.getTask()); - if (duplicateTask != null) { - throw new IllegalStateException( - "task [" - + duplicateTask.describeTasks(Collections.singletonList(existing)) - + "] with source [" - + duplicateTask.source - + "] is already queued" - ); + assert tasks.stream().allMatch(t -> t.getTask().getClass() == firstTask.getTask().getClass()) + : "tasks submitted in a batch should be of same class: " + tasks; + + taskBatcherListener.onBeginSubmit(tasks); + + try { + // convert to an identity map to check for dups based on task identity + final Map tasksIdentity = tasks.stream() + .collect( + Collectors.toMap( + BatchedTask::getTask, + Function.identity(), + (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); }, + IdentityHashMap::new + ) + ); + + synchronized (tasksPerBatchingKey) { + LinkedHashSet existingTasks = tasksPerBatchingKey.computeIfAbsent( + firstTask.batchingKey, + k -> new LinkedHashSet<>(tasks.size()) + ); + for (BatchedTask existing : existingTasks) { + // check that there won't be two tasks with the same identity for the same batching key + BatchedTask duplicateTask = tasksIdentity.get(existing.getTask()); + if (duplicateTask != null) { + throw new IllegalStateException( + "task [" + + duplicateTask.describeTasks(Collections.singletonList(existing)) + + "] with source [" + + duplicateTask.source + + "] is already queued" + ); + } } + existingTasks.addAll(tasks); } - existingTasks.addAll(tasks); + } catch (Exception e) { + taskBatcherListener.onSubmitFailure(tasks); + throw e; } if (timeout != null) { @@ -136,6 +148,7 @@ private void onTimeoutInternal(List tasks, TimeValue time } } } + taskBatcherListener.onTimeout(toRemove); onTimeout(toRemove, timeout); } } @@ -173,6 +186,7 @@ void runIfNotProcessed(BatchedTask updateTask) { return tasks.isEmpty() ? 
entry.getKey() : entry.getKey() + "[" + tasks + "]"; }).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); + taskBatcherListener.onBeginProcessing(toExecute); run(updateTask.batchingKey, toExecute, tasksSummary); } } diff --git a/server/src/main/java/org/opensearch/cluster/service/TaskBatcherListener.java b/server/src/main/java/org/opensearch/cluster/service/TaskBatcherListener.java new file mode 100644 index 0000000000000..2feaf2540a96e --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/TaskBatcherListener.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import java.util.List; + +/** + * Listener interface for callbacks on various events of TaskBatcher. + */ +public interface TaskBatcherListener { + /** + * Callback called before submitting tasks. + * @param tasks list of tasks which will be submitted. + */ + void onBeginSubmit(List tasks); + + /** + * Callback called if task submission fails for any reason, + * e.g. due to duplicate tasks. + * @param tasks list of tasks which failed to submit. + */ + void onSubmitFailure(List tasks); + + /** + * Callback called before processing any tasks. + * @param tasks list of tasks which will be executed. + */ + void onBeginProcessing(List tasks); + + /** + * Callback called when tasks are timed out. + * @param tasks list of tasks which timed out.
+ */ + void onTimeout(List tasks); +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 70a17837839cf..d635238a8dbd9 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -92,6 +92,7 @@ import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.common.logging.Loggers; import org.opensearch.common.network.NetworkModule; @@ -591,7 +592,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndexingPressure.MAX_INDEXING_BYTES, TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED, TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED, - + ClusterManagerTaskThrottler.THRESHOLD_SETTINGS, // Settings related to search backpressure SearchBackpressureSettings.SETTING_MODE, SearchBackpressureSettings.SETTING_CANCELLATION_RATIO, diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index b8256fe896da4..b9785d9ec036f 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -56,6 +56,8 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataIndexTemplateService; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import 
org.opensearch.common.collect.Tuple; import org.opensearch.common.regex.Regex; @@ -114,6 +116,8 @@ public class IngestService implements ClusterStateApplier, ReportingService> ingestClusterStateListeners = new CopyOnWriteArrayList<>(); + private final ClusterManagerTaskThrottler.ThrottlingKey putPipelineTaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey deletePipelineTaskKey; private volatile ClusterState state; public IngestService( @@ -141,8 +145,11 @@ public IngestService( threadPool.generic()::execute ) ); - this.threadPool = threadPool; + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_PIPELINE_KEY, true); + deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_PIPELINE_KEY, true); } private static Map processorFactories(List ingestPlugins, Processor.Parameters parameters) { @@ -291,6 +298,11 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { public ClusterState execute(ClusterState currentState) { return innerDelete(request, currentState); } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return deletePipelineTaskKey; + } } ); } @@ -385,6 +397,11 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { public ClusterState execute(ClusterState currentState) { return innerPut(request, currentState); } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return putPipelineTaskKey; + } } ); } diff --git a/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java index 9dc7f7d7380cc..bce3095d0c30d 100644 --- a/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java +++ 
b/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java @@ -45,6 +45,8 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -81,6 +83,10 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos private final EnableAssignmentDecider decider; private final ThreadPool threadPool; private final PeriodicRechecker periodicRechecker; + private final ClusterManagerTaskThrottler.ThrottlingKey createPersistentTaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey finishPersistentTaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey removePersistentTaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey updatePersistentTaskKey; public PersistentTasksClusterService( Settings settings, @@ -98,6 +104,12 @@ public PersistentTasksClusterService( } clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setRecheckInterval); + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. 
+ createPersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_PERSISTENT_TASK_KEY, true); + finishPersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.FINISH_PERSISTENT_TASK_KEY, true); + removePersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_PERSISTENT_TASK_KEY, true); + updatePersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.UPDATE_TASK_STATE_KEY, true); } // visible for testing only @@ -144,6 +156,11 @@ public ClusterState execute(ClusterState currentState) { return update(currentState, builder.addTask(taskId, taskName, taskParams, assignment)); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return createPersistentTaskKey; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -203,6 +220,11 @@ public ClusterState execute(ClusterState currentState) { } } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return finishPersistentTaskKey; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -234,6 +256,11 @@ public ClusterState execute(ClusterState currentState) { } } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return removePersistentTaskKey; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -277,6 +304,11 @@ public ClusterState execute(ClusterState currentState) { } } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return updatePersistentTaskKey; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java 
b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index c70c10495b7b5..32bdb8b665520 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -53,6 +53,8 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; import org.opensearch.common.component.AbstractLifecycleComponent; @@ -111,6 +113,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C private final Map internalRepositories = ConcurrentCollections.newConcurrentMap(); private volatile Map repositories = Collections.emptyMap(); private final RepositoriesStatsArchive repositoriesStatsArchive; + private final ClusterManagerTaskThrottler.ThrottlingKey putRepositoryTaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey deleteRepositoryTaskKey; public RepositoriesService( Settings settings, @@ -137,6 +141,9 @@ public RepositoriesService( REPOSITORIES_STATS_ARCHIVE_MAX_ARCHIVED_STATS.get(settings), threadPool::relativeTimeInMillis ); + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. 
+ putRepositoryTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_REPOSITORY_KEY, true); + deleteRepositoryTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_REPOSITORY_KEY, true); } /** @@ -229,6 +236,11 @@ public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).metadata(mdBuilder).build(); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return putRepositoryTaskKey; + } + @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", request.name()), e); @@ -290,6 +302,11 @@ public ClusterState execute(ClusterState currentState) { throw new RepositoryMissingException(request.name()); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return deleteRepositoryTaskKey; + } + @Override public boolean mustAck(DiscoveryNode discoveryNode) { // repository was created on both cluster-manager and data nodes diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 3513cd6d00ce5..50388a1354327 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -63,6 +63,7 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Numbers; @@ -454,6 +455,11 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS 
public TimeValue timeout() { return updateTask.timeout(); } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return updateTask.getClusterManagerThrottlingKey(); + } }); }, onFailure)); } diff --git a/server/src/main/java/org/opensearch/script/ScriptService.java b/server/src/main/java/org/opensearch/script/ScriptService.java index 303fc5ccbcf88..0eeb6b38e5b27 100644 --- a/server/src/main/java/org/opensearch/script/ScriptService.java +++ b/server/src/main/java/org/opensearch/script/ScriptService.java @@ -45,6 +45,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; import org.opensearch.common.settings.ClusterSettings; @@ -545,6 +546,7 @@ protected StoredScriptSource getScriptFromClusterState(String id) { public void putStoredScript( ClusterService clusterService, PutStoredScriptRequest request, + ClusterManagerTaskThrottler.ThrottlingKey putStoreTaskKey, ActionListener listener ) { if (request.content().length() > maxSizeInBytes) { @@ -604,6 +606,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { return ClusterState.builder(currentState).metadata(mdb).build(); } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return putStoreTaskKey; + } } ); } @@ -611,6 +618,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { public void deleteStoredScript( ClusterService clusterService, DeleteStoredScriptRequest request, + ClusterManagerTaskThrottler.ThrottlingKey deleteScriptTaskKey, ActionListener listener ) { clusterService.submitStateUpdateTask( @@ -630,6 +638,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { return 
ClusterState.builder(currentState).metadata(mdb).build(); } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return deleteScriptTaskKey; + } } ); } diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index 643f176a82221..9a4c3e7dc8ce8 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -75,6 +75,8 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; @@ -178,6 +180,8 @@ public class RestoreService implements ClusterStateApplier { private final ClusterSettings clusterSettings; + private final ClusterManagerTaskThrottler.ThrottlingKey restoreSnapshotTaskKey; + private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(); public RestoreService( @@ -199,6 +203,10 @@ public RestoreService( } this.clusterSettings = clusterService.getClusterSettings(); this.shardLimitValidator = shardLimitValidator; + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. 
+ restoreSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.RESTORE_SNAPSHOT_KEY, true); + } /** @@ -390,6 +398,11 @@ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionLi final String restoreUUID = UUIDs.randomBase64UUID(); RestoreInfo restoreInfo = null; + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return restoreSnapshotTaskKey; + } + @Override public ClusterState execute(ClusterState currentState) { RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 4f672c9813d64..a777388218d16 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -77,6 +77,8 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -198,6 +200,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final OngoingRepositoryOperations repositoryOperations = new OngoingRepositoryOperations(); + private final ClusterManagerTaskThrottler.ThrottlingKey createSnapshotTaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey deleteSnapshotTaskKey; + private static ClusterManagerTaskThrottler.ThrottlingKey updateSnapshotStateTaskKey; + /** * Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the * cluster 
state. The number of concurrent operations in a cluster state is defined as the sum of the sizes of @@ -242,6 +248,11 @@ public SnapshotsService( clusterService.getClusterSettings() .addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i); } + + // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. + createSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_SNAPSHOT_KEY, true); + deleteSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SNAPSHOT_KEY, true); + updateSnapshotStateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.UPDATE_SNAPSHOT_STATE_KEY, true); } /** @@ -527,6 +538,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return createSnapshotTaskKey; + } + @Override public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { try { @@ -2273,6 +2289,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { .build(); } + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return deleteSnapshotTaskKey; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -3239,7 +3260,20 @@ public boolean assertAllListenersResolved() { * * Package private to allow for tests. 
*/ - static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = (currentState, tasks) -> { + static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = new ClusterStateTaskExecutor() { + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) + throws Exception { + return shardStateExecutor.execute(currentState, tasks); + } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return updateSnapshotStateTaskKey; + } + }; + + static final ClusterStateTaskExecutor shardStateExecutor = (currentState, tasks) -> { int changedCount = 0; int startedCount = 0; final List entries = new ArrayList<>(); diff --git a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java index ff2bb77531486..58ea9b1c29467 100644 --- a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java @@ -56,6 +56,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.common.ParsingException; import org.opensearch.common.Strings; import org.opensearch.common.UUIDs; @@ -864,6 +865,7 @@ public void testIds() { ids.put(162, PrimaryShardClosedException.class); ids.put(163, DecommissioningFailedException.class); ids.put(164, NodeDecommissionedException.class); + ids.put(165, ClusterManagerThrottlingException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java index 1b7d848b626fe..2f9ae9a154f46 100644 --- 
a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java @@ -75,4 +75,45 @@ public void testWrapBackoffPolicy() { assertEquals(expectedRetries, retries.get()); } } + + public void testEqualJitterExponentialBackOffPolicy() { + int baseDelay = 10; + int maxDelay = 10000; + BackoffPolicy policy = BackoffPolicy.exponentialEqualJitterBackoff(baseDelay, maxDelay); + Iterator iterator = policy.iterator(); + + // Assert equal jitter + int retriesTillMaxDelay = 10; + for (int i = 0; i < retriesTillMaxDelay; i++) { + TimeValue delay = iterator.next(); + assertTrue(delay.getMillis() >= baseDelay * (1L << i) / 2); + assertTrue(delay.getMillis() <= baseDelay * (1L << i)); + } + + // Now policy should return max delay for next retries. + int retriesAfterMaxDelay = randomInt(10); + for (int i = 0; i < retriesAfterMaxDelay; i++) { + TimeValue delay = iterator.next(); + assertTrue(delay.getMillis() >= maxDelay / 2); + assertTrue(delay.getMillis() <= maxDelay); + } + } + + public void testExponentialBackOffPolicy() { + long baseDelay = 10; + int maxDelay = 10000; + long currentDelay = baseDelay; + BackoffPolicy policy = BackoffPolicy.exponentialFullJitterBackoff(baseDelay); + Iterator iterator = policy.iterator(); + + // Assert full jitter + int numberOfRetries = randomInt(20); + + for (int i = 0; i < numberOfRetries; i++) { + TimeValue delay = iterator.next(); + assertTrue(delay.getMillis() >= 0); + assertTrue(delay.getMillis() <= currentDelay); + currentDelay = currentDelay * 2; + } + } } diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index 1195ed2590b1e..c45bae224dbd6 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++
b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -52,6 +52,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -80,6 +81,9 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -606,4 +610,102 @@ public void testDelegateToClusterManagerOnNodeWithDeprecatedMasterRole() throws assertTrue(listener.isDone()); assertThat(listener.get(), equalTo(response)); } + + public void testThrottlingRetryLocalMaster() throws InterruptedException, BrokenBarrierException { + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean exception = new AtomicBoolean(true); + AtomicBoolean retried = new AtomicBoolean(false); + CyclicBarrier barrier = new CyclicBarrier(2); + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode })); + + TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) { + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + if (exception.getAndSet(false)) { + throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for test"); + } else { + try { + 
retried.set(true); + barrier.await(); + } catch (Exception e) { + throw new AssertionError(); + } + } + } + }; + action.execute(request, listener); + + barrier.await(); + assertTrue(retried.get()); + assertFalse(exception.get()); + } + + public void testThrottlingRetryRemoteMaster() throws ExecutionException, InterruptedException { + Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueSeconds(60)); + DiscoveryNode masterNode = this.remoteNode; + setState( + clusterService, + // use a random base version so it can go down when simulating a restart. + ClusterState.builder(ClusterStateCreationUtils.state(localNode, masterNode, new DiscoveryNode[] { localNode, masterNode })) + .version(randomIntBetween(0, 10)) + ); + + PlainActionFuture listener = new PlainActionFuture<>(); + TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool); + action.execute(request, listener); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturedRequests[0]; + assertTrue(capturedRequest.node.isMasterNode()); + assertThat(capturedRequest.request, equalTo(request)); + assertThat(capturedRequest.action, equalTo("internal:testAction")); + transport.handleRemoteError( + capturedRequest.requestId, + new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for test") + ); + + assertFalse(listener.isDone()); + + // waiting for retry to trigger + Thread.sleep(100); + + // Retry for above throttling exception + capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + capturedRequest = capturedRequests[0]; + Response response = new Response(); + transport.handleResponse(capturedRequest.requestId, response); + + assertTrue(listener.isDone()); + listener.get(); + } + + public void 
testRetryForDifferentException() throws InterruptedException, BrokenBarrierException { + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean exception = new AtomicBoolean(true); + AtomicBoolean retried = new AtomicBoolean(false); + CyclicBarrier barrier = new CyclicBarrier(2); + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode })); + + TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) { + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) + throws Exception { + if (exception.getAndSet(false)) { + throw new Exception("Different exception"); + } else { + // If called second time due to retry, throw exception + retried.set(true); + throw new AssertionError("Should not retry for other exception"); + } + } + }; + action.execute(request, listener); + + assertFalse(retried.get()); + assertFalse(exception.get()); + } } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java index c374392fc3d0e..5caea9f5bf674 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java @@ -39,6 +39,7 @@ import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; @@ -83,7 +84,7 @@ public void setUp() throws Exception { 
when(allocationService.reroute(any(ClusterState.class), any(String.class))).thenAnswer( mockInvocation -> mockInvocation.getArguments()[0] ); - service = new MetadataDeleteIndexService(Settings.EMPTY, null, allocationService); + service = new MetadataDeleteIndexService(Settings.EMPTY, mock(ClusterService.class), allocationService); } public void testDeleteMissing() { diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexAliasesServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexAliasesServiceTests.java index d37756dabfe8d..f3c7e73e419db 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexAliasesServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexAliasesServiceTests.java @@ -35,6 +35,7 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.set.Sets; import org.opensearch.index.Index; @@ -66,7 +67,7 @@ public class MetadataIndexAliasesServiceTests extends OpenSearchTestCase { private final AliasValidator aliasValidator = new AliasValidator(); private final MetadataDeleteIndexService deleteIndexService = mock(MetadataDeleteIndexService.class); private final MetadataIndexAliasesService service = new MetadataIndexAliasesService( - null, + mock(ClusterService.class), null, aliasValidator, deleteIndexService, diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java index aa1c500030bd6..cbd56c8d05116 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java @@ -56,7 
+56,6 @@ import org.opensearch.index.mapper.MapperParsingException; import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.IndexTemplateMissingException; -import org.opensearch.indices.IndicesService; import org.opensearch.indices.InvalidIndexTemplateException; import org.opensearch.indices.SystemIndices; import org.opensearch.test.OpenSearchSingleNodeTestCase; @@ -2110,7 +2109,7 @@ private static List putTemplate(NamedXContentRegistry xContentRegistr new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()) ); MetadataIndexTemplateService service = new MetadataIndexTemplateService( - null, + clusterService, createIndexService, new AliasValidator(), null, @@ -2155,31 +2154,7 @@ public void onFailure(Exception e) { } private MetadataIndexTemplateService getMetadataIndexTemplateService() { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - ClusterService clusterService = getInstanceFromNode(ClusterService.class); - MetadataCreateIndexService createIndexService = new MetadataCreateIndexService( - Settings.EMPTY, - clusterService, - indicesService, - null, - null, - createTestShardLimitService(randomIntBetween(1, 1000), false), - new Environment(builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(), null), - IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, - null, - xContentRegistry(), - new SystemIndices(Collections.emptyMap()), - true, - new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()) - ); - return new MetadataIndexTemplateService( - clusterService, - createIndexService, - new AliasValidator(), - indicesService, - new IndexScopedSettings(Settings.EMPTY, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS), - xContentRegistry() - ); + return getInstanceFromNode(MetadataIndexTemplateService.class); } @SuppressWarnings("unchecked") diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java 
b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java new file mode 100644 index 0000000000000..d20fed5c37361 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -0,0 +1,366 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.Version; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateTaskExecutor; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import static org.opensearch.test.ClusterServiceUtils.setState; + +/** + * Contains tests for {@link ClusterManagerTaskThrottler} + */ +public class ClusterManagerTaskThrottlerTests extends OpenSearchTestCase { + + private static ThreadPool threadPool; + private ClusterService clusterService; + private DiscoveryNode localNode; + private DiscoveryNode[] allNodes; + private ClusterManagerThrottlingStats throttlingStats; + + @BeforeClass + public static void beforeClass() { + threadPool = new 
TestThreadPool("TransportMasterNodeActionTests"); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + localNode = new DiscoveryNode( + "local_node", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.V_2_4_0 + ); + allNodes = new DiscoveryNode[] { localNode }; + throttlingStats = new ClusterManagerThrottlingStats(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + public void testDefaults() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + throttlingStats + ); + throttler.registerClusterManagerTask("put-mapping", true); + throttler.registerClusterManagerTask("create-index", true); + for (String key : throttler.THROTTLING_TASK_KEYS.keySet()) { + assertNull(throttler.getThrottlingLimit(key)); + } + } + + public void testValidateSettingsForDifferentVersion() { + DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); + DiscoveryNode dataNode = getDataNode(Version.V_2_0_0); + setState( + clusterService, + ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) + ); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + clusterSettings, + () -> { return 
clusterService.getMasterService().getMinNodeVersion(); }, + throttlingStats + ); + throttler.registerClusterManagerTask("put-mapping", true); + + // set some limit for put-mapping tasks + int newLimit = randomIntBetween(1, 10); + + Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); + assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); + } + + public void testValidateSettingsForTaskWihtoutRetryOnDataNode() { + DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); + DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); + setState( + clusterService, + ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) + ); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + throttlingStats + ); + throttler.registerClusterManagerTask("put-mapping", false); + + // set some limit for put-mapping tasks + int newLimit = randomIntBetween(1, 10); + + Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); + assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); + } + + public void testValidateSettingsForUnknownTask() { + DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); + DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); + setState( + clusterService, + ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) + ); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(),
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + throttlingStats + ); + + // set some limit for update snapshot tasks + int newLimit = randomIntBetween(1, 10); + + Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.random-task.value", newLimit).build(); + assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); + } + + public void testUpdateThrottlingLimitForBasicSanity() { + DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); + DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); + setState( + clusterService, + ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) + ); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + throttlingStats + ); + throttler.registerClusterManagerTask("put-mapping", true); + + // set some limit for update snapshot tasks + long newLimit = randomLongBetween(1, 10); + + Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); + clusterSettings.applySettings(newSettings); + assertEquals(newLimit, throttler.getThrottlingLimit("put-mapping").intValue()); + + // set update snapshot task limit to default + newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", -1).build(); + clusterSettings.applySettings(newSettings); + assertNull(throttler.getThrottlingLimit("put-mapping")); + } + + public void testValidateSettingForLimit() { + DiscoveryNode clusterManagerNode = 
getClusterManagerNode(Version.V_2_4_0); + DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); + setState( + clusterService, + ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) + ); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + throttlingStats + ); + throttler.registerClusterManagerTask("put-mapping", true); + + Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.values", -5).build(); + assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); + } + + public void testUpdateLimit() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + throttlingStats + ); + throttler.registerClusterManagerTask("put-mapping", true); + + throttler.updateLimit("test", 5); + assertEquals(5L, throttler.getThrottlingLimit("test").intValue()); + throttler.updateLimit("test", -1); + assertNull(throttler.getThrottlingLimit("test")); + } + + private DiscoveryNode getDataNode(Version version) { + return new DiscoveryNode( + "local_data_node", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + version + ); + } + + private DiscoveryNode getClusterManagerNode(Version version) { + return new DiscoveryNode( + "local_master_node", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + version + ); + } + + public void 
testThrottlingForDisabledThrottlingTask() { + String taskKey = "test"; + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + throttlingStats + ); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, false); + + // adding limit directly in thresholds + throttler.updateLimit(taskKey, 5); + + // adding 10 tasks, should pass as throttling is disabled for task + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 10)); + + // Asserting that there was not any throttling for it + assertEquals(0L, throttlingStats.getThrottlingCount(taskKey)); + } + + public void testThrottling() { + String taskKey = "test"; + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + throttlingStats + ); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); + + throttler.updateLimit(taskKey, 5); + + // adding 3 tasks + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + // adding 3 more tasks, these tasks should be throttled + // taskCount in Queue: 3 Threshold: 5 + assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)) + ); + assertEquals(3L, throttlingStats.getThrottlingCount(taskKey)); + + // remove one task + throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + + // add 3 tasks should pass now. 
+ // taskCount in Queue: 2 Threshold: 5 + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + // adding one task will throttle + // taskCount in Queue: 5 Threshold: 5 + assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + ); + assertEquals(4L, throttlingStats.getThrottlingCount(taskKey)); + + // update limit of threshold 6 + throttler.updateLimit(taskKey, 6); + + // adding one task should pass now + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + } + + private List getMockUpdateTaskList( + String taskKey, + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey, + int size + ) { + TaskBatcherTests.TestTaskBatcher testTaskBatcher = new TaskBatcherTests.TestTaskBatcher(logger, null); + List taskList = new ArrayList<>(); + + class MockExecutor + implements + TaskExecutorTests.TestExecutor, + ClusterStateTaskExecutor { + + @Override + public ClusterTasksResult execute( + ClusterState currentState, + List tasks + ) throws Exception { + // No Op + return null; + } + + @Override + public boolean runOnlyOnMaster() { + return true; + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {} + + @Override + public void execute(List tasks) {} + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return throttlingKey; + } + + @Override + public String describeTasks(List tasks) { + return taskKey; + } + } + + for (int i = 0; i < size; i++) { + taskList.add(testTaskBatcher.new UpdateTask(Priority.HIGH, taskKey, taskKey, (source, e) -> { + if (!(e instanceof ClusterManagerThrottlingException)) { + throw new AssertionError(e); + } + }, new MockExecutor())); + } + return taskList; + } +} diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java 
index a397f295bcaf2..fb47cb8e2d65a 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -692,6 +692,273 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } } + public void testThrottlingForTaskSubmission() throws InterruptedException { + MasterService masterService = createClusterManagerService(true); + int throttlingLimit = randomIntBetween(1, 10); + int taskId = 1; + final CyclicBarrier barrier = new CyclicBarrier(2); + final CountDownLatch latch = new CountDownLatch(1); + final String taskName = "test"; + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = masterService.registerClusterManagerTask(taskName, true); + class Task { + private final int id; + + Task(int id) { + this.id = id; + } + } + + class TaskExecutor implements ClusterStateTaskExecutor { + private AtomicInteger published = new AtomicInteger(); + + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + latch.countDown(); + barrier.await(); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return throttlingKey; + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + published.incrementAndGet(); + } + } + + masterService.clusterManagerTaskThrottler.updateLimit(taskName, throttlingLimit); + + final ClusterStateTaskListener listener = new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) {} + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {} + }; + + TaskExecutor executor = new TaskExecutor(); + // submit one task which will be execution, post that will submit throttlingLimit tasks. 
+ try { + masterService.submitStateUpdateTask( + taskName, + new Task(taskId++), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor, + listener + ); + } catch (Exception e) { + throw new AssertionError(e); + } + // wait till task enter in execution. + latch.await(); + + for (int i = 0; i < throttlingLimit; i++) { + try { + masterService.submitStateUpdateTask( + taskName, + new Task(taskId++), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor, + listener + ); + } catch (Exception e) { + throw new AssertionError(e); + } + } + + // we have one task in execution and tasks in queue so next task should throttled. + final AtomicReference assertionRef = new AtomicReference<>(); + try { + masterService.submitStateUpdateTask( + taskName, + new Task(taskId++), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor, + listener + ); + } catch (ClusterManagerThrottlingException e) { + assertionRef.set(e); + } + assertNotNull(assertionRef.get()); + masterService.close(); + } + + public void testThrottlingForMultipleTaskTypes() throws InterruptedException { + MasterService masterService = createClusterManagerService(true); + int throttlingLimitForTask1 = randomIntBetween(1, 5); + int throttlingLimitForTask2 = randomIntBetween(1, 5); + int throttlingLimitForTask3 = randomIntBetween(1, 5); + int numberOfTask1 = randomIntBetween(throttlingLimitForTask1, 10); + int numberOfTask2 = randomIntBetween(throttlingLimitForTask2, 10); + int numberOfTask3 = randomIntBetween(throttlingLimitForTask3, 10); + String task1 = "Task1"; + String task2 = "Task2"; + String task3 = "Task3"; + + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey1 = masterService.registerClusterManagerTask(task1, true); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey2 = masterService.registerClusterManagerTask(task2, true); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey3 = masterService.registerClusterManagerTask(task3, true); + 
class Task {} + class Task1 extends Task {} + class Task2 extends Task {} + class Task3 extends Task {} + + class Task1Executor implements ClusterStateTaskExecutor { + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + Thread.sleep(randomInt(1000)); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {} + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return throttlingKey1; + } + } + + class Task2Executor implements ClusterStateTaskExecutor { + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + Thread.sleep(randomInt(1000)); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {} + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return throttlingKey2; + } + } + + class Task3Executor implements ClusterStateTaskExecutor { + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + Thread.sleep(randomInt(1000)); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {} + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return throttlingKey3; + } + } + + // configuring limits for Task1 and Task3. All task submission of Task2 should pass. 
+ masterService.clusterManagerTaskThrottler.updateLimit(task1, throttlingLimitForTask1); + masterService.clusterManagerTaskThrottler.updateLimit(task3, throttlingLimitForTask3); + final CountDownLatch latch = new CountDownLatch(numberOfTask1 + numberOfTask2 + numberOfTask3); + AtomicInteger throttledTask1 = new AtomicInteger(); + AtomicInteger throttledTask2 = new AtomicInteger(); + AtomicInteger throttledTask3 = new AtomicInteger(); + AtomicInteger succeededTask1 = new AtomicInteger(); + AtomicInteger succeededTask2 = new AtomicInteger(); + AtomicInteger timedOutTask3 = new AtomicInteger(); + + final ClusterStateTaskListener listener = new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { + // Task3's timeout should have called this. + assertEquals(task3, source); + timedOutTask3.incrementAndGet(); + latch.countDown(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (source.equals(task1)) { + succeededTask1.incrementAndGet(); + } else if (source.equals(task2)) { + succeededTask2.incrementAndGet(); + } + latch.countDown(); + } + }; + Task1Executor executor1 = new Task1Executor(); + Task2Executor executor2 = new Task2Executor(); + Task3Executor executor3 = new Task3Executor(); + List threads = new ArrayList(); + for (int i = 0; i < numberOfTask1; i++) { + threads.add(new Thread(new Runnable() { + @Override + public void run() { + try { + masterService.submitStateUpdateTask( + task1, + new Task1(), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor1, + listener + ); + } catch (ClusterManagerThrottlingException e) { + // Exception should be RejactedExecutionException. 
+ throttledTask1.incrementAndGet(); + latch.countDown(); + } + } + })); + } + for (int i = 0; i < numberOfTask2; i++) { + threads.add(new Thread(new Runnable() { + @Override + public void run() { + try { + masterService.submitStateUpdateTask( + task2, + new Task2(), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor2, + listener + ); + } catch (ClusterManagerThrottlingException e) { + throttledTask2.incrementAndGet(); + latch.countDown(); + } + } + })); + } + for (int i = 0; i < numberOfTask3; i++) { + threads.add(new Thread(new Runnable() { + @Override + public void run() { + try { + masterService.submitStateUpdateTask( + task3, + new Task3(), + ClusterStateTaskConfig.build(randomFrom(Priority.values()), new TimeValue(0)), + executor3, + listener + ); + } catch (ClusterManagerThrottlingException e) { + throttledTask3.incrementAndGet(); + latch.countDown(); + } + } + })); + } + for (Thread thread : threads) { + thread.start(); + } + + // await for latch to clear + latch.await(); + assertEquals(numberOfTask1, throttledTask1.get() + succeededTask1.get()); + assertEquals(numberOfTask2, succeededTask2.get()); + assertEquals(0, throttledTask2.get()); + assertEquals(numberOfTask3, throttledTask3.get() + timedOutTask3.get()); + masterService.close(); + } + public void testBlockingCallInClusterStateTaskListenerFails() throws InterruptedException { assumeTrue("assertions must be enabled for this test to work", BaseFuture.class.desiredAssertionStatus()); final CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java b/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java index aec2641b355d3..31018d4cef029 100644 --- a/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java @@ -71,10 +71,10 @@ public void setUpBatchingTaskExecutor() throws Exception { taskBatcher 
= new TestTaskBatcher(logger, threadExecutor); } - class TestTaskBatcher extends TaskBatcher { + static class TestTaskBatcher extends TaskBatcher { TestTaskBatcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor) { - super(logger, threadExecutor); + super(logger, threadExecutor, getMockListener()); } @Override @@ -344,6 +344,30 @@ public void onFailure(String source, Exception e) { latch.await(); } + protected static TaskBatcherListener getMockListener() { + return new TaskBatcherListener() { + @Override + public void onBeginSubmit(List tasks) { + // No Op + } + + @Override + public void onSubmitFailure(List tasks) { + // No Op + } + + @Override + public void onBeginProcessing(List tasks) { + // No Op + } + + @Override + public void onTimeout(List tasks) { + // No Op + } + }; + } + private static class SimpleTask { private final int id; diff --git a/server/src/test/java/org/opensearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/opensearch/persistent/PersistentTasksClusterServiceTests.java index 7e23e6ef3748c..7659bce456381 100644 --- a/server/src/test/java/org/opensearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/persistent/PersistentTasksClusterServiceTests.java @@ -201,7 +201,7 @@ public void testReassignmentRequiredOnMetadataChanges() { public void testReassignTasksWithNoTasks() { ClusterState clusterState = initialState(); - assertThat(reassign(clusterState).metadata().custom(PersistentTasksCustomMetadata.TYPE), nullValue()); + assertThat(reassign(createService(), clusterState).metadata().custom(PersistentTasksCustomMetadata.TYPE), nullValue()); } public void testReassignConsidersClusterStateUpdates() { @@ -219,7 +219,7 @@ public void testReassignConsidersClusterStateUpdates() { Metadata.Builder metadata = Metadata.builder(clusterState.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build()); clusterState = 
builder.metadata(metadata).nodes(nodes).build(); - ClusterState newClusterState = reassign(clusterState); + ClusterState newClusterState = reassign(createService(), clusterState); PersistentTasksCustomMetadata tasksInProgress = newClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); assertThat(tasksInProgress, notNullValue()); @@ -238,7 +238,8 @@ public void testNonClusterStateConditionAssignment() { clusterState = builder.metadata(metadata).nodes(nodes).build(); nonClusterStateCondition = false; - ClusterState newClusterState = reassign(clusterState); + PersistentTasksClusterService service = createService(); + ClusterState newClusterState = reassign(service, clusterState); PersistentTasksCustomMetadata tasksInProgress = newClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); assertThat(tasksInProgress, notNullValue()); @@ -250,7 +251,7 @@ public void testNonClusterStateConditionAssignment() { assertThat(tasksInProgress.tasks().size(), equalTo(1)); nonClusterStateCondition = true; - ClusterState finalClusterState = reassign(newClusterState); + ClusterState finalClusterState = reassign(service, newClusterState); tasksInProgress = finalClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); assertThat(tasksInProgress, notNullValue()); @@ -289,7 +290,7 @@ public void testReassignTasks() { } Metadata.Builder metadata = Metadata.builder(clusterState.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build()); clusterState = builder.metadata(metadata).nodes(nodes).build(); - ClusterState newClusterState = reassign(clusterState); + ClusterState newClusterState = reassign(createService(), clusterState); PersistentTasksCustomMetadata tasksInProgress = newClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); assertThat(tasksInProgress, notNullValue()); @@ -624,8 +625,8 @@ private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) } } - private ClusterState 
reassign(ClusterState clusterState) { - PersistentTasksClusterService service = createService((params, currentState) -> { + private PersistentTasksClusterService createService() { + return createService((params, currentState) -> { TestParams testParams = (TestParams) params; switch (testParams.getTestParam()) { case "assign_me": @@ -644,7 +645,9 @@ private ClusterState reassign(ClusterState clusterState) { } return NO_NODE_FOUND; }); + } + private ClusterState reassign(PersistentTasksClusterService service, ClusterState clusterState) { return service.reassignTasks(clusterState); }