Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev Services for Kafka configuration for image providers #29897

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions docs/src/main/asciidoc/kafka-dev-services.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,32 @@ Note that the Kafka advertised address is automatically configured with the chos
[[configuring-the-image]]
== Configuring the image

Dev Services for Kafka supports https://redpanda.com[Redpanda] and https://strimzi.io[Strimzi] (in https://github.com/apache/kafka/blob/trunk/config/kraft/README.md[Kraft] mode).
Dev Services for Kafka supports https://redpanda.com[Redpanda], https://github/ozangunalp/kafka-native[kafka-native]
and https://strimzi.io[Strimzi] (in https://github.com/apache/kafka/blob/trunk/config/kraft/README.md[Kraft] mode) images.

Redpanda is a Kafka compatible event streaming platform.
Because it provides a faster startup time dev services defaults to `vectorized/redpanda` images.
**Redpanda** is a Kafka compatible event streaming platform.
Because it provides a fast startup times, dev services defaults to Redpanda images from `vectorized/redpanda`.
You can select any version from https://hub.docker.com/r/vectorized/redpanda.

Strimzi provides container images and Operators for running Apache Kafka on Kubernetes.
**kafka-native** provides images of standard Apache Kafka distribution compiled to native binary using Quarkus and GraalVM.
While still being _experimental_, it provides very fast startup times with small footprint.

Image type can be configured using

[source, properties]
----
quarkus.kafka.devservices.provider=kafka-native
----

**Strimzi** provides container images and Operators for running Apache Kafka on Kubernetes.
While Strimzi is optimized for Kubernetes, the images work perfectly in classic container environments.
Strimzi container images run "genuine" Kafka broker on JVM, which is slower to start.

[source, properties]
----
quarkus.kafka.devservices.provider=strimzi
----

For Strimzi, you can select any image with a Kafka version which has Kraft support (2.8.1 and higher) from https://quay.io/repository/strimzi-test-container/test-container?tab=tags

[source, properties]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,39 +225,53 @@ private RunningDevService startKafka(DockerStatusBuildItem dockerStatusBuildItem

// Starting the broker
final Supplier<RunningDevService> defaultKafkaBrokerSupplier = () -> {
if (config.imageName.contains("strimzi")) {
StrimziKafkaContainer container = new StrimziKafkaContainer(config.imageName)
.withBrokerId(1)
.withKraft()
.waitForRunning();
ConfigureUtil.configureSharedNetwork(container, "kafka");
if (config.serviceName != null) {
container.withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, config.serviceName);
}
if (config.fixedExposedPort != 0) {
container.withPort(config.fixedExposedPort);
}
timeout.ifPresent(container::withStartupTimeout);

container.start();
return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
container.getContainerId(),
container::close,
KAFKA_BOOTSTRAP_SERVERS, container.getBootstrapServers());
} else {
RedPandaKafkaContainer container = new RedPandaKafkaContainer(
DockerImageName.parse(config.imageName).asCompatibleSubstituteFor("vectorized/redpanda"),
config.fixedExposedPort,
launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null,
useSharedNetwork, config.redpanda);
timeout.ifPresent(container::withStartupTimeout);
container.start();

return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
container.getContainerId(),
container::close,
KAFKA_BOOTSTRAP_SERVERS, container.getBootstrapServers());
switch (config.provider) {
case REDPANDA:
RedPandaKafkaContainer redpanda = new RedPandaKafkaContainer(
DockerImageName.parse(config.imageName).asCompatibleSubstituteFor("vectorized/redpanda"),
config.fixedExposedPort,
launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null,
useSharedNetwork, config.redpanda);
timeout.ifPresent(redpanda::withStartupTimeout);
redpanda.start();

return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
redpanda.getContainerId(),
redpanda::close,
KAFKA_BOOTSTRAP_SERVERS, redpanda.getBootstrapServers());
case STRIMZI:
StrimziKafkaContainer strimzi = new StrimziKafkaContainer(config.imageName)
.withBrokerId(1)
.withKraft()
.waitForRunning();
ConfigureUtil.configureSharedNetwork(strimzi, "kafka");
if (config.serviceName != null) {
strimzi.withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, config.serviceName);
}
if (config.fixedExposedPort != 0) {
strimzi.withPort(config.fixedExposedPort);
}
timeout.ifPresent(strimzi::withStartupTimeout);

