From 7bf3e440e6730bf8b1bdd857eb452414cad0eacd Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Fri, 16 Dec 2022 22:32:29 +0530 Subject: [PATCH] Backporting Minor bug fix related to cluster manager throttling settings Signed-off-by: Dhwanil Patel --- CHANGELOG.md | 3 + .../service/ClusterManagerTaskThrottler.java | 25 +++-- .../cluster/service/MasterService.java | 7 +- .../ClusterManagerTaskThrottlerTests.java | 92 +++++++++++++++---- 4 files changed, 99 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index afcb67923152e..1ba1467ef5b64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Adding auto release workflow ([#5582](https://github.com/opensearch-project/OpenSearch/pull/5582)) - Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518)) + ### Dependencies - Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148)) - Bumps `commons-compress` from 1.21 to 1.22 ([#5104](https://github.com/opensearch-project/OpenSearch/pull/5104)) @@ -42,6 +43,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix 1.x compatibility bug with stored Tasks ([#5412](https://github.com/opensearch-project/OpenSearch/pull/5412)) - Fix case sensitivity for wildcard queries ([#5462](https://github.com/opensearch-project/OpenSearch/pull/5462)) - Support OpenSSL Provider with default Netty allocator ([#5499](https://github.com/opensearch-project/OpenSearch/pull/5499)) +- Apply cluster manager throttling settings during bootstrap ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524)) +- Update thresholds map when cluster manager throttling setting is removed ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524)) ### Security [Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.4...2.x diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index 249b4ff5316d9..06cb2a34c3b37 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -16,8 +16,10 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Supplier; @@ -50,15 +52,18 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener { private final Supplier minNodeVersionSupplier; public ClusterManagerTaskThrottler( + final Settings settings, final ClusterSettings clusterSettings, final Supplier minNodeVersionSupplier, final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener ) { - clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting); - this.minNodeVersionSupplier = minNodeVersionSupplier; - this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener; tasksCount = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment tasksThreshold = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment + this.minNodeVersionSupplier = minNodeVersionSupplier; + this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener; + clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting); + // Required for setting values as per current settings during node bootstrap + updateSetting(THRESHOLD_SETTINGS.get(settings)); } /** @@ -128,10 +133,16 @@ void validateSetting(final Settings settings) { } } - void updateSetting(final Settings settings) { - Map groups = settings.getAsGroups(); - for (String key : groups.keySet()) { - updateLimit(key, groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE)); + void updateSetting(final Settings newSettings) { + Map groups = newSettings.getAsGroups(); + Set settingKeys = new HashSet<>(); + // Adding keys which are present in new Setting + settingKeys.addAll(groups.keySet()); + // Adding existing keys that may need to be set to a default value if that is removed in new setting. + settingKeys.addAll(tasksThreshold.keySet()); + for (String key : settingKeys) { + Settings setting = groups.get(key); + updateLimit(key, setting == null ? MIN_THRESHOLD_VALUE : setting.getAsInt("value", MIN_THRESHOLD_VALUE)); } } diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index f78e2c760ebb3..02bc9633b54d9 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -141,7 +141,12 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP ); this.throttlingStats = new ClusterManagerThrottlingStats(); - this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(clusterSettings, this::getMinNodeVersion, throttlingStats); + this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler( + settings, + clusterSettings, + this::getMinNodeVersion, + throttlingStats + ); this.threadPool = threadPool; } diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java index c5e706e50c298..8f3ee1cff706c 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -42,9 +42,6 @@ public class ClusterManagerTaskThrottlerTests extends OpenSearchTestCase { private static ThreadPool threadPool; private ClusterService clusterService; - private DiscoveryNode localNode; - private DiscoveryNode[] allNodes; - private ClusterManagerThrottlingStats throttlingStats; @BeforeClass public static void beforeClass() { @@ -56,15 +53,6 @@ public static void beforeClass() { public void setUp() throws Exception { super.setUp(); clusterService = ClusterServiceUtils.createClusterService(threadPool); - localNode = new DiscoveryNode( - "local_node", - buildNewFakeTransportAddress(), - Collections.emptyMap(), - Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), - Version.V_2_4_0 - ); - allNodes = new DiscoveryNode[] { localNode }; - throttlingStats = new ClusterManagerThrottlingStats(); } @After @@ -82,9 +70,10 @@ public static void afterClass() { public void testDefaults() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + Settings.EMPTY, clusterSettings, () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats + new ClusterManagerThrottlingStats() ); throttler.registerClusterManagerTask("put-mapping", true); throttler.registerClusterManagerTask("create-index", true); @@ -103,9 +92,10 @@ public void testValidateSettingsForDifferentVersion() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + Settings.EMPTY, clusterSettings, () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats + new ClusterManagerThrottlingStats() ); throttler.registerClusterManagerTask("put-mapping", true); @@ -135,9 +125,10 @@ public void testValidateSettingsForTaskWihtoutRetryOnDataNode() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + Settings.EMPTY, clusterSettings, () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats + new ClusterManagerThrottlingStats() ); throttler.registerClusterManagerTask("put-mapping", false); @@ -148,6 +139,60 @@ public void testValidateSettingsForTaskWihtoutRetryOnDataNode() { assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); } + public void testUpdateSettingsForNullValue() { + DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); + DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); + setState( + clusterService, + ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) + ); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + Settings.EMPTY, + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + new ClusterManagerThrottlingStats() + ); + throttler.registerClusterManagerTask("put-mapping", true); + + // set some limit for put-mapping tasks + int newLimit = randomIntBetween(1, 10); + Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); + clusterSettings.applySettings(newSettings); + assertEquals(newLimit, throttler.getThrottlingLimit("put-mapping").intValue()); + + // set limit to null + Settings nullSettings = Settings.builder().build(); + clusterSettings.applySettings(nullSettings); + assertNull(throttler.getThrottlingLimit("put-mapping")); + } + + public void testSettingsOnBootstrap() { + DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); + DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); + setState( + clusterService, + ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) + ); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + int put_mapping_threshold_value = randomIntBetween(1, 10); + Settings initialSettings = Settings.builder() + .put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value) + .build(); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + initialSettings, + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + new ClusterManagerThrottlingStats() + ); + throttler.registerClusterManagerTask("put-mapping", true); + + // assert that limit is applied on throttler + assertEquals(put_mapping_threshold_value, throttler.getThrottlingLimit("put-mapping").intValue()); + } + public void testValidateSettingsForUnknownTask() { DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); @@ -158,14 +203,14 @@ public void testValidateSettingsForUnknownTask() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + Settings.EMPTY, clusterSettings, () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats + new ClusterManagerThrottlingStats() ); // set some limit for update snapshot tasks int newLimit = randomIntBetween(1, 10); - Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.random-task.value", newLimit).build(); assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); } @@ -180,9 +225,10 @@ public void testUpdateThrottlingLimitForBasicSanity() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + Settings.EMPTY, clusterSettings, () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats + new ClusterManagerThrottlingStats() ); throttler.registerClusterManagerTask("put-mapping", true); @@ -209,9 +255,10 @@ public void testValidateSettingForLimit() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + Settings.EMPTY, clusterSettings, () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats + new ClusterManagerThrottlingStats() ); throttler.registerClusterManagerTask("put-mapping", true); @@ -222,9 +269,10 @@ public void testValidateSettingForLimit() { public void testUpdateLimit() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + Settings.EMPTY, clusterSettings, () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats + new ClusterManagerThrottlingStats() ); throttler.registerClusterManagerTask("put-mapping", true); @@ -255,9 +303,11 @@ private DiscoveryNode getClusterManagerNode(Version version) { } public void testThrottlingForDisabledThrottlingTask() { + ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); String taskKey = "test"; ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + Settings.EMPTY, clusterSettings, () -> { return clusterService.getMasterService().getMinNodeVersion(); }, throttlingStats @@ -275,9 +325,11 @@ public void testThrottlingForDisabledThrottlingTask() { } public void testThrottling() { + ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); String taskKey = "test"; ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + Settings.EMPTY, clusterSettings, () -> { return clusterService.getMasterService().getMinNodeVersion(); }, throttlingStats