Skip to content

Commit

Permalink
Promote UseKRaft feature gate to beta (#9518)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj authored Jan 12, 2024
1 parent e347982 commit 02fb63d
Show file tree
Hide file tree
Showing 48 changed files with 325 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
display_name: 'feature-gates-regression-bundle I. - kafka + oauth'
profile: 'azp_kafka_oauth'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -16,7 +16,7 @@ jobs:
display_name: 'feature-gates-regression-bundle II. - security'
profile: 'azp_security'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -27,7 +27,7 @@ jobs:
display_name: 'feature-gates-regression-bundle III. - dynconfig + tracing + watcher'
profile: 'azp_dynconfig_listeners_tracing_watcher'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -38,7 +38,7 @@ jobs:
display_name: 'feature-gates-regression-bundle IV. - operators'
profile: 'azp_operators'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -49,7 +49,7 @@ jobs:
display_name: 'feature-gates-regression-bundle V. - rollingupdate'
profile: 'azp_rolling_update_bridge'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -60,7 +60,7 @@ jobs:
display_name: 'feature-gates-regression-bundle VI. - connect + mirrormaker'
profile: 'azp_connect_mirrormaker'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -71,7 +71,7 @@ jobs:
display_name: 'feature-gates-regression-bundle VII. - remaining system tests'
profile: 'azp_remaining'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
cluster_operator_install_type: 'bundle'
timeout: 360
strimzi_rbac_scope: NAMESPACE
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'

Expand All @@ -21,7 +21,7 @@ jobs:
cluster_operator_install_type: 'bundle'
timeout: 360
strimzi_rbac_scope: NAMESPACE
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'

Expand All @@ -34,7 +34,7 @@ jobs:
cluster_operator_install_type: 'bundle'
timeout: 360
strimzi_rbac_scope: NAMESPACE
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'

Expand All @@ -47,7 +47,7 @@ jobs:
cluster_operator_install_type: 'bundle'
timeout: 360
strimzi_rbac_scope: NAMESPACE
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'

Expand All @@ -60,7 +60,7 @@ jobs:
cluster_operator_install_type: 'bundle'
timeout: 360
strimzi_rbac_scope: NAMESPACE
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'

Expand All @@ -73,7 +73,7 @@ jobs:
cluster_operator_install_type: 'bundle'
timeout: 360
strimzi_rbac_scope: NAMESPACE
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'

Expand All @@ -86,6 +86,6 @@ jobs:
cluster_operator_install_type: 'bundle'
timeout: 360
strimzi_rbac_scope: NAMESPACE
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator'
strimzi_feature_gates: '-KafkaNodePools,-UnidirectionalTopicOperator,-UseKRaft'
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
15 changes: 8 additions & 7 deletions .azure/templates/jobs/system-tests/kraft_regression_jobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
display_name: 'kraft-regression-bundle I. - kafka + oauth'
profile: 'azp_kafka_oauth'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '+UseKRaft'
strimzi_feature_gates: ''
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -16,7 +16,8 @@ jobs:
display_name: 'kraft-regression-bundle II. - security'
profile: 'azp_security'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '+UseKRaft'
strimzi_feature_gates: ''
strimzi_use_kraft_in_tests: "true"
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -27,7 +28,7 @@ jobs:
display_name: 'kraft-regression-bundle III. - dynconfig + tracing + watcher'
profile: 'azp_dynconfig_listeners_tracing_watcher'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '+UseKRaft'
strimzi_feature_gates: ''
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -38,7 +39,7 @@ jobs:
display_name: 'kraft-regression-bundle IV. - operators'
profile: 'azp_operators'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '+UseKRaft'
strimzi_feature_gates: ''
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -49,7 +50,7 @@ jobs:
display_name: 'kraft-regression-bundle V. - rollingupdate'
profile: 'azp_rolling_update_bridge'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '+UseKRaft'
strimzi_feature_gates: ''
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -60,7 +61,7 @@ jobs:
display_name: 'kraft-regression-bundle VI. - connect + mirrormaker'
profile: 'azp_connect_mirrormaker'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '+UseKRaft'
strimzi_feature_gates: ''
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
Expand All @@ -71,7 +72,7 @@ jobs:
display_name: 'kraft-regression-bundle VII. - remaining system tests'
profile: 'azp_remaining'
cluster_operator_install_type: 'bundle'
strimzi_feature_gates: '+UseKRaft'
strimzi_feature_gates: ''
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
2 changes: 2 additions & 0 deletions .azure/templates/steps/system_test_general.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ parameters:
parallel: '1'
run_parallel: false
releaseVersion: "latest"
strimzi_use_kraft_in_tests: "false"

jobs:
- job: '${{ parameters.name }}_system_tests'
Expand Down Expand Up @@ -137,6 +138,7 @@ jobs:
DOCKER_REGISTRY: registry.minikube
CLUSTER_OPERATOR_INSTALL_TYPE: '${{ parameters.cluster_operator_install_type }}'
STRIMZI_FEATURE_GATES: '${{ parameters.strimzi_feature_gates }}'
STRIMZI_USE_KRAFT_IN_TESTS: '${{ parameters.strimzi_use_kraft_in_tests }}'
BUILD_ID: $(Agent.JobName)
RESOURCE_ALLOCATION_STRATEGY: "NOT_SHARED"
TEST_LOG_DIR: $(test_log_dir)
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## 0.40.0

* Remove support for Apache Kafka 3.5.0 and 3.5.1
* The `UseKRaft` feature gate moves to beta stage and is enabled by default.
If needed, `UseKRaft` can be disabled in the feature gates configuration in the Cluster Operator.
* Fix NullPointerException from missing listenerConfig when using custom auth
* Added support for Kafka Exporter `offset.show-all` parameter

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ public void setRack(Rack rack) {
this.rack = rack;
}

@Description("Storage configuration (disk). Cannot be updated.")
@JsonProperty(required = true)
@Description("Storage configuration (disk). Cannot be updated. " +
"This property is required when node pools are not used.")
public Storage getStorage() {
return storage;
}
Expand All @@ -161,9 +161,9 @@ public void setLogging(Logging logging) {
this.logging = logging;
}

@Description("The number of pods in the cluster.")
@Description("The number of pods in the cluster. " +
"This property is required when node pools are not used.")
@Minimum(1)
@JsonProperty(required = true)
public int getReplicas() {
return replicas;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ public void setKafka(KafkaClusterSpec kafka) {
this.kafka = kafka;
}

@Description("Configuration of the ZooKeeper cluster")
@JsonProperty(required = true)
@Description("Configuration of the ZooKeeper cluster. This section is required when running a ZooKeeper-based Apache Kafka cluster.")
public ZookeeperClusterSpec getZookeeper() {
return zookeeper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void testKafkaWithMissingRequired() {
KubeClusterException.class,
() -> createDeleteCustomResource("Kafka-with-missing-required-property.yaml"));

assertMissingRequiredPropertiesMessage(exception.getMessage(), "zookeeper", "kafka");
assertMissingRequiredPropertiesMessage(exception.getMessage(), "kafka");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class FeatureGates {
private static final String UNIDIRECTIONAL_TOPIC_OPERATOR = "UnidirectionalTopicOperator";

// When adding new feature gates, do not forget to add them to allFeatureGates() and toString() methods
private final FeatureGate useKRaft = new FeatureGate(USE_KRAFT, false);
private final FeatureGate useKRaft = new FeatureGate(USE_KRAFT, true);
private final FeatureGate kafkaNodePools = new FeatureGate(KAFKA_NODE_POOLS, true);
private final FeatureGate unidirectionalTopicOperator = new FeatureGate(UNIDIRECTIONAL_TOPIC_OPERATOR, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,40 @@ public static void validateMetadataVersion(String metadataVersion) {
throw new InvalidResourceException("Metadata version " + metadataVersion + " is invalid", e);
}
}

/**
* In ZooKeeper mode, some of the fields marked as not required (because they are not used in KRaft) are in fact
* required. This method validates that the fields are present and in case they are missing, it throws an exception.
*
* @param kafkaSpec The .spec section of the Kafka CR which should be checked
* @param nodePoolsEnabled Flag indicating whether Node Pools are enabled or not
*/
public static void validateKafkaCrForZooKeeper(KafkaSpec kafkaSpec, boolean nodePoolsEnabled) {
Set<String> errors = new HashSet<>(0);

if (kafkaSpec != null) {
if (kafkaSpec.getZookeeper() == null) {
errors.add("The .spec.zookeeper section of the Kafka custom resource is missing. " +
"This section is required for a ZooKeeper-based cluster.");
}

if (!nodePoolsEnabled) {
if (kafkaSpec.getKafka().getReplicas() == 0) {
errors.add("The .spec.kafka.replicas property of the Kafka custom resource is missing. " +
"This property is required for a ZooKeeper-based Kafka cluster that is not using Node Pools.");
}

if (kafkaSpec.getKafka().getStorage() == null) {
errors.add("The .spec.kafka.storage section of the Kafka custom resource is missing. " +
"This section is required for a ZooKeeper-based Kafka cluster that is not using Node Pools.");
}
}
} else {
errors.add("The .spec section of the Kafka custom resource is missing");
}

if (!errors.isEmpty()) {
throw new InvalidResourceException("Kafka configuration is not valid: " + errors);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,11 @@ Future<Void> reconcile(ReconciliationState reconcileState) {
Promise<Void> chainPromise = Promise.promise();

boolean isKRaftEnabled = featureGates.useKRaftEnabled() && ReconcilerUtils.kraftEnabled(reconcileState.kafkaAssembly);
boolean nodePoolsEnabled = featureGates.kafkaNodePoolsEnabled() && ReconcilerUtils.nodePoolsEnabled(reconcileState.kafkaAssembly);

if (isKRaftEnabled) {
// Makes sure KRaft is used only with KafkaNodePool custom resources and not with virtual node pools
if (featureGates.kafkaNodePoolsEnabled()
&& !ReconcilerUtils.nodePoolsEnabled(reconcileState.kafkaAssembly)) {
if (!nodePoolsEnabled) {
throw new InvalidConfigurationException("The UseKRaft feature gate can be used only together with a Kafka cluster based on the KafkaNodePool resources.");
}

Expand All @@ -223,6 +223,13 @@ Future<Void> reconcile(ReconciliationState reconcileState) {
} catch (InvalidResourceException e) {
return Future.failedFuture(e);
}
} else {
// Validates the properties required for a ZooKeeper based Kafka cluster
try {
KRaftUtils.validateKafkaCrForZooKeeper(reconcileState.kafkaAssembly.getSpec(), nodePoolsEnabled);
} catch (InvalidResourceException e) {
return Future.failedFuture(e);
}
}

reconcileState.initialStatus()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ClusterOperatorConfigTest {
ENV_VARS.put(ClusterOperatorConfig.STRIMZI_KAFKA_MIRROR_MAKER_IMAGES, KafkaVersionTestUtils.getKafkaMirrorMakerImagesEnvVarString());
ENV_VARS.put(ClusterOperatorConfig.STRIMZI_KAFKA_MIRROR_MAKER_2_IMAGES, KafkaVersionTestUtils.getKafkaMirrorMaker2ImagesEnvVarString());
ENV_VARS.put(ClusterOperatorConfig.OPERATOR_NAMESPACE.key(), "operator-namespace");
ENV_VARS.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-KafkaNodePools");
ENV_VARS.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-UseKRaft");
ENV_VARS.put(ClusterOperatorConfig.DNS_CACHE_TTL.key(), "10");
ENV_VARS.put(ClusterOperatorConfig.POD_SECURITY_PROVIDER_CLASS.key(), "my.package.CustomPodSecurityProvider");
}
Expand All @@ -66,7 +66,7 @@ public void testDefaultConfig() {
assertThat(config.getOperatorNamespace(), is("operator-namespace"));
assertThat(config.getOperatorNamespaceLabels(), is(nullValue()));
assertThat(config.featureGates().kafkaNodePoolsEnabled(), is(true));
assertThat(config.featureGates().useKRaftEnabled(), is(false));
assertThat(config.featureGates().useKRaftEnabled(), is(true));
assertThat(config.isCreateClusterRoles(), is(false));
assertThat(config.isNetworkPolicyGeneration(), is(true));
assertThat(config.isPodSetReconciliationOnly(), is(false));
Expand Down Expand Up @@ -101,7 +101,7 @@ public void testEnvVars() {
assertThat(config.getOperationTimeoutMs(), is(30_000L));
assertThat(config.getConnectBuildTimeoutMs(), is(40_000L));
assertThat(config.getOperatorNamespace(), is("operator-namespace"));
assertThat(config.featureGates().kafkaNodePoolsEnabled(), is(false));
assertThat(config.featureGates().useKRaftEnabled(), is(false));
assertThat(config.getDnsCacheTtlSec(), is(10));
assertThat(config.getPodSecurityProviderClass(), is("my.package.CustomPodSecurityProvider"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private static Map<String, String> buildEnv(String namespaces, boolean podSetsOn
env.put(ClusterOperatorConfig.STRIMZI_KAFKA_CONNECT_IMAGES, KafkaVersionTestUtils.getKafkaConnectImagesEnvVarString());
env.put(ClusterOperatorConfig.STRIMZI_KAFKA_MIRROR_MAKER_IMAGES, KafkaVersionTestUtils.getKafkaMirrorMakerImagesEnvVarString());
env.put(ClusterOperatorConfig.STRIMZI_KAFKA_MIRROR_MAKER_2_IMAGES, KafkaVersionTestUtils.getKafkaMirrorMaker2ImagesEnvVarString());
env.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-KafkaNodePools");
env.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-UseKRaft");

if (podSetsOnly) {
env.put(ClusterOperatorConfig.POD_SET_RECONCILIATION_ONLY.key(), "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ public class FeatureGatesTest {
public void testIndividualFeatureGates() {
for (FeatureGates.FeatureGate gate : FeatureGates.NONE.allFeatureGates()) {
FeatureGates enabled = new FeatureGates("+" + gate.getName());
FeatureGates disabled = new FeatureGates("-" + gate.getName());
FeatureGates disabled;

// KafkaNodePools FG can be disabled only together with UseKRaft FG
if (gate.getName().equals("KafkaNodePools")) {
disabled = new FeatureGates("-" + gate.getName() + ",-UseKRaft");
} else {
disabled = new FeatureGates("-" + gate.getName());
}

assertThat(enabled.allFeatureGates().stream().filter(g -> gate.getName().equals(g.getName())).findFirst().orElseThrow().isEnabled(), is(true));
assertThat(disabled.allFeatureGates().stream().filter(g -> gate.getName().equals(g.getName())).findFirst().orElseThrow().isEnabled(), is(false));
Expand Down
Loading

0 comments on commit 02fb63d

Please sign in to comment.