
#8732 Enhance KafkaBridge resource with consumer inactivity timeout and HTTP consumer/producer parts enablement #9820
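In short, this PR adds three new fields to the `KafkaBridge` custom resource: `spec.consumer.enabled`, `spec.consumer.timeoutSeconds`, and `spec.producer.enabled`. A hypothetical manifest fragment sketching them together (field names and defaults are taken from the PR's documentation and CRD changes; the `apiVersion` and metadata values are illustrative placeholders):

```yaml
# Sketch of the new KafkaBridge options introduced by this PR.
apiVersion: kafka.strimzi.io/v1beta2   # assumed API version
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  bootstrapServers: my-cluster-kafka:9092
  consumer:
    enabled: true        # new: enable/disable the HTTP consumer part (default true)
    timeoutSeconds: 60   # new: delete inactive consumers after 60s (default -1, disabled)
    config:
      auto.offset.reset: earliest
  producer:
    enabled: true        # new: enable/disable the HTTP producer part (default true)
    config:
      acks: 1
```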

Merged · 36 commits · May 15, 2024
Commits
0fb606b
Improved reusability by moving into separate function to use in Kafka…
steffen-karlsson Mar 12, 2024
211d1f1
Adding objects to include into `http` for enabling `consumer` and `pr…
steffen-karlsson Mar 12, 2024
bfb4299
Adding `timeoutSeconds`, `consumer` and `producer` to `http` config
steffen-karlsson Mar 12, 2024
695b5da
Rewriting env generation to reduce cognitive complexity
steffen-karlsson Mar 12, 2024
59678c4
Adding tests for `timeoutSeconds`, `consumer` and `producer`
steffen-karlsson Mar 12, 2024
144151a
Updating `HttpConfig` with `timeoutSeconds`
steffen-karlsson Mar 12, 2024
ae65e04
Fixing bugs by pipeline
steffen-karlsson Mar 12, 2024
2da7d13
Removing host again as per suggestion
steffen-karlsson Mar 13, 2024
452d181
Adding bridge CRD
steffen-karlsson Mar 13, 2024
6300db7
Moving to cluster operator to ModelUtils
steffen-karlsson Mar 13, 2024
8ee9489
Unused imports
steffen-karlsson Mar 13, 2024
54bd399
Merge remote-tracking branch 'upstream/main' into 8732-enhance-kafkab…
steffen-karlsson May 7, 2024
40bcdd8
Move timeoutSeconds to consumer config and rename to config
steffen-karlsson May 7, 2024
a55454d
Updating crd and docs to new names
steffen-karlsson May 7, 2024
a6dc90c
Fixing spotbug: KafkaBridgeCluster.http not initialized in constructo…
steffen-karlsson May 7, 2024
2bb1287
Revert logic back to main, too much commit
steffen-karlsson May 7, 2024
5828da1
Revert extra space
steffen-karlsson May 7, 2024
87becf0
Revert unused
steffen-karlsson May 7, 2024
cd086b9
Update CHANGELOG.md
steffen-karlsson May 7, 2024
e22f7e2
Update documentation
steffen-karlsson May 7, 2024
e099bc3
Fixing order in test
steffen-karlsson May 7, 2024
d9f0402
Moving and to and instead of
steffen-karlsson May 7, 2024
d726cac
Updating documentation
steffen-karlsson May 7, 2024
07be48d
Update derived resources
steffen-karlsson May 7, 2024
eba352a
Fixing IT test for KafkaBridge
steffen-karlsson May 7, 2024
8d07b18
Consumer and Producer should always be initiated otherwise enabled is …
steffen-karlsson May 8, 2024
26928f7
Extending documentation as per request
steffen-karlsson May 10, 2024
9759bfc
Review comment
steffen-karlsson May 10, 2024
f148dda
Moved to Release 0.42.0
steffen-karlsson May 10, 2024
72b3083
Adding JsonInclude.Include.NON_DEFAULT
steffen-karlsson May 10, 2024
38b1faa
Update documentation/api/io.strimzi.api.kafka.model.bridge.KafkaBridg…
steffen-karlsson May 10, 2024
c21af96
Merge branch 'main' into 8732-enhance-kafkabridge
steffen-karlsson May 10, 2024
9acb682
Updating CRDs
steffen-karlsson May 10, 2024
2aef28e
Remove default initialisation
steffen-karlsson May 15, 2024
3663890
Update derived resources
steffen-karlsson May 15, 2024
9e7e000
Removed because of review comments
steffen-karlsson May 15, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,7 +1,7 @@
# CHANGELOG

## 0.42.0

* Enhance `KafkaBridge` resource with consumer inactivity timeout and HTTP consumer/producer enablement.

## 0.41.0

@@ -21,16 +21,41 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"config"})
@JsonPropertyOrder({"enabled", "timeoutSeconds", "config"})
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaBridgeConsumerSpec extends KafkaBridgeClientSpec {
public static final String FORBIDDEN_PREFIXES = "ssl., bootstrap.servers, group.id, sasl., security.";
public static final String FORBIDDEN_PREFIX_EXCEPTIONS = "ssl.endpoint.identification.algorithm, ssl.cipher.suites, ssl.protocol, ssl.enabled.protocols";

public static final long HTTP_DEFAULT_TIMEOUT = -1L;

private boolean enabled = true;
private long timeoutSeconds = HTTP_DEFAULT_TIMEOUT;

@Override
@Description("The Kafka consumer configuration used for consumer instances created by the bridge. Properties with the following prefixes cannot be set: " + FORBIDDEN_PREFIXES + " (with the exception of: " + FORBIDDEN_PREFIX_EXCEPTIONS + ").")
public Map<String, Object> getConfig() {
return config;
}

@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@Description("Whether the HTTP consumer should be enabled or disabled, default is enabled.")
public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@Description("The timeout in seconds for deleting inactive consumers, default is -1 (disabled).")
public long getTimeoutSeconds() {
return timeoutSeconds;
}

public void setTimeoutSeconds(long timeoutSeconds) {
this.timeoutSeconds = timeoutSeconds;
}
}
@@ -21,16 +21,28 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"config"})
@JsonPropertyOrder({"enabled", "config"})
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaBridgeProducerSpec extends KafkaBridgeClientSpec {
public static final String FORBIDDEN_PREFIXES = "ssl., bootstrap.servers, sasl., security.";
public static final String FORBIDDEN_PREFIX_EXCEPTIONS = "ssl.endpoint.identification.algorithm, ssl.cipher.suites, ssl.protocol, ssl.enabled.protocols";

private boolean enabled = true;

@Override
@Description("The Kafka producer configuration used for producer instances created by the bridge. Properties with the following prefixes cannot be set: " + FORBIDDEN_PREFIXES + " (with the exception of: " + FORBIDDEN_PREFIX_EXCEPTIONS + ").")
public Map<String, Object> getConfig() {
return config;
}

@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@Description("Whether the HTTP producer should be enabled or disabled, default is enabled.")
public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}
@@ -8,9 +8,12 @@ spec:
image: "foo"
bootstrapServers: "my-cluster-kafka:9092"
consumer:
enabled: true
timeoutSeconds: 60
config:
foo: "bur"
producer:
enabled: true
config:
foo: "buz"
enableMetrics: false
@@ -10,5 +10,6 @@ spec:
config:
foo: buz
consumer:
timeoutSeconds: 60
config:
foo: bur
@@ -110,6 +110,9 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging

protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_HOST = "KAFKA_BRIDGE_HTTP_HOST";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PORT = "KAFKA_BRIDGE_HTTP_PORT";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT = "KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT";
protected static final String ENV_VAR_KAFKA_BRIDGE_CONSUMER_ENABLED = "KAFKA_BRIDGE_CONSUMER_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_PRODUCER_ENABLED = "KAFKA_BRIDGE_PRODUCER_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED = "KAFKA_BRIDGE_CORS_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS = "KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS = "KAFKA_BRIDGE_CORS_ALLOWED_METHODS";
@@ -401,10 +404,26 @@ protected List<EnvVar> getEnvVars() {

varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS, bootstrapServers));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG, kafkaBridgeAdminClient == null ? "" : new KafkaBridgeAdminClientConfiguration(reconciliation, kafkaBridgeAdminClient.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, kafkaBridgeConsumer == null ? "" : new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, kafkaBridgeProducer == null ? "" : new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ID, cluster));

if (kafkaBridgeConsumer != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_ENABLED, String.valueOf(kafkaBridgeConsumer.isEnabled())));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(kafkaBridgeConsumer.getTimeoutSeconds())));
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, ""));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_ENABLED, "true"));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(KafkaBridgeConsumerSpec.HTTP_DEFAULT_TIMEOUT)));
}

if (kafkaBridgeProducer != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_ENABLED, String.valueOf(kafkaBridgeProducer.isEnabled())));
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, ""));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_ENABLED, "true"));
}

varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_HOST, KafkaBridgeHttpConfig.HTTP_DEFAULT_HOST));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PORT, String.valueOf(http != null ? http.getPort() : KafkaBridgeHttpConfig.HTTP_DEFAULT_PORT)));

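The defaulting rule in the `getEnvVars()` branches above can be summarized in a small standalone sketch. This is a hypothetical helper, not Strimzi code: a `null` spec stands for an omitted `consumer` section, which falls back to `enabled=true` and the disabled timeout of `-1`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BridgeEnvDefaults {
    // Mirrors KafkaBridgeConsumerSpec.HTTP_DEFAULT_TIMEOUT from the diff.
    static final long HTTP_DEFAULT_TIMEOUT = -1L;

    // Returns the consumer-related env vars for a given spec; null arguments
    // stand for a KafkaBridge resource that omits the corresponding field.
    static Map<String, String> consumerEnv(Boolean enabled, Long timeoutSeconds) {
        Map<String, String> env = new LinkedHashMap<>();
        boolean effectiveEnabled = enabled == null || enabled;
        long effectiveTimeout = timeoutSeconds == null ? HTTP_DEFAULT_TIMEOUT : timeoutSeconds;
        env.put("KAFKA_BRIDGE_CONSUMER_ENABLED", String.valueOf(effectiveEnabled));
        env.put("KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT", String.valueOf(effectiveTimeout));
        return env;
    }

    public static void main(String[] args) {
        System.out.println(consumerEnv(null, null));   // omitted section: defaults apply
        System.out.println(consumerEnv(false, 60L));   // explicit values pass through
    }
}
```

The same pattern applies to the producer branch, minus the timeout variable.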
@@ -30,7 +30,10 @@
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
import io.strimzi.api.kafka.model.bridge.KafkaBridge;
import io.strimzi.api.kafka.model.bridge.KafkaBridgeBuilder;
import io.strimzi.api.kafka.model.bridge.KafkaBridgeConsumerSpec;
import io.strimzi.api.kafka.model.bridge.KafkaBridgeConsumerSpecBuilder;
import io.strimzi.api.kafka.model.bridge.KafkaBridgeHttpConfig;
import io.strimzi.api.kafka.model.bridge.KafkaBridgeProducerSpecBuilder;
import io.strimzi.api.kafka.model.bridge.KafkaBridgeResources;
import io.strimzi.api.kafka.model.common.CertSecretSource;
import io.strimzi.api.kafka.model.common.CertSecretSourceBuilder;
@@ -136,9 +139,12 @@ protected List<EnvVar> getExpectedEnvVars() {
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_STRIMZI_GC_LOG_ENABLED).withValue(String.valueOf(JvmOptions.DEFAULT_GC_LOGGING_ENABLED)).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS).withValue(bootstrapServers).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG).withValue(defaultAdminclientConfiguration).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_ID).withValue(cluster).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG).withValue(defaultConsumerConfiguration).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_CONSUMER_ENABLED).withValue(String.valueOf(true)).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT).withValue(String.valueOf(KafkaBridgeConsumerSpec.HTTP_DEFAULT_TIMEOUT)).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG).withValue(defaultProducerConfiguration).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_ID).withValue(cluster).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_PRODUCER_ENABLED).withValue(String.valueOf(true)).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_HOST).withValue(KafkaBridgeHttpConfig.HTTP_DEFAULT_HOST).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_PORT).withValue(String.valueOf(KafkaBridgeHttpConfig.HTTP_DEFAULT_PORT)).build());
expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED).withValue(String.valueOf(false)).build());
@@ -1300,4 +1306,22 @@ public void testJvmOptions() {
assertThat(javaOpts.getValue(), containsString("-Xmx256m"));
assertThat(javaOpts.getValue(), containsString("-XX:InitiatingHeapOccupancyPercent=36"));
}

@ParallelTest
public void testConsumerProducerOptions() {
KafkaBridge resource = new KafkaBridgeBuilder(this.resource)
.editSpec()
.withConsumer(new KafkaBridgeConsumerSpecBuilder().withTimeoutSeconds(60).build())
.withProducer(new KafkaBridgeProducerSpecBuilder().build())
.endSpec()
.build();

KafkaBridgeCluster kb = KafkaBridgeCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, SHARED_ENV_PROVIDER);
Deployment deployment = kb.generateDeployment(new HashMap<>(), true, null, null);
Container container = deployment.getSpec().getTemplate().getSpec().getContainers().get(0);

assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(container).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT), is("60"));
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(container).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_CONSUMER_ENABLED), is("true"));
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(container).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_PRODUCER_ENABLED), is("true"));
}
}
@@ -29,6 +29,16 @@ All other supported options are forwarded to Kafka Bridge, including the following:

* Any `ssl` configuration for xref:con-common-configuration-ssl-reference[supported TLS versions and cipher suites]

[id='property-kafka-bridge-consumer-enabled-config-{context}']
= `enabled`

The consumer can be enabled or disabled by setting the `enabled` field to `true` or `false`. The default is `true`.

[id='property-kafka-bridge-consumer-timeout-config-{context}']
= `timeoutSeconds`

The timeout for deleting inactive consumers can be configured with `timeoutSeconds`. By default it is disabled (`-1`).

.Example Kafka Bridge consumer configuration
[source,yaml,subs="attributes+"]
----
@@ -39,6 +49,8 @@ metadata:
spec:
# ...
consumer:
enabled: true
timeoutSeconds: 60
config:
auto.offset.reset: earliest
enable.auto.commit: true
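A hypothetical variant of the consumer example above, showing the other direction: switching the bridge's HTTP consumer part off entirely while leaving the producer endpoints available (values are illustrative, not from the PR):

```yaml
# Illustrative fragment: disable the bridge's HTTP consumer endpoints.
spec:
  # ...
  consumer:
    enabled: false
  producer:
    enabled: true
```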
@@ -28,6 +28,11 @@ All other supported options are forwarded to Kafka Bridge, including the following:

* Any `ssl` configuration for xref:con-common-configuration-ssl-reference[supported TLS versions and cipher suites]

[id='property-kafka-bridge-producer-enabled-config-{context}']
= `enabled`

The producer can be enabled or disabled by setting the `enabled` field to `true` or `false`. The default is `true`.

.Example Kafka Bridge producer configuration
[source,yaml,subs="attributes+"]
----
@@ -38,6 +43,7 @@ metadata:
spec:
# ...
producer:
enabled: true
config:
acks: 1
delivery.timeout.ms: 300000
9 changes: 9 additions & 0 deletions documentation/modules/appendix_crds.adoc
@@ -3669,6 +3669,12 @@ include::../api/io.strimzi.api.kafka.model.bridge.KafkaBridgeConsumerSpec.adoc[l
[cols="2,2,3a",options="header"]
|====
|Property |Property type |Description
|enabled
|boolean
|Whether the HTTP consumer should be enabled or disabled, default is enabled.
|timeoutSeconds
|integer
|The timeout in seconds for deleting inactive consumers, default is -1 (disabled).
|config
|map
|The Kafka consumer configuration used for consumer instances created by the bridge. Properties with the following prefixes cannot be set: ssl., bootstrap.servers, group.id, sasl., security. (with the exception of: ssl.endpoint.identification.algorithm, ssl.cipher.suites, ssl.protocol, ssl.enabled.protocols).
@@ -3690,6 +3696,9 @@ include::../api/io.strimzi.api.kafka.model.bridge.KafkaBridgeProducerSpec.adoc[l
[cols="2,2,3a",options="header"]
|====
|Property |Property type |Description
|enabled
|boolean
|Whether the HTTP producer should be enabled or disabled, default is enabled.
|config
|map
|The Kafka producer configuration used for producer instances created by the bridge. Properties with the following prefixes cannot be set: ssl., bootstrap.servers, sasl., security. (with the exception of: ssl.endpoint.identification.algorithm, ssl.cipher.suites, ssl.protocol, ssl.enabled.protocols).
@@ -265,6 +265,12 @@
consumer:
type: object
properties:
enabled:
type: boolean
description: "Whether the HTTP consumer should be enabled or disabled, default is enabled."
timeoutSeconds:
type: integer
description: "The timeout in seconds for deleting inactive consumers, default is -1 (disabled)."
config:
x-kubernetes-preserve-unknown-fields: true
type: object
@@ -273,6 +279,9 @@
producer:
type: object
properties:
enabled:
type: boolean
description: "Whether the HTTP producer should be enabled or disabled, default is enabled."
config:
x-kubernetes-preserve-unknown-fields: true
type: object
9 changes: 9 additions & 0 deletions packaging/install/cluster-operator/046-Crd-kafkabridge.yaml
@@ -264,6 +264,12 @@
consumer:
type: object
properties:
enabled:
type: boolean
description: "Whether the HTTP consumer should be enabled or disabled, default is enabled."
timeoutSeconds:
type: integer
description: "The timeout in seconds for deleting inactive consumers, default is -1 (disabled)."
config:
x-kubernetes-preserve-unknown-fields: true
type: object
@@ -272,6 +278,9 @@
producer:
type: object
properties:
enabled:
type: boolean
description: "Whether the HTTP producer should be enabled or disabled, default is enabled."
config:
x-kubernetes-preserve-unknown-fields: true
type: object