Skip to content

Commit

Permalink
Handle the failure due to reaching the servlet capacity when getting …
Browse files Browse the repository at this point in the history
…user tasks

Signed-off-by: Gantigmaa Selenge <[email protected]>
  • Loading branch information
tinaselenge committed Oct 28, 2024
1 parent 23e5305 commit 1b0ff08
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -760,13 +760,24 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onPendingProposal(
.onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, true, host, apiClient, rebalanceOptionsBuilder))
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance proposal status failed", e.getCause());
p.fail(new CruiseControlRestException("Cruise Control getting rebalance proposal status failed"));
handleUserTaskStatusError(reconciliation, p, kafkaRebalance, conditions, e);
});
}
}
return p.future();
}

/**
 * Handles a failed request for the Cruise Control user task status.
 *
 * When the failure message reports that the number of active user tasks has reached the servlet capacity,
 * a warning is logged and the promise is completed with the ConfigMap fetched through {@code configMapOperator}
 * and a status rebuilt from the previous status of the KafkaRebalance resource. Any other failure, including
 * one without a message, fails the promise with the original error.
 *
 * @param reconciliation    Reconciliation marker used for logging
 * @param p                 Promise to complete or fail with the outcome
 * @param kafkaRebalance    KafkaRebalance resource whose namespace, name and previous status are used
 * @param conditions        Conditions passed to the rebuilt status
 * @param e                 Failure returned by the user task status request
 */
private void handleUserTaskStatusError(Reconciliation reconciliation, Promise<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> p, KafkaRebalance kafkaRebalance, Set<Condition> conditions, Throwable e) {
    String servletFullErrorPattern = "There are already \\d+ active user tasks, which has reached the servlet capacity.";
    // A Throwable is not guaranteed to carry a message; dereferencing a null message here would throw
    // inside the failure handler and leave the promise uncompleted
    String errorMessage = e.getMessage();
    if (errorMessage != null && errorMessage.matches(".*" + servletFullErrorPattern + ".*")) {
        LOGGER.warnCr(reconciliation, "The maximum number of active user tasks that can run concurrently has reached. You may consider adjusting CC's configuration, \"max.active.user.tasks\".");
        configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
                .onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))))
                // Without this handler a failed ConfigMap lookup would leave the promise pending forever
                .onFailure(cause -> p.fail(cause));
    } else {
        p.fail(e);
    }
}

private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlResponse cruiseControlResponse,
Promise<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> p, String sessionId,
Set<Condition> conditions, KafkaRebalance kafkaRebalance,
Expand Down Expand Up @@ -944,7 +955,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onRebalancing(Reco
.onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, false, host, apiClient, rebalanceOptionsBuilder))
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance task status failed", e);
p.fail(e);
handleUserTaskStatusError(reconciliation, p, kafkaRebalance, conditions, e);
});
}
return p.future();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

@ExtendWith(VertxExtension.class)
Expand Down Expand Up @@ -1125,6 +1126,50 @@ private void krRebalancingCompletedWithError(Vertx vertx, VertxTestContext conte
.onComplete(result -> checkOptimizationResults(result, context, true));
}

/**
 * Checks that computing the next status fails when the mocked Cruise Control user tasks endpoint
 * answers with a 500 status and an error message that is not the servlet capacity error.
 */
@Test
public void testGetUserTasksReturnError(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException {
    KafkaRebalance kcRebalance = createKafkaRebalance(KafkaRebalanceState.Rebalancing, MockCruiseControl.REBALANCE_NO_GOALS_RESPONSE_UTID, null, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false);
    cruiseControlServer.setupCCUserTasksToReturnError(500, "Some error");

    CruiseControlApi client = new CruiseControlApiImpl(vertx, HTTP_DEFAULT_IDLE_TIMEOUT_SECONDS, MockCruiseControl.CC_SECRET, MockCruiseControl.CC_API_SECRET, true, true);
    KafkaRebalanceAssemblyOperator kcrao = new KafkaRebalanceAssemblyOperator(vertx, ResourceUtils.supplierWithMocks(true), ResourceUtils.dummyClusterOperatorConfig(), cruiseControlPort) {
        @Override
        public String cruiseControlHost(String clusterName, String clusterNamespace) {
            return HOST;
        }
    };

    Reconciliation recon = new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, CLUSTER_NAMESPACE, RESOURCE_NAME);

    kcrao.computeNextStatus(recon, HOST, client, kcRebalance, KafkaRebalanceState.Rebalancing, null)
        // Assertions made in an asynchronous handler must go through context.verify(), otherwise a failed
        // assertion is not reported to the test context and the test only ends by timing out
        .onComplete(mapAndStatusAsyncResult -> context.verify(() -> {
            assertTrue(mapAndStatusAsyncResult.failed());
            context.completeNow();
        }));
}

