From f6effce9d397754d8b972b6eefb5eba2dd921b3c Mon Sep 17 00:00:00 2001 From: Kyle Liberti Date: Fri, 9 Feb 2024 15:22:11 -0500 Subject: [PATCH] Set default replication factor for CC sample topics Signed-off-by: Kyle Liberti --- .../operator/cluster/model/CruiseControl.java | 20 +++-- .../cluster/model/CruiseControlTest.java | 88 ++++++++++++------- .../assembly/CruiseControlReconcilerTest.java | 6 +- .../CruiseControlConfigurationParameters.java | 5 ++ 4 files changed, 78 insertions(+), 41 deletions(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java index 18d4f9dfed4..7cf3e54f78f 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java @@ -211,7 +211,10 @@ public static CruiseControl fromCrd( } result.image = image; - result.updateConfiguration(ccSpec); + KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(reconciliation, kafkaClusterSpec.getConfig().entrySet()); + CruiseControlConfiguration cruiseControlConfiguration = new CruiseControlConfiguration(reconciliation, ccSpec.getConfig().entrySet()); + result.updateConfigurationWithDefaults(cruiseControlConfiguration, kafkaConfiguration); + CruiseControlConfiguration ccConfiguration = result.configuration; result.sslEnabled = ccConfiguration.isApiSslEnabled(); result.authEnabled = ccConfiguration.isApiAuthEnabled(); @@ -243,16 +246,17 @@ public static CruiseControl fromCrd( } } - private void updateConfiguration(CruiseControlSpec spec) { - CruiseControlConfiguration userConfiguration = new CruiseControlConfiguration(reconciliation, spec.getConfig().entrySet()); - for (Map.Entry defaultEntry : CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap().entrySet()) { - if (userConfiguration.getConfigOption(defaultEntry.getKey()) == null) { - userConfiguration.setConfigOption(defaultEntry.getKey(), defaultEntry.getValue()); + private void updateConfigurationWithDefaults(CruiseControlConfiguration cruiseControlConfiguration, KafkaConfiguration kafkaConfiguration) { + Map defaultCruiseControlProperties = new HashMap<>(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap()); + defaultCruiseControlProperties.put(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR)); + for (Map.Entry defaultEntry : defaultCruiseControlProperties.entrySet()) { + if (cruiseControlConfiguration.getConfigOption(defaultEntry.getKey()) == null) { + cruiseControlConfiguration.setConfigOption(defaultEntry.getKey(), defaultEntry.getValue()); } } // Ensure that the configured anomaly.detection.goals are a sub-set of the default goals - checkGoals(userConfiguration); - this.configuration = userConfiguration; + checkGoals(cruiseControlConfiguration); + this.configuration = cruiseControlConfiguration; } /** diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java index 61cf3987f51..75422b3cdc5 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java @@ -71,11 +71,14 @@ import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterAll; +import java.io.IOException; +import java.io.StringReader; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import static io.strimzi.operator.cluster.model.CruiseControl.API_HEALTHCHECK_PATH; @@ -116,16 +119,16 @@ public class CruiseControlTest { private static final String IMAGE = "my-image:latest"; private static final int HEALTH_DELAY = 120; private static final int HEALTH_TIMEOUT = 30; + private static final String REPLICATION_FACTOR = "3"; private static final String MIN_INSYNC_REPLICAS = "2"; private static final String BROKER_CAPACITY_CPU = "6.0"; private static final String BROKER_CAPACITY_OVERRIDE_CPU = "2.0"; private static final String RESOURCE_LIMIT_CPU = "3.0"; private static final String RESOURCE_REQUESTS_CPU = "4.0"; - private final Map kafkaConfig = singletonMap(CruiseControl.MIN_INSYNC_REPLICAS, MIN_INSYNC_REPLICAS); - private final Map ccConfig = new HashMap<>() {{ - putAll(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap()); - put("num.partition.metrics.windows", "2"); + private final Map kafkaConfig = new HashMap<>() {{ + put(CruiseControl.MIN_INSYNC_REPLICAS, MIN_INSYNC_REPLICAS); + put(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR, REPLICATION_FACTOR); }}; private final Storage kafkaStorage = new EphemeralStorage(); @@ -141,7 +144,6 @@ public class CruiseControlTest { private final CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder() .withImage(ccImage) - .withConfig(ccConfig) .withNewTemplate() .withNewPod() .withTmpDirSizeLimit("100Mi") @@ -229,17 +231,6 @@ private static boolean isJBOD(Object diskCapacity) { return diskCapacity instanceof JsonObject; } - public Kafka kafkaSpec(CruiseControlSpec cruiseControlSpec, ResourceRequirements resourceRequirements) { - return new KafkaBuilder() - .withNewSpec() - .withNewKafka() - .withResources(resourceRequirements) - .endKafka() - .withCruiseControl(cruiseControlSpec) - .endSpec() - .build(); - } - @ParallelTest public void testBrokerCapacities() { // Test user defined capacities @@ -272,6 +263,7 @@ public void testBrokerCapacities() { .editSpec() .editKafka() .withVersion(KafkaVersionTestUtils.DEFAULT_KAFKA_VERSION) + .withConfig(kafkaConfig) .withStorage(jbodStorage) .withResources(new ResourceRequirementsBuilder().withRequests(requests).withLimits(limits).build()) .endKafka() @@ -366,6 +358,7 @@ public void testBrokerCapacities() { .withVersion(KafkaVersionTestUtils.DEFAULT_KAFKA_VERSION) .withStorage(jbodStorage) .withResources(new ResourceRequirementsBuilder().withRequests(requests).withLimits(limits).build()) + .withConfig(kafkaConfig) .endKafka() .withCruiseControl(cruiseControlSpec) .endSpec() @@ -387,12 +380,6 @@ public void testBrokerCapacities() { @ParallelTest public void testBrokerCapacitiesWithPools() { - Kafka kafka = new KafkaBuilder(ResourceUtils.createKafka(NAMESPACE, CLUSTER, REPLICAS, IMAGE, HEALTH_DELAY, HEALTH_TIMEOUT)) - .editSpec() - .withCruiseControl(cruiseControlSpec) - .endSpec() - .build(); - Set nodes = Set.of( new NodeRef("foo-pool1-0", 0, "pool1", false, true), new NodeRef("foo-pool1-1", 1, "pool1", false, true), @@ -867,7 +854,17 @@ private void verifyBrokerCapacity(Set nodes, .withBrokerCapacity(brokerCapacity) .build(); - CruiseControl cc = CruiseControl.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafkaSpec(cruiseControlSpec, resourceRequirements), VERSIONS, nodes, storage, resources, SHARED_ENV_PROVIDER); + Kafka kafka = new KafkaBuilder() + .withNewSpec() + .withNewKafka() + .withResources(resourceRequirements) + .withConfig(kafkaConfig) + .endKafka() + .withCruiseControl(cruiseControlSpec) + .endSpec() + .build(); + + CruiseControl cc = CruiseControl.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafka, VERSIONS, nodes, storage, resources, SHARED_ENV_PROVIDER); ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); @@ -894,7 +891,7 @@ public void testApiSecurity(Boolean apiAuthEnabled, Boolean apiSslEnabled) { String e2Value = apiSslEnabled.toString(); EnvVar e2 = new EnvVar(e2Key, e2Value, null); - Map config = ccConfig; + Map config = new HashMap<>(); config.put(CruiseControlConfigurationParameters.WEBSERVER_SECURITY_ENABLE.getValue(), apiAuthEnabled); config.put(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), apiSslEnabled); @@ -950,7 +947,6 @@ public void testProbeConfiguration() { public void testSecurityContext() { CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder() .withImage(ccImage) - .withConfig(ccConfig) .withNewTemplate() .withNewPod() .withSecurityContext(new PodSecurityContextBuilder().withFsGroup(123L).withRunAsGroup(456L).withRunAsUser(789L).build()) @@ -1034,7 +1030,6 @@ public void testCruiseControlContainerSecurityContext() { CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder() .withImage(ccImage) - .withConfig(ccConfig) .withNewTemplate() .withNewCruiseControlContainer() .withSecurityContext(securityContext) @@ -1119,7 +1114,7 @@ public void testGoalsCheck() { String customGoals = "com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal," + "com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal"; - Map customGoalConfig = ccConfig; + Map customGoalConfig = new HashMap<>(); customGoalConfig.put(DEFAULT_GOALS_CONFIG_KEY.getValue(), customGoals); CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder() @@ -1176,12 +1171,9 @@ public void testDefaultTopicNames() { topicConfigs.put(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_PARTITION_METRIC_TOPIC_NAME); topicConfigs.put(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_BROKER_METRIC_TOPIC_NAME); topicConfigs.put(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME); - Map customConfig = ccConfig; - customConfig.putAll(topicConfigs); CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder() .withImage(ccImage) - .withConfig(ccConfig) .build(); Kafka resource = createKafka(cruiseControlSpec); @@ -1195,7 +1187,7 @@ public void testCustomTopicNames() { topicConfigs.put(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), "partition-topic"); topicConfigs.put(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), "broker-topic"); topicConfigs.put(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), "metric-reporter-topic"); - Map customConfig = ccConfig; + Map customConfig = new HashMap<>(); customConfig.putAll(topicConfigs); CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder() @@ -1208,6 +1200,40 @@ public void testCustomTopicNames() { topicConfigs.forEach((configParam, name) -> assertThat(cc.configuration.getConfiguration(), containsString(String.format("%s=%s", configParam, name)))); } + private Properties getCcProperties(Kafka resource) { + CruiseControl cc = createCruiseControl(resource); + ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); + return parsePropertiesString(configMap.getData().get(CruiseControl.SERVER_CONFIG_FILENAME)); + } + + private static Properties parsePropertiesString(String kafkaPropertiesString) { + Properties properties = new Properties(); + try (StringReader reader = new StringReader(kafkaPropertiesString)) { + properties.load(reader); + } catch (IOException e) { + e.printStackTrace(); + } + return properties; + } + + @ParallelTest + public void testSampleStoreTopicReplicationFactorConfig() { + // Test that the replication factor of Cruise Control's sample store topic is set to Kafka cluster's `default.replication.factor` + // when not explicitly set in Cruise Control config + Properties properties = getCcProperties(kafka); + assertThat(properties.getProperty(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue()), is(REPLICATION_FACTOR)); + + // Test that the replication factor of Cruise Control's sample store topic is set to value set in Cruise Control config + String replicationFactor = "1"; + CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder() + .withImage(ccImage) + .withConfig(Map.of(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), replicationFactor)) + .build(); + + properties = getCcProperties(createKafka(cruiseControlSpec)); + assertThat(properties.getProperty(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue()), is(replicationFactor)); + } + @AfterAll public static void cleanUp() { ResourceUtils.cleanUpTemporaryTLSFiles(); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java index 8d8babf5b4c..85bba65f3ef 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java @@ -28,6 +28,7 @@ import io.strimzi.operator.common.Annotations; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.model.PasswordGenerator; +import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; import io.strimzi.operator.common.operator.MockCertManager; import io.strimzi.operator.common.operator.resource.ConfigMapOperator; import io.strimzi.operator.common.operator.resource.DeploymentOperator; @@ -67,7 +68,8 @@ public class CruiseControlReconcilerTest { new NodeRef(NAME + "kafka-2", 2, "kafka", false, true)); private final CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder() .withBrokerCapacity(new BrokerCapacityBuilder().withInboundNetwork("10000KB/s").withOutboundNetwork("10000KB/s").build()) - .withConfig(Map.of("hard.goals", "com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal")) + .withConfig(Map.of("hard.goals", "com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal", + CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), "3")) .build(); @Test @@ -152,7 +154,7 @@ public void reconcileEnabledCruiseControl(VertxTestContext context) { // Verify deployment assertThat(deployCaptor.getAllValues().size(), is(1)); assertThat(deployCaptor.getValue(), is(notNullValue())); - assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_SERVER_CONFIGURATION_HASH), is("f6dc41c7")); + assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_SERVER_CONFIGURATION_HASH), is("fc0ad847")); assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_CAPACITY_CONFIGURATION_HASH), is("1eb49220")); assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_AUTH_HASH), is("27ada64b")); diff --git a/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlConfigurationParameters.java b/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlConfigurationParameters.java index 7cb7d0cdb0b..cafa85ac01e 100644 --- a/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlConfigurationParameters.java +++ b/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlConfigurationParameters.java @@ -78,6 +78,11 @@ public enum CruiseControlConfigurationParameters { */ BROKER_METRIC_TOPIC_NAME("broker.metric.sample.store.topic"), + /** + * Replication factor of Kafka sample store topics + */ + SAMPLE_STORE_TOPIC_REPLICATION_FACTOR("sample.store.topic.replication.factor"), + /** * Metrics reporter topic */