diff --git a/CHANGELOG.md b/CHANGELOG.md index 04f3fbeb4b068..d5c59ba1c0fe7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support remote translog transfer for request level durability([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480)) - Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) +- Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/OpenSearchException.java b/server/src/main/java/org/opensearch/OpenSearchException.java index aef098403ec2b..78f6b50b3a039 100644 --- a/server/src/main/java/org/opensearch/OpenSearchException.java +++ b/server/src/main/java/org/opensearch/OpenSearchException.java @@ -34,6 +34,7 @@ import org.opensearch.action.support.replication.ReplicationOperation; import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.routing.UnsupportedWeightedRoutingStateException; import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.common.CheckedFunction; import org.opensearch.common.Nullable; @@ -70,6 +71,7 @@ import static java.util.Collections.unmodifiableMap; import static org.opensearch.Version.V_2_1_0; import static org.opensearch.Version.V_2_4_0; +import static org.opensearch.Version.V_2_5_0; import static org.opensearch.Version.V_3_0_0; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -1618,6 +1620,12 @@ private enum OpenSearchExceptionHandle { SnapshotInUseDeletionException::new, 166, UNKNOWN_VERSION_ADDED + ), + UNSUPPORTED_WEIGHTED_ROUTING_STATE_EXCEPTION( + UnsupportedWeightedRoutingStateException.class, + UnsupportedWeightedRoutingStateException::new, + 167, + V_2_5_0 ); final Class<? extends OpenSearchException> exceptionClass; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java index af229fb12b4f0..5474f4effa829 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.util.HashMap; -import java.util.Locale; import java.util.Map; import static org.opensearch.action.ValidateActions.addValidationError; @@ -127,26 +126,17 @@ public ActionRequestValidationException validate() { if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) { validationException = addValidationError("Weights are missing", validationException); } - int countValueWithZeroWeights = 0; - double weight; try { for (Object value : weightedRouting.weights().values()) { if (value == null) { validationException = addValidationError(("Weight is null"), validationException); } else { - weight = Double.parseDouble(value.toString()); - countValueWithZeroWeights = (weight == 0) ? countValueWithZeroWeights + 1 : countValueWithZeroWeights; + Double.parseDouble(value.toString()); } } } catch (NumberFormatException e) { validationException = addValidationError(("Weight is not a number"), validationException); } - if (countValueWithZeroWeights > 1) { - validationException = addValidationError( - (String.format(Locale.ROOT, "More than one [%d] value has weight set as 0", countValueWithZeroWeights)), - validationException - ); - } return validationException; } diff --git a/server/src/main/java/org/opensearch/cluster/routing/UnsupportedWeightedRoutingStateException.java b/server/src/main/java/org/opensearch/cluster/routing/UnsupportedWeightedRoutingStateException.java new file mode 100644 index 0000000000000..fd4fd4163ede6 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/UnsupportedWeightedRoutingStateException.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.rest.RestStatus; + +import java.io.IOException; + +/** + * Thrown when failing to update the routing weight due to an unsupported state. See {@link WeightedRoutingService} for more details. + * + * @opensearch.internal + */ +public class UnsupportedWeightedRoutingStateException extends OpenSearchException { + public UnsupportedWeightedRoutingStateException(StreamInput in) throws IOException { + super(in); + } + + public UnsupportedWeightedRoutingStateException(String msg, Object... args) { + super(msg, args); + } + + @Override + public RestStatus status() { + return RestStatus.CONFLICT; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index 6acb4a1e832cb..2b5961c7340c1 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -19,6 +19,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.decommission.DecommissionAttribute; import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.cluster.metadata.Metadata; @@ -32,10 +33,16 @@ import org.opensearch.common.settings.Settings; import org.opensearch.threadpool.ThreadPool; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import static org.opensearch.action.ValidateActions.addValidationError; +import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING; /** * * Service responsible for updating cluster state metadata with weighted routing weights @@ -45,6 +52,8 @@ public class WeightedRoutingService { private final ClusterService clusterService; private final ThreadPool threadPool; private volatile List<String> awarenessAttributes; + private volatile Map<String, List<String>> forcedAwarenessAttributes; + private static final Double DECOMMISSIONED_AWARENESS_VALUE_WEIGHT = 0.0; @Inject public WeightedRoutingService( @@ -60,6 +69,11 @@ public WeightedRoutingService( AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes ); + setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings)); + clusterSettings.addSettingsUpdateConsumer( + CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, + this::setForcedAwarenessAttributes + ); } public void registerWeightedRoutingMetadata( @@ -70,8 +84,10 @@ public void registerWeightedRoutingMetadata( clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { - // verify currently no decommission action is ongoing - ensureNoOngoingDecommissionAction(currentState); + // verify that request object has weights for all discovered and forced awareness values + ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues(currentState, request); + // verify weights will not be updated for a decommissioned attribute + ensureDecommissionedAttributeHasZeroWeight(currentState, request); Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); @@ -148,6 +164,18 @@ private void setAwarenessAttributes(List<String> awarenessAttributes) { this.awarenessAttributes = awarenessAttributes; } + private void setForcedAwarenessAttributes(Settings forceSettings) { + Map<String, List<String>> forcedAwarenessAttributes = new HashMap<>(); + Map<String, Settings> forceGroups = forceSettings.getAsGroups(); + for (Map.Entry<String, Settings> entry : forceGroups.entrySet()) { + List<String> aValues = entry.getValue().getAsList("values"); + if (aValues.size() > 0) { + forcedAwarenessAttributes.put(entry.getKey(), aValues); + } + } + this.forcedAwarenessAttributes = forcedAwarenessAttributes; + } + public void verifyAwarenessAttribute(String attributeName) { if (getAwarenessAttributes().contains(attributeName) == false) { ActionRequestValidationException validationException = null; @@ -159,13 +187,62 @@ public void verifyAwarenessAttribute(String attributeName) { } } - public void ensureNoOngoingDecommissionAction(ClusterState state) { + private void ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues(ClusterState state, ClusterPutWeightedRoutingRequest request) { + String attributeName = request.getWeightedRouting().attributeName(); + Set<String> discoveredAwarenessValues = new HashSet<>(); + state.nodes().forEach(node -> { + if (node.getAttributes().containsKey(attributeName)) { + discoveredAwarenessValues.add(node.getAttributes().get(attributeName)); + } + }); + Set<String> allAwarenessValues; + if (forcedAwarenessAttributes.get(attributeName) == null) { + allAwarenessValues = new HashSet<>(); + } else { + allAwarenessValues = new HashSet<>(forcedAwarenessAttributes.get(attributeName)); + } + allAwarenessValues.addAll(discoveredAwarenessValues); + allAwarenessValues.forEach(awarenessValue -> { + if (request.getWeightedRouting().weights().containsKey(awarenessValue) == false) { + throw new UnsupportedWeightedRoutingStateException( + "weight for [" + awarenessValue + "] is not set and it is part of forced awareness value or a node has this attribute." + ); + } + }); + } + + private void ensureDecommissionedAttributeHasZeroWeight(ClusterState state, ClusterPutWeightedRoutingRequest request) { DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); - if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED) == false) { - throw new IllegalStateException( - "a decommission action is ongoing with status [" - + decommissionAttributeMetadata.status().status() - + "], cannot update weight during this state" + if (decommissionAttributeMetadata == null || decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED)) { + // here either there's no decommission action is ongoing or it is in failed state. In this case, we will allow weight update + return; + } + DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + WeightedRouting weightedRouting = request.getWeightedRouting(); + if (weightedRouting.attributeName().equals(decommissionAttribute.attributeName()) == false) { + // this is unexpected when a different attribute is requested for decommission and weight update is on another attribute + throw new UnsupportedWeightedRoutingStateException( + "decommission action ongoing for attribute [{}], cannot update weight for [{}]", + decommissionAttribute.attributeName(), + weightedRouting.attributeName() + ); + } + if (weightedRouting.weights().containsKey(decommissionAttribute.attributeValue()) == false) { + // weight of an attribute undergoing decommission must be specified + throw new UnsupportedWeightedRoutingStateException( + "weight for [{}] is not specified. Please specify its weight to [{}] as it is under decommission action", + decommissionAttribute.attributeValue(), + DECOMMISSIONED_AWARENESS_VALUE_WEIGHT + ); + } + if (Objects.equals( + weightedRouting.weights().get(decommissionAttribute.attributeValue()), + DECOMMISSIONED_AWARENESS_VALUE_WEIGHT + ) == false) { + throw new UnsupportedWeightedRoutingStateException( + "weight for [{}] must be set to [{}] as it is under decommission action", + decommissionAttribute.attributeValue(), + DECOMMISSIONED_AWARENESS_VALUE_WEIGHT ); } } diff --git a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java index 559963b0e0b68..a601d20af5a3f 100644 --- a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java @@ -56,6 +56,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.routing.UnsupportedWeightedRoutingStateException; import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.common.ParsingException; import org.opensearch.common.Strings; @@ -864,6 +865,7 @@ public void testIds() { ids.put(164, NodeDecommissionedException.class); ids.put(165, ClusterManagerThrottlingException.class); ids.put(166, SnapshotInUseDeletionException.class); + ids.put(167, UnsupportedWeightedRoutingStateException.class); Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>(); for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java index 186e7e8638f17..cdec66d6683eb 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -35,15 +35,6 @@ public void testValidate_ValuesAreProper() { assertNull(actionRequestValidationException); } - public void testValidate_TwoZonesWithZeroWeight() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; - ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); - request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); - ActionRequestValidationException actionRequestValidationException = request.validate(); - assertNotNull(actionRequestValidationException); - assertTrue(actionRequestValidationException.getMessage().contains("More than one [2] value has weight set as " + "0")); - } - public void testValidate_MissingWeights() { String reqString = "{}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 91b8703cacf5c..89d9555fe225b 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -295,6 +295,38 @@ public void testVerifyAwarenessAttribute_ValidAttributeName() { } } + public void testAddWeightedRoutingFailsWhenWeightsNotSetForAllDiscoveredZones() throws InterruptedException { + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_C", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(weightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference<Exception> exceptionReference = new AtomicReference<>(); + ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionReference.set(e); + countDownLatch.countDown(); + } + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); + MatcherAssert.assertThat(exceptionReference.get(), instanceOf(UnsupportedWeightedRoutingStateException.class)); + MatcherAssert.assertThat( + exceptionReference.get().getMessage(), + containsString("weight for [zone_B] is not set and it is part of forced awareness value or a node has this attribute.") + ); + } + public void testAddWeightedRoutingFailsWhenDecommissionOngoing() throws InterruptedException { Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); DecommissionStatus status = randomFrom(DecommissionStatus.INIT, DecommissionStatus.IN_PROGRESS, DecommissionStatus.SUCCESSFUL); @@ -327,8 +359,11 @@ public void onFailure(Exception e) { weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); - MatcherAssert.assertThat(exceptionReference.get(), instanceOf(IllegalStateException.class)); - MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("a decommission action is ongoing with status")); + MatcherAssert.assertThat(exceptionReference.get(), instanceOf(UnsupportedWeightedRoutingStateException.class)); + MatcherAssert.assertThat( + exceptionReference.get().getMessage(), + containsString("weight for [zone_A] must be set to [0.0] as it is under decommission action") + ); } public void testAddWeightedRoutingPassesWhenDecommissionFailed() throws InterruptedException { @@ -362,4 +397,36 @@ public void onFailure(Exception e) {} weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + + public void testAddWeightedRoutingPassesWhenWeightOfDecommissionedAttributeStillZero() throws InterruptedException { + Map<String, Double> weights = Map.of("zone_A", 0.0, "zone_B", 1.0, "zone_C", 1.0); + DecommissionStatus status = DecommissionStatus.SUCCESSFUL; + ClusterState state = clusterService.state(); + state = setWeightedRoutingWeights(state, weights); + state = setDecommissionAttribute(state, status); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + Map<String, Double> updatedWeights = Map.of("zone_A", 0.0, "zone_B", 2.0, "zone_C", 1.0); + WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", updatedWeights); + request.setWeightedRouting(updatedWeightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + assertTrue(clusterStateUpdateResponse.isAcknowledged()); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) {} + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertEquals(updatedWeightedRouting, clusterService.state().metadata().weightedRoutingMetadata().getWeightedRouting()); + } }