From 3dd12c9c847ac18f16408e375b2fff59a9346e4c Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Tue, 29 Oct 2024 12:53:03 +0000 Subject: [PATCH] Move the error handling to the client from the operator Signed-off-by: Gantigmaa Selenge --- .../KafkaRebalanceAssemblyOperator.java | 27 ++++++------- .../cruisecontrol/CruiseControlApi.java | 2 +- .../cruisecontrol/CruiseControlApiImpl.java | 17 ++++++--- .../CruiseControlUserTasksResponse.java | 38 +++++++++++++++++++ .../CruiseControlClientTest.java | 4 +- 5 files changed, 65 insertions(+), 23 deletions(-) create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlUserTasksResponse.java diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java index 0a30ff70d0b..241fbcc6a61 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java @@ -34,8 +34,8 @@ import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApi; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApiImpl; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRebalanceResponse; -import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlResponse; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRestException; +import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlUserTasksResponse; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RebalanceOptions; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RemoveBrokerOptions; import 
io.strimzi.operator.cluster.operator.resource.kubernetes.AbstractWatchableStatusedNamespacedResourceOperator; @@ -760,30 +760,27 @@ private Future> onPendingProposal( .onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, true, host, apiClient, rebalanceOptionsBuilder)) .onFailure(e -> { LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance proposal status failed", e.getCause()); - handleUserTaskStatusError(reconciliation, p, kafkaRebalance, conditions, e); + p.fail(new CruiseControlRestException("Cruise Control getting rebalance proposal status failed")); }); } } return p.future(); } - private void handleUserTaskStatusError(Reconciliation reconciliation, Promise> p, KafkaRebalance kafkaRebalance, Set conditions, Throwable e) { - String servletFullErrorPattern = "There are already \\d+ active user tasks, which has reached the servlet capacity."; - if (e.getMessage().matches(".*" + servletFullErrorPattern + ".*")) { - LOGGER.warnCr(reconciliation, "The maximum number of active user tasks that can run concurrently has reached. 
You may consider adjusting CC's configuration, \"max.active.user.tasks\"."); - configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName()) - .onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions)))); - } else { - p.fail(e); - } - } - - private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlResponse cruiseControlResponse, + private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlUserTasksResponse cruiseControlResponse, Promise> p, String sessionId, Set conditions, KafkaRebalance kafkaRebalance, ConfigMapOperator configMapOperator, boolean dryRun, String host, CruiseControlApi apiClient, AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder rebalanceOptionsBuilder) { + if (cruiseControlResponse.isMaxActiveUserTasksReached()) { + LOGGER.warnCr(reconciliation, "The maximum number of active user tasks that can run concurrently has reached therefore will retry getting user tasks in the next reconciliation. " + + "If this occurs often, consider increasing the value for max.active.user.tasks configuration."); + configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName()) + .onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions)))); + return; + } + if (cruiseControlResponse.getJson().isEmpty()) { // This may happen if: // 1. 
Cruise Control restarted so resetting the state because the tasks queue is not persisted @@ -955,7 +952,7 @@ private Future> onRebalancing(Reco .onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, false, host, apiClient, rebalanceOptionsBuilder)) .onFailure(e -> { LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance task status failed", e); - handleUserTaskStatusError(reconciliation, p, kafkaRebalance, conditions, e); + p.fail(e); }); } return p.future(); diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java index d0ccf205616..383580ac30f 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java @@ -87,7 +87,7 @@ public interface CruiseControlApi { * This is used to retrieve the task's current state. * @return A future for the state of the specified task. */ - Future getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskID); + Future getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskID); /** * Issue a stop command to the Cruise Control server. This will halt any task (e.g. 
a rebalance) which is currently diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java index 9063f8cc88c..59378aa6853 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java @@ -310,7 +310,7 @@ public Future removeBroker(Reconciliation reconc @Override @SuppressWarnings("deprecation") - public Future getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskId) { + public Future getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskId) { PathBuilder pathBuilder = new PathBuilder(CruiseControlEndpoints.USER_TASKS) .withParameter(CruiseControlParameters.JSON, "true") @@ -345,7 +345,7 @@ public Future getUserTaskStatus(Reconciliation reconcilia // This may happen if: // 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted // 2. 
Task's retention time expired, or the cache has become full - result.complete(new CruiseControlResponse(userTaskID, statusJson)); + result.complete(new CruiseControlUserTasksResponse(userTaskID, statusJson)); } else { JsonObject jsonUserTask = userTasks.getJsonObject(0); String taskStatusStr = jsonUserTask.getString(STATUS_KEY); @@ -386,7 +386,7 @@ public Future getUserTaskStatus(Reconciliation reconcilia default: throw new IllegalStateException("Unexpected user task status: " + taskStatus); } - result.complete(new CruiseControlResponse(userTaskID, statusJson)); + result.complete(new CruiseControlUserTasksResponse(userTaskID, statusJson)); } }); } else if (response.result().statusCode() == 500) { @@ -400,8 +400,15 @@ public Future getUserTaskStatus(Reconciliation reconcilia } else { errorString = json.toString(); } - result.fail(new CruiseControlRestException( - "Error for request: " + host + ":" + port + path + ". Server returned: " + errorString)); + if (errorString.matches(".*" + "There are already \\d+ active user tasks, which has reached the servlet capacity." + ".*")) { + LOGGER.debugCr(reconciliation, errorString); + CruiseControlUserTasksResponse ccResponse = new CruiseControlUserTasksResponse(userTaskID, json); + ccResponse.setMaxActiveUserTasksReached(true); + result.complete(ccResponse); + } else { + result.fail(new CruiseControlRestException( + "Error for request: " + host + ":" + port + path + ". 
Server returned: " + errorString)); + } }); } else { result.fail(new CruiseControlRestException( diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlUserTasksResponse.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlUserTasksResponse.java new file mode 100644 index 00000000000..a2c6d3e0fdc --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlUserTasksResponse.java @@ -0,0 +1,38 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.operator.resource.cruisecontrol; + +import io.vertx.core.json.JsonObject; + +/** + * Response to user tasks request + */ +public class CruiseControlUserTasksResponse extends CruiseControlResponse { + private boolean isMaxActiveUserTasksReached; + + /** + * Constructor + * + * @param userTaskId User task ID + * @param json JSON data + */ + CruiseControlUserTasksResponse(String userTaskId, JsonObject json) { + super(userTaskId, json); + // The maximum number of active user tasks that can run concurrently has been reached + // Sourced from the error message that contains "reached the servlet capacity" from the Cruise Control response + this.isMaxActiveUserTasksReached = false; + } + + /** + * @return True if the maximum number of active user tasks that can run concurrently has been reached + */ + public boolean isMaxActiveUserTasksReached() { + return isMaxActiveUserTasksReached; + } + + protected void setMaxActiveUserTasksReached(boolean maxActiveUserTasksReached) { + this.isMaxActiveUserTasksReached = maxActiveUserTasksReached; + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlClientTest.java
b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlClientTest.java index 9c58baa4e14..e5f2a87a591 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlClientTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlClientTest.java @@ -446,7 +446,7 @@ public void testMockCCServerPendingCallsOverride(Vertx vertx, VertxTestContext c cruiseControlServer.setupCCUserTasksResponseNoGoals(0, pendingCalls1); - Future statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID); + Future statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID); for (int i = 1; i <= pendingCalls1; i++) { statusFuture = statusFuture.compose(response -> { @@ -504,7 +504,7 @@ private void runTest(Vertx vertx, VertxTestContext context, String userTaskID, i CruiseControlApi client = cruiseControlClientProvider(vertx); - Future statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID); + Future statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID); // One interaction is always expected at the end of the test, hence the +1 Checkpoint expectedInteractions = context.checkpoint(pendingCalls + 1);