Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Minor bug fix related to cluster manager throttling settings #5598

Merged
merged 1 commit into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adding auto release workflow ([#5582](https://github.com/opensearch-project/OpenSearch/pull/5582))
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518))


### Dependencies
- Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148))
- Bumps `commons-compress` from 1.21 to 1.22 ([#5104](https://github.com/opensearch-project/OpenSearch/pull/5104))
Expand All @@ -42,6 +43,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix 1.x compatibility bug with stored Tasks ([#5412](https://github.com/opensearch-project/OpenSearch/pull/5412))
- Fix case sensitivity for wildcard queries ([#5462](https://github.com/opensearch-project/OpenSearch/pull/5462))
- Support OpenSSL Provider with default Netty allocator ([#5499](https://github.com/opensearch-project/OpenSearch/pull/5499))
- Apply cluster manager throttling settings during bootstrap ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))
- Update thresholds map when cluster manager throttling setting is removed ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))
### Security

[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.4...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
Expand Down Expand Up @@ -50,15 +52,18 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private final Supplier<Version> minNodeVersionSupplier;

public ClusterManagerTaskThrottler(
final Settings settings,
final ClusterSettings clusterSettings,
final Supplier<Version> minNodeVersionSupplier,
final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener
) {
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
this.minNodeVersionSupplier = minNodeVersionSupplier;
this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener;
tasksCount = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment
tasksThreshold = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment
this.minNodeVersionSupplier = minNodeVersionSupplier;
this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener;
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
// Required for setting values as per current settings during node bootstrap
updateSetting(THRESHOLD_SETTINGS.get(settings));
}

/**
Expand Down Expand Up @@ -128,10 +133,16 @@ void validateSetting(final Settings settings) {
}
}

void updateSetting(final Settings settings) {
Map<String, Settings> groups = settings.getAsGroups();
for (String key : groups.keySet()) {
updateLimit(key, groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE));
void updateSetting(final Settings newSettings) {
Map<String, Settings> groups = newSettings.getAsGroups();
Set<String> settingKeys = new HashSet<>();
// Adding keys which are present in new Setting
settingKeys.addAll(groups.keySet());
// Adding existing keys that may need to be set to a default value if that is removed in new setting.
settingKeys.addAll(tasksThreshold.keySet());
for (String key : settingKeys) {
Settings setting = groups.get(key);
updateLimit(key, setting == null ? MIN_THRESHOLD_VALUE : setting.getAsInt("value", MIN_THRESHOLD_VALUE));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,12 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
);

this.throttlingStats = new ClusterManagerThrottlingStats();
this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(clusterSettings, this::getMinNodeVersion, throttlingStats);
this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
this::getMinNodeVersion,
throttlingStats
);
this.threadPool = threadPool;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ public class ClusterManagerTaskThrottlerTests extends OpenSearchTestCase {

private static ThreadPool threadPool;
private ClusterService clusterService;
private DiscoveryNode localNode;
private DiscoveryNode[] allNodes;
private ClusterManagerThrottlingStats throttlingStats;

@BeforeClass
public static void beforeClass() {
Expand All @@ -56,15 +53,6 @@ public static void beforeClass() {
public void setUp() throws Exception {
super.setUp();
clusterService = ClusterServiceUtils.createClusterService(threadPool);
localNode = new DiscoveryNode(
"local_node",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE),
Version.V_2_4_0
);
allNodes = new DiscoveryNode[] { localNode };
throttlingStats = new ClusterManagerThrottlingStats();
}

@After
Expand All @@ -82,9 +70,10 @@ public static void afterClass() {
public void testDefaults() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);
throttler.registerClusterManagerTask("create-index", true);
Expand All @@ -103,9 +92,10 @@ public void testValidateSettingsForDifferentVersion() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

Expand Down Expand Up @@ -135,9 +125,10 @@ public void testValidateSettingsForTaskWihtoutRetryOnDataNode() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", false);

Expand All @@ -148,6 +139,60 @@ public void testValidateSettingsForTaskWihtoutRetryOnDataNode() {
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
}

public void testUpdateSettingsForNullValue() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode })
);

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

// set some limit for put-mapping tasks
int newLimit = randomIntBetween(1, 10);
Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build();
clusterSettings.applySettings(newSettings);
assertEquals(newLimit, throttler.getThrottlingLimit("put-mapping").intValue());

// set limit to null
Settings nullSettings = Settings.builder().build();
clusterSettings.applySettings(nullSettings);
assertNull(throttler.getThrottlingLimit("put-mapping"));
}

public void testSettingsOnBootstrap() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode })
);

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
int put_mapping_threshold_value = randomIntBetween(1, 10);
Settings initialSettings = Settings.builder()
.put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value)
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
initialSettings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

// assert that limit is applied on throttler
assertEquals(put_mapping_threshold_value, throttler.getThrottlingLimit("put-mapping").intValue());
}

public void testValidateSettingsForUnknownTask() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
Expand All @@ -158,14 +203,14 @@ public void testValidateSettingsForUnknownTask() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);

// set some limit for update snapshot tasks
int newLimit = randomIntBetween(1, 10);

Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.random-task.value", newLimit).build();
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
}
Expand All @@ -180,9 +225,10 @@ public void testUpdateThrottlingLimitForBasicSanity() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

Expand All @@ -209,9 +255,10 @@ public void testValidateSettingForLimit() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

Expand All @@ -222,9 +269,10 @@ public void testValidateSettingForLimit() {
public void testUpdateLimit() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

Expand Down Expand Up @@ -255,9 +303,11 @@ private DiscoveryNode getClusterManagerNode(Version version) {
}

public void testThrottlingForDisabledThrottlingTask() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand All @@ -275,9 +325,11 @@ public void testThrottlingForDisabledThrottlingTask() {
}

public void testThrottling() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand Down