diff --git a/.azure/templates/jobs/system-tests/feature_gates_regression_jobs.yaml b/.azure/templates/jobs/system-tests/feature_gates_regression_jobs.yaml index 4bb4bbac137..9ba4db1e80d 100644 --- a/.azure/templates/jobs/system-tests/feature_gates_regression_jobs.yaml +++ b/.azure/templates/jobs/system-tests/feature_gates_regression_jobs.yaml @@ -5,7 +5,7 @@ jobs: display_name: 'feature-gates-regression-bundle I. - kafka + oauth' profile: 'azp_kafka_oauth' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -16,7 +16,7 @@ jobs: display_name: 'feature-gates-regression-bundle II. - security' profile: 'azp_security' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -27,7 +27,7 @@ jobs: display_name: 'feature-gates-regression-bundle III. - dynconfig + tracing + watcher' profile: 'azp_dynconfig_listeners_tracing_watcher' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -38,7 +38,7 @@ jobs: display_name: 'feature-gates-regression-bundle IV. 
- operators' profile: 'azp_operators' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -49,7 +49,7 @@ jobs: display_name: 'feature-gates-regression-bundle V. - rollingupdate' profile: 'azp_rolling_update_bridge' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -60,7 +60,7 @@ jobs: display_name: 'feature-gates-regression-bundle VI. - connect + mirrormaker' profile: 'azp_connect_mirrormaker' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -71,7 +71,7 @@ jobs: display_name: 'feature-gates-regression-bundle VII. 
- remaining system tests' profile: 'azp_remaining' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' diff --git a/.azure/templates/jobs/system-tests/feature_gates_regression_namespace_rbac_jobs.yaml b/.azure/templates/jobs/system-tests/feature_gates_regression_namespace_rbac_jobs.yaml index 8789f6ed179..9ecea8658c9 100644 --- a/.azure/templates/jobs/system-tests/feature_gates_regression_namespace_rbac_jobs.yaml +++ b/.azure/templates/jobs/system-tests/feature_gates_regression_namespace_rbac_jobs.yaml @@ -8,7 +8,7 @@ jobs: cluster_operator_install_type: 'bundle' timeout: 360 strimzi_rbac_scope: NAMESPACE - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -21,7 +21,7 @@ jobs: cluster_operator_install_type: 'bundle' timeout: 360 strimzi_rbac_scope: NAMESPACE - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -34,7 +34,7 @@ jobs: cluster_operator_install_type: 'bundle' timeout: 360 strimzi_rbac_scope: NAMESPACE - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -47,7 +47,7 @@ jobs: cluster_operator_install_type: 'bundle' timeout: 360 strimzi_rbac_scope: NAMESPACE - strimzi_feature_gates: 
'-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -60,7 +60,7 @@ jobs: cluster_operator_install_type: 'bundle' timeout: 360 strimzi_rbac_scope: NAMESPACE - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -73,7 +73,7 @@ jobs: cluster_operator_install_type: 'bundle' timeout: 360 strimzi_rbac_scope: NAMESPACE - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -86,6 +86,6 @@ jobs: cluster_operator_install_type: 'bundle' timeout: 360 strimzi_rbac_scope: NAMESPACE - strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator' + strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft' releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' diff --git a/.azure/templates/jobs/system-tests/kraft_regression_jobs.yaml b/.azure/templates/jobs/system-tests/kraft_regression_jobs.yaml index 30397da16b5..2f19f1a6e67 100644 --- a/.azure/templates/jobs/system-tests/kraft_regression_jobs.yaml +++ b/.azure/templates/jobs/system-tests/kraft_regression_jobs.yaml @@ -5,7 +5,7 @@ jobs: display_name: 'kraft-regression-bundle I. 
- kafka + oauth' profile: 'azp_kafka_oauth' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '+UseKRaft' + strimzi_feature_gates: '' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -16,7 +16,8 @@ jobs: display_name: 'kraft-regression-bundle II. - security' profile: 'azp_security' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '+UseKRaft' + strimzi_feature_gates: '' + strimzi_use_kraft_in_tests: "true" timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -27,7 +28,7 @@ jobs: display_name: 'kraft-regression-bundle III. - dynconfig + tracing + watcher' profile: 'azp_dynconfig_listeners_tracing_watcher' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '+UseKRaft' + strimzi_feature_gates: '' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -38,7 +39,7 @@ jobs: display_name: 'kraft-regression-bundle IV. - operators' profile: 'azp_operators' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '+UseKRaft' + strimzi_feature_gates: '' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -49,7 +50,7 @@ jobs: display_name: 'kraft-regression-bundle V. - rollingupdate' profile: 'azp_rolling_update_bridge' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '+UseKRaft' + strimzi_feature_gates: '' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -60,7 +61,7 @@ jobs: display_name: 'kraft-regression-bundle VI. 
- connect + mirrormaker' profile: 'azp_connect_mirrormaker' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '+UseKRaft' + strimzi_feature_gates: '' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' @@ -71,7 +72,7 @@ jobs: display_name: 'kraft-regression-bundle VII. - remaining system tests' profile: 'azp_remaining' cluster_operator_install_type: 'bundle' - strimzi_feature_gates: '+UseKRaft' + strimzi_feature_gates: '' timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' diff --git a/.azure/templates/steps/system_test_general.yaml b/.azure/templates/steps/system_test_general.yaml index b5e3f963a82..941fac8aa8c 100644 --- a/.azure/templates/steps/system_test_general.yaml +++ b/.azure/templates/steps/system_test_general.yaml @@ -12,6 +12,7 @@ parameters: parallel: '1' run_parallel: false releaseVersion: "latest" + strimzi_use_kraft_in_tests: "false" jobs: - job: '${{ parameters.name }}_system_tests' @@ -137,6 +138,7 @@ jobs: DOCKER_REGISTRY: registry.minikube CLUSTER_OPERATOR_INSTALL_TYPE: '${{ parameters.cluster_operator_install_type }}' STRIMZI_FEATURE_GATES: '${{ parameters.strimzi_feature_gates }}' + STRIMZI_USE_KRAFT_IN_TESTS: '${{ parameters.strimzi_use_kraft_in_tests }}' BUILD_ID: $(Agent.JobName) RESOURCE_ALLOCATION_STRATEGY: "NOT_SHARED" TEST_LOG_DIR: $(test_log_dir) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4eceef3f6fc..f9b8600e3cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ ## 0.40.0 * Remove support for Apache Kafka 3.5.0 and 3.5.1 +* The `UseKRaft` feature gate moves to beta stage and is enabled by default. + If needed, `UseKRaft` can be disabled in the feature gates configuration in the Cluster Operator. 
* Fix NullPointerException from missing listenerConfig when using custom auth * Added support for Kafka Exporter `offset.show-all` parameter diff --git a/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaClusterSpec.java b/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaClusterSpec.java index a0d49ae1b01..c72e1159454 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaClusterSpec.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaClusterSpec.java @@ -139,8 +139,8 @@ public void setRack(Rack rack) { this.rack = rack; } - @Description("Storage configuration (disk). Cannot be updated.") - @JsonProperty(required = true) + @Description("Storage configuration (disk). Cannot be updated. " + + "This property is required when node pools are not used.") public Storage getStorage() { return storage; } @@ -161,9 +161,9 @@ public void setLogging(Logging logging) { this.logging = logging; } - @Description("The number of pods in the cluster.") + @Description("The number of pods in the cluster. " + + "This property is required when node pools are not used.") @Minimum(1) - @JsonProperty(required = true) public int getReplicas() { return replicas; } diff --git a/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaSpec.java b/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaSpec.java index f339684998a..f723970bdce 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaSpec.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaSpec.java @@ -62,8 +62,7 @@ public void setKafka(KafkaClusterSpec kafka) { this.kafka = kafka; } - @Description("Configuration of the ZooKeeper cluster") - @JsonProperty(required = true) + @Description("Configuration of the ZooKeeper cluster. 
This section is required when running a ZooKeeper-based Apache Kafka cluster.") public ZookeeperClusterSpec getZookeeper() { return zookeeper; } diff --git a/api/src/test/java/io/strimzi/api/kafka/model/kafka/KafkaCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/kafka/KafkaCrdIT.java index 48d08ef5631..07bf4d54bc9 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/kafka/KafkaCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/kafka/KafkaCrdIT.java @@ -57,7 +57,7 @@ void testKafkaWithMissingRequired() { KubeClusterException.class, () -> createDeleteCustomResource("Kafka-with-missing-required-property.yaml")); - assertMissingRequiredPropertiesMessage(exception.getMessage(), "zookeeper", "kafka"); + assertMissingRequiredPropertiesMessage(exception.getMessage(), "kafka"); } @Test diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/FeatureGates.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/FeatureGates.java index 381a9b0012d..017fddcdf54 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/FeatureGates.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/FeatureGates.java @@ -21,7 +21,7 @@ public class FeatureGates { private static final String UNIDIRECTIONAL_TOPIC_OPERATOR = "UnidirectionalTopicOperator"; // When adding new feature gates, do not forget to add them to allFeatureGates() and toString() methods - private final FeatureGate useKRaft = new FeatureGate(USE_KRAFT, false); + private final FeatureGate useKRaft = new FeatureGate(USE_KRAFT, true); private final FeatureGate kafkaNodePools = new FeatureGate(KAFKA_NODE_POOLS, true); private final FeatureGate unidirectionalTopicOperator = new FeatureGate(UNIDIRECTIONAL_TOPIC_OPERATOR, true); diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KRaftUtils.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KRaftUtils.java index feee481f63d..deca45d9a4c 100644 --- 
a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KRaftUtils.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KRaftUtils.java @@ -70,4 +70,40 @@ public static void validateMetadataVersion(String metadataVersion) { throw new InvalidResourceException("Metadata version " + metadataVersion + " is invalid", e); } } + + /** + * In ZooKeeper mode, some of the fields marked as not required (because they are not used in KRaft) are in fact + * required. This method validates that the fields are present and in case they are missing, it throws an exception. + * + * @param kafkaSpec The .spec section of the Kafka CR which should be checked + * @param nodePoolsEnabled Flag indicating whether Node Pools are enabled or not + */ + public static void validateKafkaCrForZooKeeper(KafkaSpec kafkaSpec, boolean nodePoolsEnabled) { + Set errors = new HashSet<>(0); + + if (kafkaSpec != null) { + if (kafkaSpec.getZookeeper() == null) { + errors.add("The .spec.zookeeper section of the Kafka custom resource is missing. " + + "This section is required for a ZooKeeper-based cluster."); + } + + if (!nodePoolsEnabled) { + if (kafkaSpec.getKafka().getReplicas() == 0) { + errors.add("The .spec.kafka.replicas property of the Kafka custom resource is missing. " + + "This property is required for a ZooKeeper-based Kafka cluster that is not using Node Pools."); + } + + if (kafkaSpec.getKafka().getStorage() == null) { + errors.add("The .spec.kafka.storage section of the Kafka custom resource is missing. 
" + + "This section is required for a ZooKeeper-based Kafka cluster that is not using Node Pools."); + } + } + } else { + errors.add("The .spec section of the Kafka custom resource is missing"); + } + + if (!errors.isEmpty()) { + throw new InvalidResourceException("Kafka configuration is not valid: " + errors); + } + } } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java index 0e473834e0d..f735f8f2499 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java @@ -209,11 +209,11 @@ Future reconcile(ReconciliationState reconcileState) { Promise chainPromise = Promise.promise(); boolean isKRaftEnabled = featureGates.useKRaftEnabled() && ReconcilerUtils.kraftEnabled(reconcileState.kafkaAssembly); + boolean nodePoolsEnabled = featureGates.kafkaNodePoolsEnabled() && ReconcilerUtils.nodePoolsEnabled(reconcileState.kafkaAssembly); if (isKRaftEnabled) { // Makes sure KRaft is used only with KafkaNodePool custom resources and not with virtual node pools - if (featureGates.kafkaNodePoolsEnabled() - && !ReconcilerUtils.nodePoolsEnabled(reconcileState.kafkaAssembly)) { + if (!nodePoolsEnabled) { throw new InvalidConfigurationException("The UseKRaft feature gate can be used only together with a Kafka cluster based on the KafkaNodePool resources."); } @@ -223,6 +223,13 @@ Future reconcile(ReconciliationState reconcileState) { } catch (InvalidResourceException e) { return Future.failedFuture(e); } + } else { + // Validates the properties required for a ZooKeeper based Kafka cluster + try { + KRaftUtils.validateKafkaCrForZooKeeper(reconcileState.kafkaAssembly.getSpec(), nodePoolsEnabled); + } catch (InvalidResourceException e) { + return 
Future.failedFuture(e); + } } reconcileState.initialStatus() diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorConfigTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorConfigTest.java index 9802c149264..fef68a153df 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorConfigTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorConfigTest.java @@ -43,7 +43,7 @@ public class ClusterOperatorConfigTest { ENV_VARS.put(ClusterOperatorConfig.STRIMZI_KAFKA_MIRROR_MAKER_IMAGES, KafkaVersionTestUtils.getKafkaMirrorMakerImagesEnvVarString()); ENV_VARS.put(ClusterOperatorConfig.STRIMZI_KAFKA_MIRROR_MAKER_2_IMAGES, KafkaVersionTestUtils.getKafkaMirrorMaker2ImagesEnvVarString()); ENV_VARS.put(ClusterOperatorConfig.OPERATOR_NAMESPACE.key(), "operator-namespace"); - ENV_VARS.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-KafkaNodePools"); + ENV_VARS.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-UseKRaft"); ENV_VARS.put(ClusterOperatorConfig.DNS_CACHE_TTL.key(), "10"); ENV_VARS.put(ClusterOperatorConfig.POD_SECURITY_PROVIDER_CLASS.key(), "my.package.CustomPodSecurityProvider"); } @@ -66,7 +66,7 @@ public void testDefaultConfig() { assertThat(config.getOperatorNamespace(), is("operator-namespace")); assertThat(config.getOperatorNamespaceLabels(), is(nullValue())); assertThat(config.featureGates().kafkaNodePoolsEnabled(), is(true)); - assertThat(config.featureGates().useKRaftEnabled(), is(false)); + assertThat(config.featureGates().useKRaftEnabled(), is(true)); assertThat(config.isCreateClusterRoles(), is(false)); assertThat(config.isNetworkPolicyGeneration(), is(true)); assertThat(config.isPodSetReconciliationOnly(), is(false)); @@ -101,7 +101,7 @@ public void testEnvVars() { assertThat(config.getOperationTimeoutMs(), is(30_000L)); assertThat(config.getConnectBuildTimeoutMs(), is(40_000L)); assertThat(config.getOperatorNamespace(), 
is("operator-namespace")); - assertThat(config.featureGates().kafkaNodePoolsEnabled(), is(false)); + assertThat(config.featureGates().useKRaftEnabled(), is(false)); assertThat(config.getDnsCacheTtlSec(), is(10)); assertThat(config.getPodSecurityProviderClass(), is("my.package.CustomPodSecurityProvider")); } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorTest.java index 65226ae616a..83c6da7dc17 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorTest.java @@ -65,7 +65,7 @@ private static Map buildEnv(String namespaces, boolean podSetsOn env.put(ClusterOperatorConfig.STRIMZI_KAFKA_CONNECT_IMAGES, KafkaVersionTestUtils.getKafkaConnectImagesEnvVarString()); env.put(ClusterOperatorConfig.STRIMZI_KAFKA_MIRROR_MAKER_IMAGES, KafkaVersionTestUtils.getKafkaMirrorMakerImagesEnvVarString()); env.put(ClusterOperatorConfig.STRIMZI_KAFKA_MIRROR_MAKER_2_IMAGES, KafkaVersionTestUtils.getKafkaMirrorMaker2ImagesEnvVarString()); - env.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-KafkaNodePools"); + env.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-UseKRaft"); if (podSetsOnly) { env.put(ClusterOperatorConfig.POD_SET_RECONCILIATION_ONLY.key(), "true"); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/FeatureGatesTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/FeatureGatesTest.java index a3bffc53698..41a9ead335b 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/FeatureGatesTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/FeatureGatesTest.java @@ -22,7 +22,14 @@ public class FeatureGatesTest { public void testIndividualFeatureGates() { for (FeatureGates.FeatureGate gate : FeatureGates.NONE.allFeatureGates()) { FeatureGates enabled = new 
FeatureGates("+" + gate.getName()); - FeatureGates disabled = new FeatureGates("-" + gate.getName()); + FeatureGates disabled; + + // KafkaNodePools FG can be disabled only together with UseKRaft FG + if (gate.getName().equals("KafkaNodePools")) { + disabled = new FeatureGates("-" + gate.getName() + ",-UseKRaft"); + } else { + disabled = new FeatureGates("-" + gate.getName()); + } assertThat(enabled.allFeatureGates().stream().filter(g -> gate.getName().equals(g.getName())).findFirst().orElseThrow().isEnabled(), is(true)); assertThat(disabled.allFeatureGates().stream().filter(g -> gate.getName().equals(g.getName())).findFirst().orElseThrow().isEnabled(), is(false)); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KRaftUtilsTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KRaftUtilsTest.java index b7a74f116f7..a618db34add 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KRaftUtilsTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KRaftUtilsTest.java @@ -166,4 +166,112 @@ public void testKRaftMetadataVersionValidation() { e = assertThrows(InvalidResourceException.class, () -> KRaftUtils.validateMetadataVersion("3.2-IV0")); assertThat(e.getMessage(), containsString("The oldest supported metadata version is 3.3-IV0")); } + + @ParallelTest + public void testValidZooBasedCluster() { + KafkaSpec spec = new KafkaSpecBuilder() + .withNewZookeeper() + .withReplicas(3) + .withNewEphemeralStorage() + .endEphemeralStorage() + .endZookeeper() + .withNewKafka() + .withReplicas(3) + .withListeners(new GenericKafkaListenerBuilder() + .withName("listener") + .withPort(9092) + .withTls(true) + .withType(KafkaListenerType.INTERNAL) + .withNewKafkaListenerAuthenticationTlsAuth() + .endKafkaListenerAuthenticationTlsAuth() + .build()) + .withNewEphemeralStorage() + .endEphemeralStorage() + .withNewKafkaAuthorizationOpa() + .withUrl("http://opa:8080") + 
.endKafkaAuthorizationOpa() + .endKafka() + .build(); + + assertDoesNotThrow(() -> KRaftUtils.validateKafkaCrForZooKeeper(spec, false)); + } + + @ParallelTest + public void testValidZooBasedClusterWithNodePools() { + KafkaSpec spec = new KafkaSpecBuilder() + .withNewZookeeper() + .withReplicas(3) + .withNewEphemeralStorage() + .endEphemeralStorage() + .endZookeeper() + .withNewKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("listener") + .withPort(9092) + .withTls(true) + .withType(KafkaListenerType.INTERNAL) + .withNewKafkaListenerAuthenticationTlsAuth() + .endKafkaListenerAuthenticationTlsAuth() + .build()) + .withNewKafkaAuthorizationOpa() + .withUrl("http://opa:8080") + .endKafkaAuthorizationOpa() + .endKafka() + .build(); + + assertDoesNotThrow(() -> KRaftUtils.validateKafkaCrForZooKeeper(spec, true)); + } + + @ParallelTest + public void testZooBasedClusterWithMissingZooSection() { + KafkaSpec spec = new KafkaSpecBuilder() + .withNewKafka() + .withReplicas(3) + .withListeners(new GenericKafkaListenerBuilder() + .withName("listener") + .withPort(9092) + .withTls(true) + .withType(KafkaListenerType.INTERNAL) + .withNewKafkaListenerAuthenticationTlsAuth() + .endKafkaListenerAuthenticationTlsAuth() + .build()) + .withNewEphemeralStorage() + .endEphemeralStorage() + .withNewKafkaAuthorizationOpa() + .withUrl("http://opa:8080") + .endKafkaAuthorizationOpa() + .endKafka() + .build(); + + InvalidResourceException e = assertThrows(InvalidResourceException.class, () -> KRaftUtils.validateKafkaCrForZooKeeper(spec, false)); + assertThat(e.getMessage(), containsString("The .spec.zookeeper section of the Kafka custom resource is missing. 
This section is required for a ZooKeeper-based cluster.")); + } + + @ParallelTest + public void testZooBasedClusterWithMissingReplicasAndStorage() { + KafkaSpec spec = new KafkaSpecBuilder() + .withNewZookeeper() + .withReplicas(3) + .withNewEphemeralStorage() + .endEphemeralStorage() + .endZookeeper() + .withNewKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("listener") + .withPort(9092) + .withTls(true) + .withType(KafkaListenerType.INTERNAL) + .withNewKafkaListenerAuthenticationTlsAuth() + .endKafkaListenerAuthenticationTlsAuth() + .build()) + .withNewKafkaAuthorizationOpa() + .withUrl("http://opa:8080") + .endKafkaAuthorizationOpa() + .endKafka() + .build(); + + InvalidResourceException e = assertThrows(InvalidResourceException.class, () -> KRaftUtils.validateKafkaCrForZooKeeper(spec, false)); + assertThat(e.getMessage(), containsString("The .spec.kafka.replicas property of the Kafka custom resource is missing. This property is required for a ZooKeeper-based Kafka cluster that is not using Node Pools.")); + assertThat(e.getMessage(), containsString("The .spec.kafka.storage section of the Kafka custom resource is missing. 
This section is required for a ZooKeeper-based Kafka cluster that is not using Node Pools.")); + } } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaSpecCheckerWithNodePoolsTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaSpecCheckerWithNodePoolsTest.java index 0e8aa7e45bb..02c67409710 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaSpecCheckerWithNodePoolsTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaSpecCheckerWithNodePoolsTest.java @@ -41,10 +41,6 @@ public class KafkaSpecCheckerWithNodePoolsTest { .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(3) - .withNewJbodStorage() - .withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build()) - .endJbodStorage() .withListeners(new GenericKafkaListenerBuilder().withName("tls").withPort(9093).withType(KafkaListenerType.INTERNAL).withTls().build()) .withConfig(Map.of("default.replication.factor", 2, "min.insync.replicas", 2)) .endKafka() diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorNodePoolWatcherTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorNodePoolWatcherTest.java index e1adb576fa2..6de15fe9a23 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorNodePoolWatcherTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorNodePoolWatcherTest.java @@ -49,15 +49,12 @@ public class KafkaAssemblyOperatorNodePoolWatcherTest { .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(3) .withListeners(new GenericKafkaListenerBuilder() .withName("plain") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(false) .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() .endKafka() 
.withNewZookeeper() .withReplicas(3) diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorNonParametrizedTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorNonParametrizedTest.java index ee5c046b13c..0ba47b2291e 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorNonParametrizedTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorNonParametrizedTest.java @@ -10,6 +10,8 @@ import io.strimzi.api.kafka.model.kafka.KafkaBuilder; import io.strimzi.api.kafka.model.kafka.KafkaList; import io.strimzi.api.kafka.model.kafka.KafkaResources; +import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; +import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; import io.strimzi.certs.OpenSslCertManager; import io.strimzi.operator.cluster.ClusterOperatorConfig; import io.strimzi.operator.cluster.KafkaVersionTestUtils; @@ -17,6 +19,7 @@ import io.strimzi.operator.cluster.ResourceUtils; import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; import io.strimzi.operator.common.Reconciliation; +import io.strimzi.operator.common.model.InvalidResourceException; import io.strimzi.operator.common.model.PasswordGenerator; import io.strimzi.operator.common.operator.resource.ClusterRoleBindingOperator; import io.strimzi.operator.common.operator.resource.CrdOperator; @@ -37,9 +40,11 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static 
org.mockito.Mockito.times; @@ -151,4 +156,57 @@ public void testSelectorLabels(VertxTestContext context) { async.flag(); }))); } + + /** + * Tests that a ZooKeeper-based Kafka cluster without node pools fails validation when the required zookeeper, replicas, and storage fields are missing + * + * @param context Test context + */ + @Test + public void testOptionalCustomResourceFieldsValidation(VertxTestContext context) { + Kafka kafka = new KafkaBuilder() + .withNewMetadata() + .withName(NAME) + .withNamespace(NAMESPACE) + .endMetadata() + .withNewSpec() + .withNewKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("listener") + .withPort(9092) + .withTls(true) + .withType(KafkaListenerType.INTERNAL) + .withNewKafkaListenerAuthenticationTlsAuth() + .endKafkaListenerAuthenticationTlsAuth() + .build()) + .endKafka() + .endSpec() + .build(); + + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + CrdOperator mockKafkaOps = supplier.kafkaOperator; + when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(NAME))).thenReturn(Future.succeededFuture(kafka)); + + ClusterOperatorConfig config = ResourceUtils.dummyClusterOperatorConfig(KafkaVersionTestUtils.getKafkaVersionLookup()); + + KafkaAssemblyOperator kao = new KafkaAssemblyOperator( + vertx, new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), + certManager, + passwordGenerator, + supplier, + config); + + Checkpoint async = context.checkpoint(); + kao.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME)) + .onComplete(context.failing(v -> context.verify(() -> { + assertThat(v, instanceOf(InvalidResourceException.class)); + + assertThat(v.getMessage(), containsString("The .spec.zookeeper section of the Kafka custom resource is missing. This section is required for a ZooKeeper-based cluster.")); + assertThat(v.getMessage(), containsString("The .spec.kafka.replicas property of the Kafka custom resource is missing. 
This property is required for a ZooKeeper-based Kafka cluster that is not using Node Pools.")); + assertThat(v.getMessage(), containsString("The .spec.kafka.storage section of the Kafka custom resource is missing. This section is required for a ZooKeeper-based Kafka cluster that is not using Node Pools.")); + + async.flag(); + }))); + } } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsKRaftMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsKRaftMockTest.java index 590a0b3f720..c47712c6826 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsKRaftMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsKRaftMockTest.java @@ -88,30 +88,16 @@ public void init() { .withNewSpec() .withNewKafka() .withConfig(new HashMap<>()) - .withReplicas(3) .withListeners(new GenericKafkaListenerBuilder() .withName("tls") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(true) .build()) - .withNewPersistentClaimStorage() - .withSize("123") - .withStorageClass("foo") - .withDeleteClaim(true) - .endPersistentClaimStorage() .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewPersistentClaimStorage() - .withSize("123") - .withStorageClass("foo") - .withDeleteClaim(true) - .endPersistentClaimStorage() - .endZookeeper() .endSpec() .withNewStatus() - .withClusterId("CLUSTERID") // Needed to avoid CLuster ID conflicts => should be the same as used in the Kafka Admin API + .withClusterId("CLUSTERID") // Needed to avoid CLuster ID conflicts => should be the same as used in the Kafka Admin API .endStatus() .build(); @@ -173,7 +159,6 @@ public void init() { ClusterOperatorConfig config = new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), VERSIONS) 
.with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "10000") - .with(ClusterOperatorConfig.FEATURE_GATES.key(), "+UseKRaft") .build(); operator = new KafkaAssemblyOperator(vertx, pfa, new MockCertManager(), new PasswordGenerator(10, "a", "a"), supplier, config); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsMockTest.java index 9cec98767bd..d1388fe2c14 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsMockTest.java @@ -98,18 +98,12 @@ public void init() { .withNewSpec() .withNewKafka() .withConfig(new HashMap<>()) - .withReplicas(3) .withListeners(new GenericKafkaListenerBuilder() .withName("tls") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(true) .build()) - .withNewPersistentClaimStorage() - .withSize("123") - .withStorageClass("foo") - .withDeleteClaim(true) - .endPersistentClaimStorage() .endKafka() .withNewZookeeper() .withReplicas(3) diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsTest.java index 9e80a0088d0..e3725d470a7 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithPoolsTest.java @@ -129,15 +129,12 @@ public class KafkaAssemblyOperatorWithPoolsTest { .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(3) .withListeners(new GenericKafkaListenerBuilder() 
.withName("plain") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(false) .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() .endKafka() .withNewZookeeper() .withReplicas(3) @@ -1347,8 +1344,11 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), public void testKRaftClusterWithoutNodePools(VertxTestContext context) { Kafka kafka = new KafkaBuilder(KAFKA) .editMetadata() - .withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled")) + .withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled")) .endMetadata() + .editSpec() + .withZookeeper(null) + .endSpec() .build(); ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); @@ -1356,9 +1356,7 @@ public void testKRaftClusterWithoutNodePools(VertxTestContext context) { CrdOperator mockKafkaOps = supplier.kafkaOperator; when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(kafka)); - ClusterOperatorConfig config = new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), VERSIONS) - .with(ClusterOperatorConfig.FEATURE_GATES.key(), "+UseKRaft") - .build(); + ClusterOperatorConfig config = ResourceUtils.dummyClusterOperatorConfig(VERSIONS); KafkaAssemblyOperator kao = new KafkaAssemblyOperator( vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), @@ -1396,8 +1394,11 @@ public void testNoNodePoolsValidation(VertxTestContext context) { CrdOperator mockKafkaOps = supplier.kafkaOperator; Kafka kraftEnabledKafka = new KafkaBuilder(KAFKA) .editMetadata() - .addToAnnotations(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled") + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled") .endMetadata() + .editSpec() + .withZookeeper(null) + .endSpec() .build(); when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(kraftEnabledKafka)); when(mockKafkaOps.updateStatusAsync(any(), 
any())).thenReturn(Future.succeededFuture()); @@ -1412,9 +1413,7 @@ public void testNoNodePoolsValidation(VertxTestContext context) { CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; when(mockKafkaNodePoolOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(null)); - ClusterOperatorConfig config = new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), VERSIONS) - .with(ClusterOperatorConfig.FEATURE_GATES.key(), "+UseKRaft") - .build(); + ClusterOperatorConfig config = ResourceUtils.dummyClusterOperatorConfig(VERSIONS); KafkaAssemblyOperator kao = new KafkaAssemblyOperator( vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaUpgradeDowngradeWithKRaftMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaUpgradeDowngradeWithKRaftMockTest.java index 5862a711428..59e81d57fc1 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaUpgradeDowngradeWithKRaftMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaUpgradeDowngradeWithKRaftMockTest.java @@ -91,9 +91,6 @@ public class KafkaUpgradeDowngradeWithKRaftMockTest { .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() .withListeners(new GenericKafkaListenerBuilder() .withName("plain") .withPort(9092) @@ -101,11 +98,6 @@ public class KafkaUpgradeDowngradeWithKRaftMockTest { .withTls(false) .build()) .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() .withNewEntityOperator() .withNewTopicOperator() .endTopicOperator() @@ -191,7 +183,6 @@ private Future initialize(Kafka initialKafka, String initialMetadataVersio ClusterOperatorConfig config = new 
ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), VERSIONS) .with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "10000") - .with(ClusterOperatorConfig.FEATURE_GATES.key(), "+UseKRaft") .build(); operator = new KafkaAssemblyOperator(vertx, PFA, new MockCertManager(), diff --git a/documentation/assemblies/configuring/assembly-storage.adoc b/documentation/assemblies/configuring/assembly-storage.adoc index 9e77c4e46b7..f6d347c8dbe 100644 --- a/documentation/assemblies/configuring/assembly-storage.adoc +++ b/documentation/assemblies/configuring/assembly-storage.adoc @@ -17,8 +17,7 @@ The supported storage types are: To configure storage, you specify `storage` properties in the custom resource of the component. The storage type is set using the `storage.type` property. -You can also use the preview of the node pools feature for advanced storage management of the Kafka cluster. -You can specify storage configuration unique to each node pool used in the cluster. +When using node pools, you can specify storage configuration unique to each node pool used in the cluster. The same storage properties available to the `Kafka` resource are also available to the `KafkaNodePool` pool resource. The storage-related schema references provide more information on the storage configuration properties: diff --git a/documentation/modules/appendix_crds.adoc b/documentation/modules/appendix_crds.adoc index d41c2e8fe75..4549744be2a 100644 --- a/documentation/modules/appendix_crds.adoc +++ b/documentation/modules/appendix_crds.adoc @@ -25,7 +25,7 @@ Used in: xref:type-Kafka-{context}[`Kafka`] |Property |Description |kafka 1.2+<. 
* Only SPS will be reconciled, when this env variable will be true @@ -205,6 +210,7 @@ public class Environment { public static final boolean SKIP_TEARDOWN = getOrDefault(SKIP_TEARDOWN_ENV, Boolean::parseBoolean, false); public static final String STRIMZI_RBAC_SCOPE = getOrDefault(STRIMZI_RBAC_SCOPE_ENV, STRIMZI_RBAC_SCOPE_DEFAULT); public static final String STRIMZI_FEATURE_GATES = getOrDefault(STRIMZI_FEATURE_GATES_ENV, STRIMZI_FEATURE_GATES_DEFAULT); + public static final boolean STRIMZI_USE_KRAFT_IN_TESTS = getOrDefault(STRIMZI_USE_KRAFT_IN_TESTS_ENV, Boolean::parseBoolean, false); // variables for kafka client app images private static final String TEST_CLIENTS_VERSION = getOrDefault(TEST_CLIENTS_VERSION_ENV, TEST_CLIENTS_VERSION_DEFAULT); @@ -271,7 +277,7 @@ public static boolean isNamespaceRbacScope() { * @return true if KRaft mode is enabled, otherwise false */ public static boolean isKRaftModeEnabled() { - return STRIMZI_FEATURE_GATES.contains(TestConstants.USE_KRAFT_MODE); + return !STRIMZI_FEATURE_GATES.contains(TestConstants.DONT_USE_KRAFT_MODE) && STRIMZI_USE_KRAFT_IN_TESTS; } public static boolean isKafkaNodePoolsEnabled() { diff --git a/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java b/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java index 09640e1cff3..f8f2313169f 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java @@ -200,9 +200,10 @@ public interface TestConstants { /** * Feature gate related constants */ - String USE_KRAFT_MODE = "+UseKRaft"; + String DONT_USE_KRAFT_MODE = "-UseKRaft"; String DONT_USE_KAFKA_NODE_POOLS = "-KafkaNodePools"; // kept for upgrade/downgrade tests in KRaft + String USE_KRAFT_MODE = "+UseKRaft"; String USE_KAFKA_NODE_POOLS = "+KafkaNodePools"; String DONT_USE_UNIDIRECTIONAL_TOPIC_OPERATOR = "-UnidirectionalTopicOperator"; String USE_UNIDIRECTIONAL_TOPIC_OPERATOR = 
"+UnidirectionalTopicOperator"; diff --git a/systemtest/src/test/java/io/strimzi/systemtest/operators/FeatureGatesST.java b/systemtest/src/test/java/io/strimzi/systemtest/operators/FeatureGatesST.java index 2a2162b1424..1a146c0ad59 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/operators/FeatureGatesST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/operators/FeatureGatesST.java @@ -176,7 +176,7 @@ void testKafkaManagementTransferToAndFromKafkaNodePool(ExtensionContext extensio final String kafkaNodePoolName = "kafka"; // as the only FG set in the CO is 'KafkaNodePools' (kraft not included) Broker role is the only one that kafka broker can take - setupClusterOperatorWithFeatureGate(extensionContext, "+KafkaNodePools"); + setupClusterOperatorWithFeatureGate(extensionContext, ""); // setup clients final KafkaClients clients = new KafkaClientsBuilder() @@ -239,7 +239,7 @@ void testKafkaManagementTransferToAndFromKafkaNodePool(ExtensionContext extensio LOGGER.info("Changing FG env variable to disable Kafka Node Pools"); List coEnvVars = kubeClient().getDeployment(clusterOperator.getDeploymentNamespace(), TestConstants.STRIMZI_DEPLOYMENT_NAME).getSpec().getTemplate().getSpec().getContainers().get(0).getEnv(); - coEnvVars.stream().filter(env -> env.getName().equals(Environment.STRIMZI_FEATURE_GATES_ENV)).findFirst().get().setValue("-KafkaNodePools"); + coEnvVars.stream().filter(env -> env.getName().equals(Environment.STRIMZI_FEATURE_GATES_ENV)).findFirst().get().setValue("-KafkaNodePools,-UseKRaft"); Deployment coDep = kubeClient().getDeployment(clusterOperator.getDeploymentNamespace(), TestConstants.STRIMZI_DEPLOYMENT_NAME); coDep.getSpec().getTemplate().getSpec().getContainers().get(0).setEnv(coEnvVars); diff --git a/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml b/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml index 621e01bad0f..7a1abc610da 100644 --- a/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml +++ 
b/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml @@ -56,7 +56,7 @@ filePaths: kafkaBefore: "/examples/kafka/kafka-persistent.yaml" kafkaAfter: "/examples/kafka/kafka-persistent.yaml" - kafkaKRaftBefore: "/examples/kafka/nodepools/kafka-with-kraft.yaml" + kafkaKRaftBefore: "/examples/kafka/kraft/kafka.yaml" kafkaKRaftAfter: "/examples/kafka/nodepools/kafka-with-kraft.yaml" - fromVersion: HEAD toVersion: 0.39.0 @@ -84,5 +84,5 @@ filePaths: kafkaBefore: "/examples/kafka/kafka-persistent.yaml" kafkaAfter: "/examples/kafka/kafka-persistent.yaml" - kafkaKRaftBefore: "/examples/kafka/nodepools/kafka-with-kraft.yaml" + kafkaKRaftBefore: "/examples/kafka/kraft/kafka.yaml" kafkaKRaftAfter: "/examples/kafka/nodepools/kafka-with-kraft.yaml" diff --git a/systemtest/src/test/resources/upgrade/BundleUpgrade.yaml b/systemtest/src/test/resources/upgrade/BundleUpgrade.yaml index 203b064aa67..4828e6af2b6 100644 --- a/systemtest/src/test/resources/upgrade/BundleUpgrade.yaml +++ b/systemtest/src/test/resources/upgrade/BundleUpgrade.yaml @@ -53,7 +53,7 @@ kafkaBefore: "/examples/kafka/kafka-persistent.yaml" kafkaAfter: "/examples/kafka/kafka-persistent.yaml" kafkaKRaftBefore: "/examples/kafka/nodepools/kafka-with-kraft.yaml" - kafkaKRaftAfter: "/examples/kafka/nodepools/kafka-with-kraft.yaml" + kafkaKRaftAfter: "/examples/kafka/kraft/kafka.yaml" - fromVersion: 0.39.0 fromExamples: strimzi-0.39.0 oldestKafka: 3.5.2 @@ -78,4 +78,4 @@ kafkaBefore: "/examples/kafka/kafka-persistent.yaml" kafkaAfter: "/examples/kafka/kafka-persistent.yaml" kafkaKRaftBefore: "/examples/kafka/nodepools/kafka-with-kraft.yaml" - kafkaKRaftAfter: "/examples/kafka/nodepools/kafka-with-kraft.yaml" \ No newline at end of file + kafkaKRaftAfter: "/examples/kafka/kraft/kafka.yaml" \ No newline at end of file diff --git a/systemtest/tmt/tests/strimzi/main.fmf b/systemtest/tmt/tests/strimzi/main.fmf index 0540fdce74a..d2f42dd6469 100644 --- a/systemtest/tmt/tests/strimzi/main.fmf +++ 
b/systemtest/tmt/tests/strimzi/main.fmf @@ -55,7 +55,6 @@ adjust: tier: 2 environment+: TEST_PROFILE: operators - STRIMZI_FEATURE_GATES: "+UseKRaft" /kraft-components: summary: Run regression kraft strimzi test suite @@ -64,7 +63,6 @@ adjust: tier: 2 environment+: TEST_PROFILE: components - STRIMZI_FEATURE_GATES: "+UseKRaft" /acceptance: summary: Run acceptance strimzi test suite