/**
 * Runs a transition check that starts and ends in the 'Rebalancing' state while the mocked
 * Cruise Control user tasks endpoint answers with the "servlet capacity reached" error.
 */
@Test
public void testGetUserTasksForRebalancingReturnServletFullError(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException {
    final KafkaRebalanceState state = KafkaRebalanceState.Rebalancing;
    final KafkaRebalance rebalance = createKafkaRebalance(state, MockCruiseControl.REBALANCE_NO_GOALS_RESPONSE_UTID, null, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false);

    // Error body returned by the mocked user tasks endpoint
    final String servletFullError = "Error processing POST request '/remove_broker' due to: 'There are already 5 active user tasks, which has reached the servlet capacity'.";
    cruiseControlServer.setupCCUserTasksToReturnError(500, servletFullError);

    // Same state is passed as both the start and the end of the transition
    checkTransition(vertx, context, state, state, rebalance)
        .onComplete(outcome -> checkOptimizationResults(outcome, context, true));
}

/**
 * Runs a transition check that starts and ends in the 'PendingProposal' state while the mocked
 * Cruise Control user tasks endpoint answers with the "servlet capacity reached" error.
 */
@Test
public void testGetUserTasksForProposalReturnServletFullError(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException {
    final KafkaRebalanceState state = KafkaRebalanceState.PendingProposal;
    final KafkaRebalance rebalance = createKafkaRebalance(state, MockCruiseControl.REBALANCE_NO_GOALS_RESPONSE_UTID, null, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false);

    // Error body returned by the mocked user tasks endpoint
    final String servletFullError = "Error processing POST request '/remove_broker' due to: 'There are already 5 active user tasks, which has reached the servlet capacity'.";
    cruiseControlServer.setupCCUserTasksToReturnError(500, servletFullError);

    // Same state is passed as both the start and the end of the transition
    checkTransition(vertx, context, state, state, rebalance)
        .onComplete(outcome -> checkOptimizationResults(outcome, context, true));
}

/**
* Tests the transition from 'Stopped' to 'PendingProposal' when refresh
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,25 @@ public void setupCCUserTasksCompletedWithError() throws IOException, URISyntaxEx
.withDelay(TimeUnit.SECONDS, 0));
}

/**
 * Setup an error response for requests to the user tasks endpoint
 *
 * @param statusCode    HTTP status code of the mocked response
 * @param errorMessage  Text placed in the "errorMessage" field of the JSON response body
 */
public void setupCCUserTasksToReturnError(int statusCode, String errorMessage) throws IOException, URISyntaxException {
    // The JSON body is assembled by hand, so backslashes and double quotes in the message are escaped
    // to keep the document well-formed
    String escapedMessage = errorMessage.replace("\\", "\\\\").replace("\"", "\\\"");
    JsonBody errorJson = new JsonBody("{\"errorMessage\":\"" + escapedMessage + "\"}");
    // Matches GET requests for the user tasks with json=true and fetch_completed_task=true
    server
        .when(
            request()
                .withMethod("GET")
                .withQueryStringParameter(Parameter.param(CruiseControlParameters.JSON.toString(), "true"))
                .withQueryStringParameter(Parameter.param(CruiseControlParameters.FETCH_COMPLETE.toString(), "true"))
                .withPath(CruiseControlEndpoints.USER_TASKS.toString())
                .withHeader(AUTH_HEADER)
                .withSecure(true))
        .respond(
            response()
                .withBody(errorJson)
                .withStatusCode(statusCode)
                .withDelay(TimeUnit.SECONDS, 0));
}

/**
* Setup response when user task is not found
*/
Expand Down

0 comments on commit 1b0ff08

Please sign in to comment.