From d400f1b43ad50f7ef94e3c490282a48ea2cd0631 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Mon, 18 Nov 2024 12:36:55 +0100 Subject: [PATCH 1/4] Improve min.insync.replicas parsing in warnTooLargeMinIsr When reconciling a replicas change, we check the target number of replicas against the configured minISR. In that case, a ClassCastException is raised if the user sets min.insync.replicas: "1" (quoted value). This patch aligns this configuration parsing to the rest of the code, where this is tolerated. Signed-off-by: Federico Valeri --- .../operator/topic/BatchingTopicController.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java index e1ff06b6ef2..efa89aa94d1 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java @@ -820,13 +820,15 @@ private void warnTooLargeMinIsr(List reconcilableTopics) { for (ReconcilableTopic reconcilableTopic : reconcilableTopics) { var topicConfig = reconcilableTopic.kt().getSpec().getConfig(); if (topicConfig != null) { - Integer topicMinIsr = (Integer) topicConfig.get(KafkaHandler.MIN_INSYNC_REPLICAS); - var minIsr = topicMinIsr != null ? topicMinIsr : clusterMinIsr.map(Integer::parseInt).orElse(1); + var topicMinIsr = topicConfig.get(KafkaHandler.MIN_INSYNC_REPLICAS); + var configuredMinIsr = topicMinIsr != null + ? Integer.parseInt(TopicOperatorUtil.configValueAsString(KafkaHandler.MIN_INSYNC_REPLICAS, topicMinIsr)) + : clusterMinIsr.map(Integer::parseInt).orElse(1); var targetRf = reconcilableTopic.kt().getSpec().getReplicas(); - if (targetRf < minIsr) { + if (targetRf < configuredMinIsr) { LOGGER.warnCr(reconcilableTopic.reconciliation(), "The target replication factor ({}) is below the configured {} ({})", - targetRf, KafkaHandler.MIN_INSYNC_REPLICAS, minIsr); + targetRf, KafkaHandler.MIN_INSYNC_REPLICAS, configuredMinIsr); } } } From 989f89e2220d40b93a0a6c81d4781878a58e3352 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Tue, 19 Nov 2024 12:44:51 +0100 Subject: [PATCH 2/4] Update RF change test to cover string value usage Signed-off-by: Federico Valeri --- .../strimzi/operator/topic/BatchingTopicController.java | 6 +++--- .../java/io/strimzi/operator/topic/KafkaHandler.java | 9 ++++++--- .../operator/topic/BatchingTopicControllerIT.java | 4 ++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java index efa89aa94d1..5c17e23c6a6 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java @@ -684,11 +684,11 @@ private static Either partitionChanges( Reconciliation reconciliation, KafkaTopic kafkaTopic, int currentNumPartitions ) { var requested = kafkaTopic.getSpec() == null || kafkaTopic.getSpec().getPartitions() == null - ? KafkaHandler.BROKER_DEFAULT : kafkaTopic.getSpec().getPartitions(); + ? KafkaHandler.DEFAULT_PARTITIONS_REPLICAS : kafkaTopic.getSpec().getPartitions(); if (requested > currentNumPartitions) { LOGGER.debugCr(reconciliation, "Partition increase from {} to {}", currentNumPartitions, requested); return Either.ofRight(NewPartitions.increaseTo(requested)); - } else if (requested != KafkaHandler.BROKER_DEFAULT && requested < currentNumPartitions) { + } else if (requested != KafkaHandler.DEFAULT_PARTITIONS_REPLICAS && requested < currentNumPartitions) { LOGGER.debugCr(reconciliation, "Partition decrease from {} to {}", currentNumPartitions, requested); return Either.ofLeft(new TopicOperatorException.NotSupported("Decreasing partitions not supported")); } else { @@ -823,7 +823,7 @@ private void warnTooLargeMinIsr(List reconcilableTopics) { var topicMinIsr = topicConfig.get(KafkaHandler.MIN_INSYNC_REPLICAS); var configuredMinIsr = topicMinIsr != null ? Integer.parseInt(TopicOperatorUtil.configValueAsString(KafkaHandler.MIN_INSYNC_REPLICAS, topicMinIsr)) - : clusterMinIsr.map(Integer::parseInt).orElse(1); + : clusterMinIsr.map(Integer::parseInt).orElse(KafkaHandler.DEFAULT_MIN_ISR); var targetRf = reconcilableTopic.kt().getSpec().getReplicas(); if (targetRf < configuredMinIsr) { LOGGER.warnCr(reconcilableTopic.reconciliation(), diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/KafkaHandler.java b/topic-operator/src/main/java/io/strimzi/operator/topic/KafkaHandler.java index 74baccbe5e5..94ec42b9463 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/KafkaHandler.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/KafkaHandler.java @@ -50,13 +50,16 @@ */ public class KafkaHandler { static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaHandler.class); - - /** Default value for partitions and replicas. */ - public static final int BROKER_DEFAULT = -1; + /** Kafka configuration for auto create topic. */ public static final String AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"; /** Kafka configuration for min insync replicas. */ public static final String MIN_INSYNC_REPLICAS = "min.insync.replicas"; + + /** Default value for partitions and replicas. */ + public static final int DEFAULT_PARTITIONS_REPLICAS = -1; + /** Default value for min.insync.replicas. */ + public static final int DEFAULT_MIN_ISR = 1; private final TopicOperatorConfig config; private final TopicOperatorMetricsHolder metricsHolder; diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java index 6a39d364961..c9e99106aec 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java @@ -267,8 +267,6 @@ public void shouldHandleInterruptedExceptionFromDeleteTopics() throws ExecutionE assertOnUpdateThrowsInterruptedException(adminSpy, withDeletionTimestamp); } - // TODO kube client interrupted exceptions - @ParameterizedTest @ValueSource(booleans = { true, false }) public void replicasChangeShouldBeReconciled(boolean cruiseControlEnabled) { @@ -310,6 +308,8 @@ public void replicasChangeShouldBeReconciled(boolean cruiseControlEnabled) { .addToLabels("key", "VALUE") .endMetadata() .withNewSpec() + // we also support string values + .withConfig(Map.of("min.insync.replicas", "1")) .withPartitions(25) .withReplicas(++replicationFactor) .endSpec() From 2bb068ec43bbf225861971d0effe45dc7e24ea45 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Tue, 19 Nov 2024 19:05:26 +0100 Subject: [PATCH 3/4] Fix for shouldUpdateTopicInKafkaWhenConfigRemovedInKube Signed-off-by: Federico Valeri --- .../java/io/strimzi/operator/topic/TopicControllerIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java index 3ec7a3b10e8..21115dd907f 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java @@ -994,7 +994,7 @@ public void shouldUpdateTopicInKafkaWhenConfigRemovedInKube() throws ExecutionEx return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + var expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.remove(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG); return expectedUpdatedConfigs; }); @@ -1550,7 +1550,7 @@ private KafkaTopic modifyTopicAndAwait(KafkaTopic kt, UnaryOperator @Override public boolean test(KafkaTopic theKt) { return theKt.getStatus() != null - && (theKt.getStatus().getObservedGeneration() >= postUpdateGeneration + && (theKt.getStatus().getObservedGeneration() == postUpdateGeneration || !TopicOperatorUtil.isManaged(theKt) || TopicOperatorUtil.isPaused(theKt)) && predicate.test(theKt); } From 742d20d5c9f52960e6e3920d183b335c0eefe4a3 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Wed, 20 Nov 2024 09:44:14 +0100 Subject: [PATCH 4/4] Rename default constant Signed-off-by: Federico Valeri --- .../io/strimzi/operator/topic/BatchingTopicController.java | 4 ++-- .../src/main/java/io/strimzi/operator/topic/KafkaHandler.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java index 5c17e23c6a6..f5856a0a3d9 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java @@ -684,11 +684,11 @@ private static Either partitionChanges( Reconciliation reconciliation, KafkaTopic kafkaTopic, int currentNumPartitions ) { var requested = kafkaTopic.getSpec() == null || kafkaTopic.getSpec().getPartitions() == null - ? KafkaHandler.DEFAULT_PARTITIONS_REPLICAS : kafkaTopic.getSpec().getPartitions(); + ? KafkaHandler.DEFAULT_PARTITIONS : kafkaTopic.getSpec().getPartitions(); if (requested > currentNumPartitions) { LOGGER.debugCr(reconciliation, "Partition increase from {} to {}", currentNumPartitions, requested); return Either.ofRight(NewPartitions.increaseTo(requested)); - } else if (requested != KafkaHandler.DEFAULT_PARTITIONS_REPLICAS && requested < currentNumPartitions) { + } else if (requested != KafkaHandler.DEFAULT_PARTITIONS && requested < currentNumPartitions) { LOGGER.debugCr(reconciliation, "Partition decrease from {} to {}", currentNumPartitions, requested); return Either.ofLeft(new TopicOperatorException.NotSupported("Decreasing partitions not supported")); } else { diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/KafkaHandler.java b/topic-operator/src/main/java/io/strimzi/operator/topic/KafkaHandler.java index 94ec42b9463..fc0a1917505 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/KafkaHandler.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/KafkaHandler.java @@ -56,8 +56,8 @@ public class KafkaHandler { /** Kafka configuration for min insync replicas. */ public static final String MIN_INSYNC_REPLICAS = "min.insync.replicas"; - /** Default value for partitions and replicas. */ - public static final int DEFAULT_PARTITIONS_REPLICAS = -1; + /** Default value for partitions. */ + public static final int DEFAULT_PARTITIONS = -1; /** Default value for min.insync.replicas. */ public static final int DEFAULT_MIN_ISR = 1;