strimzi.start();
return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
strimzi.getContainerId(),
strimzi::close,
KAFKA_BOOTSTRAP_SERVERS, strimzi.getBootstrapServers());
case KAFKA_NATIVE:
KafkaNativeContainer kafkaNative = new KafkaNativeContainer(DockerImageName.parse(config.imageName),
config.fixedExposedPort,
launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null,
useSharedNetwork);
timeout.ifPresent(kafkaNative::withStartupTimeout);
kafkaNative.start();

return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
kafkaNative.getContainerId(),
kafkaNative::close,
KAFKA_BOOTSTRAP_SERVERS, kafkaNative.getBootstrapServers());
}
return null;
};

return maybeContainerAddress
Expand Down Expand Up @@ -300,11 +314,15 @@ private static final class KafkaDevServiceCfg {
private final String serviceName;
private final Map<String, Integer> topicPartitions;
private final Duration topicPartitionsTimeout;

private final KafkaDevServicesBuildTimeConfig.Provider provider;

private final RedPandaBuildTimeConfig redpanda;

public KafkaDevServiceCfg(KafkaDevServicesBuildTimeConfig config) {
this.devServicesEnabled = config.enabled.orElse(true);
this.imageName = config.imageName;
this.provider = config.provider;
this.imageName = config.imageName.orElseGet(provider::getDefaultImageName);
this.fixedExposedPort = config.port.orElse(0);
this.shared = config.shared;
this.serviceName = config.serviceName;
Expand All @@ -323,13 +341,15 @@ public boolean equals(Object o) {
return false;
}
KafkaDevServiceCfg that = (KafkaDevServiceCfg) o;
return devServicesEnabled == that.devServicesEnabled && Objects.equals(imageName, that.imageName)
return devServicesEnabled == that.devServicesEnabled
&& Objects.equals(provider, that.provider)
&& Objects.equals(imageName, that.imageName)
&& Objects.equals(fixedExposedPort, that.fixedExposedPort);
}

@Override
public int hashCode() {
return Objects.hash(devServicesEnabled, imageName, fixedExposedPort);
return Objects.hash(devServicesEnabled, provider, imageName, fixedExposedPort);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,47 @@ public class KafkaDevServicesBuildTimeConfig {
public Optional<Integer> port;

/**
* The Kafka container image to use.
* <p>
* Only Redpanda and Strimzi images are supported.
* Default image is Redpanda.
* Kafka dev service container type.
* <p>
* Note that Strimzi images are launched in Kraft mode.
* In order to use a Strimzi image you need to set a compatible image name such as
* {@code quay.io/strimzi-test-container/test-container:0.100.0-kafka-3.1.0} or
* {@code quay.io/strimzi/kafka:0.27.1-kafka-3.0.0}
* Redpanda, Strimzi and kafka-native container providers are supported. Default is redpanda.
* <p>
* For Redpanda:
* See https://vectorized.io/docs/quick-start-docker/ and https://hub.docker.com/r/vectorized/redpanda
* <p>
* For Strimzi:
* See https://github.com/strimzi/test-container and https://quay.io/repository/strimzi-test-container/test-container
* <p>
* For Kafka Native:
* See https://github.com/ozangunalp/kafka-native and https://quay.io/repository/ogunalp/kafka-native
* <p>
* Note that Strimzi and Kafka Native images are launched in Kraft mode.
*/
@ConfigItem(defaultValue = "docker.io/vectorized/redpanda:v22.3.4")
public String imageName;
@ConfigItem(defaultValue = "redpanda")
public Provider provider = Provider.REDPANDA;

public enum Provider {
REDPANDA("docker.io/vectorized/redpanda:v22.3.4"),
STRIMZI("quay.io/strimzi-test-container/test-container:latest-kafka-3.2.1"),
KAFKA_NATIVE("quay.io/ogunalp/kafka-native:latest");

private final String defaultImageName;

Provider(String imageName) {
this.defaultImageName = imageName;
}

public String getDefaultImageName() {
return defaultImageName;
}
}

/**
* The Kafka container image to use.
* <p>
* Dependent on the provider.
*/
@ConfigItem
public Optional<String> imageName;

/**
* Indicates if the Kafka broker managed by Quarkus Dev Services is shared.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.quarkus.kafka.client.deployment;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import com.github.dockerjava.api.command.InspectContainerResponse;

import io.quarkus.devservices.common.ConfigureUtil;

public class KafkaNativeContainer extends GenericContainer<KafkaNativeContainer> {

private static final String STARTER_SCRIPT = "/work/run.sh";

private final Integer fixedExposedPort;
private final boolean useSharedNetwork;

private String additionalArgs = null;
private int exposedPort = -1;

private String hostName = null;

public KafkaNativeContainer(DockerImageName dockerImageName, int fixedExposedPort, String serviceName,
boolean useSharedNetwork) {
super(dockerImageName);
this.fixedExposedPort = fixedExposedPort;
this.useSharedNetwork = useSharedNetwork;
if (serviceName != null) {
withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, serviceName);
}
String cmd = String.format("while [ ! -f %s ]; do sleep 0.1; done; sleep 0.1; %s", STARTER_SCRIPT, STARTER_SCRIPT);
withCommand("sh", "-c", cmd);
waitingFor(Wait.forLogMessage(".*Kafka broker started.*", 1));
}

@Override
protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
super.containerIsStarting(containerInfo, reused);
// Set exposed port
this.exposedPort = getMappedPort(DevServicesKafkaProcessor.KAFKA_PORT);
// follow output
// Start and configure the advertised address
String cmd = "#!/bin/bash\n";
cmd += "/work/kafka";
cmd += " -Dkafka.advertised.listeners=" + getBootstrapServers();
if (useSharedNetwork) {
cmd += " -Dkafka.listeners=BROKER://:9093,PLAINTEXT://:9092,CONTROLLER://:9094";
cmd += " -Dkafka.interbroker.listener.name=BROKER";
cmd += " -Dkafka.controller.listener.names=CONTROLLER";
cmd += " -Dkafka.listener.security.protocol.map=BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT";
cmd += " -Dkafka.early.start.listeners=BROKER,CONTROLLER,PLAINTEXT";
}
if (additionalArgs != null) {
cmd += " " + additionalArgs;
}

//noinspection OctalInteger
copyFileToContainer(
Transferable.of(cmd.getBytes(StandardCharsets.UTF_8), 0777),
STARTER_SCRIPT);
}

private String getKafkaAdvertisedListeners() {
List<String> addresses = new ArrayList<>();
if (useSharedNetwork) {
addresses.add(String.format("BROKER://%s:9093", hostName));
}
// See https://github.com/quarkusio/quarkus/issues/21819
// Kafka is always exposed to the Docker host network
addresses.add(String.format("PLAINTEXT://%s:%d", getHost(), getExposedKafkaPort()));
return String.join(",", addresses);
}

public int getExposedKafkaPort() {
return exposedPort;
}

@Override
protected void configure() {
super.configure();

addExposedPort(DevServicesKafkaProcessor.KAFKA_PORT);
hostName = ConfigureUtil.configureSharedNetwork(this, "kafka");

if (fixedExposedPort != null) {
addFixedExposedPort(fixedExposedPort, DevServicesKafkaProcessor.KAFKA_PORT);
}
}

public String getBootstrapServers() {
return getKafkaAdvertisedListeners();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN
# enable health check
quarkus.kafka.health.enabled=true

quarkus.kafka.devservices.provider=kafka-native
quarkus.kafka.devservices.topic-partitions.test=2
quarkus.kafka.devservices.topic-partitions.test-consumer=3
quarkus.kafka.devservices.topic-partitions-timeout=4S