Skip to content

Commit

Permalink
Improve the role change reversal and properly revert them (#10098)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj authored May 13, 2024
1 parent 3905a2a commit e85aaf7
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private Future<KafkaAndNodePools> revertScaleDown(KafkaCluster kafka, Kafka kafk
newNodePools.add(
new KafkaNodePoolBuilder(nodePool)
.editSpec()
.withReplicas(newReplicasCount)
.withReplicas(newReplicasCount)
.endSpec()
.build());
} else {
Expand Down Expand Up @@ -281,12 +281,12 @@ private Future<KafkaAndNodePools> revertRoleChange(Kafka kafkaCr, List<KafkaNode
if (nodePool.getStatus() != null
&& nodePool.getStatus().getRoles().contains(ProcessRoles.BROKER)
&& !nodePool.getSpec().getRoles().contains(ProcessRoles.BROKER)) {
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting role change of KafkaNodePool " + nodePool.getMetadata().getName() + " by adding the broker role to it"));
LOGGER.warnCr(reconciliation, "Reverting role change of KafkaNodePool {} by adding the broker role to it", nodePool.getMetadata().getName());
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting role change of KafkaNodePool " + nodePool.getMetadata().getName() + " (setting roles to " + nodePool.getStatus().getRoles() + ")"));
LOGGER.warnCr(reconciliation, "Reverting role change of KafkaNodePool {} (setting roles to {})", nodePool.getMetadata().getName(), nodePool.getStatus().getRoles());
newNodePools.add(
new KafkaNodePoolBuilder(nodePool)
.editSpec()
.addToRoles(ProcessRoles.BROKER)
.withRoles(nodePool.getStatus().getRoles())
.endSpec()
.build());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,8 @@ public void testRevertRoleChangeWithKRaftMixedNodes(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(9));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.brokerNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.controllerNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.usedToBeBrokerNodes(), is(Set.of()));

Expand All @@ -856,7 +858,50 @@ public void testRevertRoleChangeWithKRaftMixedNodes(VertxTestContext context) {
assertThat(kafkaStatus.getConditions().get(0).getStatus(), is("True"));
assertThat(kafkaStatus.getConditions().get(0).getType(), is("Warning"));
assertThat(kafkaStatus.getConditions().get(0).getReason(), is("ScaleDownPreventionCheck"));
assertThat(kafkaStatus.getConditions().get(0).getMessage(), is("Reverting role change of KafkaNodePool pool-mixed by adding the broker role to it"));
assertThat(kafkaStatus.getConditions().get(0).getMessage(), is("Reverting role change of KafkaNodePool pool-mixed (setting roles to [CONTROLLER, BROKER])"));

// Scale-down reverted => should be called twice as we still scale down controllers after the revert is done
verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any());

async.flag();
})));
}

@Test
public void testRevertRoleChangeWithKRaftDedicatedNodes(VertxTestContext context) {
KafkaNodePool poolBFromBrokerToControllerOnly = new KafkaNodePoolBuilder(POOL_B_WITH_STATUS)
.editSpec()
.withRoles(ProcessRoles.CONTROLLER)
.endSpec()
.build();

ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false);

// Mock brokers-in-use check
BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck;
when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));

KafkaStatus kafkaStatus = new KafkaStatus();
KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier);

Checkpoint async = context.checkpoint();
creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_MIXED_WITH_STATUS, POOL_A_WITH_STATUS, poolBFromBrokerToControllerOnly), Map.of(), null, VERSION_CHANGE, kafkaStatus, true)
.onComplete(context.succeeding(kc -> context.verify(() -> {
// Kafka cluster is created
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(9));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.brokerNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.controllerNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.usedToBeBrokerNodes(), is(Set.of()));

// Check the status conditions
assertThat(kafkaStatus.getConditions().size(), is(1));
assertThat(kafkaStatus.getConditions().get(0).getStatus(), is("True"));
assertThat(kafkaStatus.getConditions().get(0).getType(), is("Warning"));
assertThat(kafkaStatus.getConditions().get(0).getReason(), is("ScaleDownPreventionCheck"));
assertThat(kafkaStatus.getConditions().get(0).getMessage(), is("Reverting role change of KafkaNodePool pool-b (setting roles to [BROKER])"));

// Scale-down reverted => should be called twice as we still scale down controllers after the revert is done
verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any());
Expand Down

0 comments on commit e85aaf7

Please sign in to comment.