Skip to content

Commit

Permalink
Move the error handling to the client from the operator
Browse files Browse the repository at this point in the history
Signed-off-by: Gantigmaa Selenge <[email protected]>
  • Loading branch information
tinaselenge committed Oct 29, 2024
1 parent 1b0ff08 commit 3dd12c9
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApi;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApiImpl;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRebalanceResponse;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlResponse;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRestException;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlUserTasksResponse;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RebalanceOptions;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RemoveBrokerOptions;
import io.strimzi.operator.cluster.operator.resource.kubernetes.AbstractWatchableStatusedNamespacedResourceOperator;
Expand Down Expand Up @@ -760,30 +760,27 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onPendingProposal(
.onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, true, host, apiClient, rebalanceOptionsBuilder))
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance proposal status failed", e.getCause());
handleUserTaskStatusError(reconciliation, p, kafkaRebalance, conditions, e);
p.fail(new CruiseControlRestException("Cruise Control getting rebalance proposal status failed"));
});
}
}
return p.future();
}

private void handleUserTaskStatusError(Reconciliation reconciliation, Promise<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> p, KafkaRebalance kafkaRebalance, Set<Condition> conditions, Throwable e) {
String servletFullErrorPattern = "There are already \\d+ active user tasks, which has reached the servlet capacity.";
if (e.getMessage().matches(".*" + servletFullErrorPattern + ".*")) {
LOGGER.warnCr(reconciliation, "The maximum number of active user tasks that can run concurrently has reached. You may consider adjusting CC's configuration, \"max.active.user.tasks\".");
configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
.onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
} else {
p.fail(e);
}
}

private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlResponse cruiseControlResponse,
private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlUserTasksResponse cruiseControlResponse,
Promise<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> p, String sessionId,
Set<Condition> conditions, KafkaRebalance kafkaRebalance,
ConfigMapOperator configMapOperator, boolean dryRun,
String host, CruiseControlApi apiClient,
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
if (cruiseControlResponse.isMaxActiveUserTasksReached()) {
LOGGER.warnCr(reconciliation, "The maximum number of active user tasks that can run concurrently has reached therefore will retry getting user tasks in the next reconciliation. " +
"If this occurs often, consider increasing the value for max.active.user.tasks configuration.");
configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
.onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
return;
}

if (cruiseControlResponse.getJson().isEmpty()) {
// This may happen if:
// 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
Expand Down Expand Up @@ -955,7 +952,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onRebalancing(Reco
.onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, false, host, apiClient, rebalanceOptionsBuilder))
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance task status failed", e);
handleUserTaskStatusError(reconciliation, p, kafkaRebalance, conditions, e);
p.fail(e);
});
}
return p.future();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public interface CruiseControlApi {
* This is used to retrieve the task's current state.
* @return A future for the state of the specified task.
*/
Future<CruiseControlResponse> getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskID);
Future<CruiseControlUserTasksResponse> getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskID);

/**
* Issue a stop command to the Cruise Control server. This will halt any task (e.g. a rebalance) which is currently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public Future<CruiseControlRebalanceResponse> removeBroker(Reconciliation reconc

@Override
@SuppressWarnings("deprecation")
public Future<CruiseControlResponse> getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskId) {
public Future<CruiseControlUserTasksResponse> getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskId) {

PathBuilder pathBuilder = new PathBuilder(CruiseControlEndpoints.USER_TASKS)
.withParameter(CruiseControlParameters.JSON, "true")
Expand Down Expand Up @@ -345,7 +345,7 @@ public Future<CruiseControlResponse> getUserTaskStatus(Reconciliation reconcilia
// This may happen if:
// 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
// 2. Task's retention time expired, or the cache has become full
result.complete(new CruiseControlResponse(userTaskID, statusJson));
result.complete(new CruiseControlUserTasksResponse(userTaskID, statusJson));
} else {
JsonObject jsonUserTask = userTasks.getJsonObject(0);
String taskStatusStr = jsonUserTask.getString(STATUS_KEY);
Expand Down Expand Up @@ -386,7 +386,7 @@ public Future<CruiseControlResponse> getUserTaskStatus(Reconciliation reconcilia
default:
throw new IllegalStateException("Unexpected user task status: " + taskStatus);
}
result.complete(new CruiseControlResponse(userTaskID, statusJson));
result.complete(new CruiseControlUserTasksResponse(userTaskID, statusJson));
}
});
} else if (response.result().statusCode() == 500) {
Expand All @@ -400,8 +400,15 @@ public Future<CruiseControlResponse> getUserTaskStatus(Reconciliation reconcilia
} else {
errorString = json.toString();
}
result.fail(new CruiseControlRestException(
"Error for request: " + host + ":" + port + path + ". Server returned: " + errorString));
if (errorString.matches(".*" + "There are already \\d+ active user tasks, which has reached the servlet capacity." + ".*")) {
LOGGER.debugCr(reconciliation, errorString);
CruiseControlUserTasksResponse ccResponse = new CruiseControlUserTasksResponse(userTaskID, json);
ccResponse.setMaxActiveUserTasksReached(true);
result.complete(ccResponse);
} else {
result.fail(new CruiseControlRestException(
"Error for request: " + host + ":" + port + path + ". Server returned: " + errorString));
}
});
} else {
result.fail(new CruiseControlRestException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.cluster.operator.resource.cruisecontrol;

import io.vertx.core.json.JsonObject;

/**
* Response to user tasks request
*/
public class CruiseControlUserTasksResponse extends CruiseControlResponse {
private boolean isMaxActiveUserTasksReached;

/**
* Constructor
*
* @param userTaskId User task ID
* @param json JSON data
*/
CruiseControlUserTasksResponse(String userTaskId, JsonObject json) {
super(userTaskId, json);
// The maximum number of active user tasks that can run concurrently has reached
// Sourced from the error message that contains "reached the servlet capacity" from the Cruise Control response
this.isMaxActiveUserTasksReached = false;
}

/**
* @return True If the maximum number of active user tasks that can run concurrently has reached
*/
public boolean isMaxActiveUserTasksReached() {
return isMaxActiveUserTasksReached;
}

protected void setMaxActiveUserTasksReached(boolean maxActiveUserTasksReached) {
this.isMaxActiveUserTasksReached = maxActiveUserTasksReached;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ public void testMockCCServerPendingCallsOverride(Vertx vertx, VertxTestContext c

cruiseControlServer.setupCCUserTasksResponseNoGoals(0, pendingCalls1);

Future<CruiseControlResponse> statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID);
Future<CruiseControlUserTasksResponse> statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID);

for (int i = 1; i <= pendingCalls1; i++) {
statusFuture = statusFuture.compose(response -> {
Expand Down Expand Up @@ -504,7 +504,7 @@ private void runTest(Vertx vertx, VertxTestContext context, String userTaskID, i

CruiseControlApi client = cruiseControlClientProvider(vertx);

Future<CruiseControlResponse> statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID);
Future<CruiseControlUserTasksResponse> statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID);

// One interaction is always expected at the end of the test, hence the +1
Checkpoint expectedInteractions = context.checkpoint(pendingCalls + 1);
Expand Down

0 comments on commit 3dd12c9

Please sign in to comment.