diff --git a/docs/src/main/asciidoc/kafka-dev-services.adoc b/docs/src/main/asciidoc/kafka-dev-services.adoc new file mode 100644 index 0000000000000..0a56659d8b331 --- /dev/null +++ b/docs/src/main/asciidoc/kafka-dev-services.adoc @@ -0,0 +1,40 @@ +//// +This guide is maintained in the main Quarkus repository +and pull requests should be submitted there: +https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc +//// += Dev Services for Kafka + +include::./attributes.adoc[] + +Dev Services for Kafka automatically starts a Kafka broker in dev mode and when running tests. +So, you don't have to start a broker manually. +The application is configured automatically. + +IMPORTANT: Because starting a Kafka broker can be long, Dev Services for Kafka uses https://vectorized.io/redpanda[Red Panda], a Kafka compatible broker which starts in ~1 second. + +== Enabling / Disabling Dev Services for Kafka + +Dev Services for Kafka is automatically enabled unless: + +- `quarkus.kafka.devservices.enabled` is set to `false` +- the `kafka.bootstrap.servers` is configured +- all the Reactive Messaging Kafka channels have the `bootstrap.servers` attribute set + +Dev Services for Kafka relies on Docker to start the broker. +If your environment does not support Docker, you will need to start the broker manually, or connect to an already running broker. +You can configure the broker address using `kafka.bootstrap.servers`. + +== Setting the port + +By default, Dev Services for Kafka picks a random port and configure the application. +You can set the port by configuring the `quarkus.kafka.devservices.port` property. + +Note that the Kafka advertised address is automatically configured with the chosen port. + +== Configuring the image + +By default, Dev Services for Kafka uses: `vectorized/redpanda:latest`. +You can select any version from https://hub.docker.com/r/vectorized/redpanda. + +IMPORTANT: Dev Services for Kafka only support Red Panda. diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index a5b38963d2114..ec0eb7825a97b 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -74,48 +74,12 @@ This will add the following to your `pom.xml`: ---- -== Starting Kafka - -Then, we need a Kafka cluster. -You can follow the instructions from the https://kafka.apache.org/quickstart[Apache Kafka web site] or create a `docker-compose.yaml` file with the following content: - -[source, yaml] ----- -version: '2' - -services: - - zookeeper: - image: strimzi/kafka:0.19.0-kafka-2.5.0 - command: [ - "sh", "-c", - "bin/zookeeper-server-start.sh config/zookeeper.properties" - ] - ports: - - "2181:2181" - environment: - LOG_DIR: /tmp/logs - - kafka: - image: strimzi/kafka:0.19.0-kafka-2.5.0 - command: [ - "sh", "-c", - "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" - ] - depends_on: - - zookeeper - ports: - - "9092:9092" - environment: - LOG_DIR: "/tmp/logs" - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 ----- - -Once created, run `docker-compose up`. - -NOTE: This is a development cluster, do not use in production. +[TIP] +.Dev Services +==== +No need to start a Kafka broker when using the dev mode or for tests. Quarkus starts a broker for you automatically. +See xref:kafka-dev-services.adoc[Dev Services for Kafka] for details. +==== == The price generator @@ -256,9 +220,6 @@ The `channel-name` segment must match the value set in the `@Incoming` and `@Out [source,properties] ---- -# Configure the SmallRye Kafka connector -kafka.bootstrap.servers=localhost:9092 - # Configure the Kafka sink (we write to it) mp.messaging.outgoing.generated-price.connector=smallrye-kafka mp.messaging.outgoing.generated-price.topic=prices @@ -286,6 +247,10 @@ So, if you are running in an environment only routing traffic to containers that More details about health reporting is given in <>. ==== +The previous configuration does not set the Kafka _bootstrap.servers_. +Quarkus starts a Kafka broker automatically and configures the application. +See xref:kafka-dev-services.adoc[Dev Services for Kafka] for more details. + == The HTML page Final touch, the HTML page reading the converted prices using SSE. @@ -328,8 +293,7 @@ Nothing spectacular here. On each received price, it updates the page. == Get it running -If you followed the instructions, you should have Kafka running. -Then, you just need to run the application using: +You just need to run the application using: [source,bash] ---- @@ -338,15 +302,66 @@ Then, you just need to run the application using: Open `http://localhost:8080/prices.html` in your browser. -NOTE: If you started the Kafka broker with docker compose, stop it using `CTRL+C` followed by `docker-compose down`. +== Running in JVM or Native mode + +When not running in dev or test mode, you will need to start your Kafka broker. +You can follow the instructions from the https://kafka.apache.org/quickstart[Apache Kafka web site] or create a `docker-compose.yaml` file with the following content: + +[source, yaml] +---- +version: '2' + +services: + + zookeeper: + image: strimzi/kafka:0.19.0-kafka-2.5.0 + command: [ + "sh", "-c", + "bin/zookeeper-server-start.sh config/zookeeper.properties" + ] + ports: + - "2181:2181" + environment: + LOG_DIR: /tmp/logs + + kafka: + image: strimzi/kafka:0.19.0-kafka-2.5.0 + command: [ + "sh", "-c", + "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" + ] + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + LOG_DIR: "/tmp/logs" + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 +---- + +Once created, run `docker-compose up`. + +NOTE: This is a development cluster, do not use in production. + +You can build and run the application in JVM mode with: + +[source, bash] +---- +./mvnw package +java -jar target/quarkus-app/quarkus-run.jar +---- -== Running Native +NOTE: By default, the application tries to connect to a Kafka broker listening at `localhost:9092`. +You can configure the bootstrap server using: `java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar` -You can build the native executable with: +You can build and run the native executable with: [source,bash] ---- ./mvnw package -Pnative +./target/kafka-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092 ---- == Imperative usage diff --git a/extensions/kafka-client/deployment/pom.xml b/extensions/kafka-client/deployment/pom.xml index 91301064431a1..b065abca2722c 100644 --- a/extensions/kafka-client/deployment/pom.xml +++ b/extensions/kafka-client/deployment/pom.xml @@ -39,6 +39,10 @@ io.quarkus quarkus-caffeine-deployment + + org.testcontainers + testcontainers + diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaBrokerBuildItem.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaBrokerBuildItem.java new file mode 100644 index 0000000000000..9889c7c577e10 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaBrokerBuildItem.java @@ -0,0 +1,17 @@ +package io.quarkus.kafka.client.deployment; + +import io.quarkus.builder.item.SimpleBuildItem; + +public final class DevServicesKafkaBrokerBuildItem extends SimpleBuildItem { + + final String bootstrapServers; + + public DevServicesKafkaBrokerBuildItem(String bs) { + this.bootstrapServers = bs; + } + + public String getBootstrapServers() { + return bootstrapServers; + } + +} diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java new file mode 100644 index 0000000000000..89f868139a740 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java @@ -0,0 +1,283 @@ +package io.quarkus.kafka.client.deployment; + +import java.io.Closeable; +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; +import org.jboss.logging.Logger; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; + +import com.github.dockerjava.api.command.InspectContainerResponse; + +import io.quarkus.bootstrap.classloading.QuarkusClassLoader; +import io.quarkus.deployment.IsDockerWorking; +import io.quarkus.deployment.IsNormal; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.LaunchModeBuildItem; +import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; +import io.quarkus.deployment.builditem.ServiceStartBuildItem; +import io.quarkus.runtime.LaunchMode; +import io.quarkus.runtime.configuration.ConfigUtils; + +/** + * Starts a Kafka broker as dev service if needed. + */ +public class DevServicesKafkaProcessor { + + private static final Logger log = Logger.getLogger(DevServicesKafkaProcessor.class); + private static final int KAFKA_PORT = 9092; + private static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + static volatile Closeable closeable; + static volatile KafkaDevServiceCfg cfg; + static volatile boolean first = true; + + private final IsDockerWorking isDockerWorking = new IsDockerWorking(true); + + @BuildStep(onlyIfNot = IsNormal.class) + public DevServicesKafkaBrokerBuildItem startKafkaDevService( + LaunchModeBuildItem launchMode, + KafkaBuildTimeConfig kafkaClientBuildTimeConfig, + BuildProducer runTimeConfiguration, + BuildProducer serviceStartBuildItemBuildProducer) { + + KafkaDevServiceCfg configuration = getConfiguration(kafkaClientBuildTimeConfig); + + if (closeable != null) { + boolean shouldShutdownTheBroker = launchMode.getLaunchMode() == LaunchMode.TEST; + if (!shouldShutdownTheBroker) { + shouldShutdownTheBroker = !configuration.equals(cfg); + } + if (!shouldShutdownTheBroker) { + return null; + } + shutdownBroker(); + cfg = null; + } + + KafkaBroker kafkaBroker = startKafka(configuration); + DevServicesKafkaBrokerBuildItem bootstrapServers = null; + if (kafkaBroker != null) { + closeable = kafkaBroker.getCloseable(); + runTimeConfiguration.produce(new RunTimeConfigurationDefaultBuildItem( + KAFKA_BOOTSTRAP_SERVERS, kafkaBroker.getBootstrapServers())); + bootstrapServers = new DevServicesKafkaBrokerBuildItem(kafkaBroker.getBootstrapServers()); + } + + // Configure the watch dog + if (first) { + first = false; + Runnable closeTask = new Runnable() { + @Override + public void run() { + if (closeable != null) { + shutdownBroker(); + } + first = true; + closeable = null; + cfg = null; + } + }; + QuarkusClassLoader cl = (QuarkusClassLoader) Thread.currentThread().getContextClassLoader(); + ((QuarkusClassLoader) cl.parent()).addCloseTask(closeTask); + Thread closeHookThread = new Thread(closeTask, "Kafka container shutdown thread"); + Runtime.getRuntime().addShutdownHook(closeHookThread); + ((QuarkusClassLoader) cl.parent()).addCloseTask(new Runnable() { + @Override + public void run() { + Runtime.getRuntime().removeShutdownHook(closeHookThread); + } + }); + } + cfg = configuration; + return bootstrapServers; + } + + private void shutdownBroker() { + if (closeable != null) { + try { + closeable.close(); + } catch (Throwable e) { + log.error("Failed to stop the Kafka broker", e); + } finally { + closeable = null; + } + } + } + + private KafkaBroker startKafka(KafkaDevServiceCfg config) { + if (!config.devServicesEnabled) { + // explicitly disabled + log.debug("Not starting dev services for Kafka, as it has been disabled in the config."); + return null; + } + + if (!isDockerWorking.getAsBoolean()) { + log.warn("Docker isn't working, please configure the Kafka bootstrap servers property (kafka.bootstrap.servers)."); + return null; + } + + // Check if kafka.bootstrap.servers is set + if (ConfigUtils.isPropertyPresent(KAFKA_BOOTSTRAP_SERVERS)) { + log.debug("Not starting dev services for Kafka, the kafka.bootstrap.servers is configured."); + return null; + } + + // Verify that we have kafka channels without bootstrap.servers + if (!hasKafkaChannelWithoutBootstrapServers()) { + log.debug("Not starting dev services for Kafka, all the channels are configured."); + return null; + } + + // Starting the broker + RedPandaKafkaContainer container = new RedPandaKafkaContainer( + DockerImageName.parse(config.imageName), + config.fixedExposedPort); + container.start(); + + return new KafkaBroker( + container.getBootstrapServers(), + new Closeable() { + @Override + public void close() { + container.close(); + } + }); + } + + private boolean hasKafkaChannelWithoutBootstrapServers() { + Config config = ConfigProvider.getConfig(); + for (String name : config.getPropertyNames()) { + boolean isIncoming = name.startsWith("mp.messaging.incoming."); + boolean isOutgoing = name.startsWith("mp.messaging.outgoing."); + boolean isConnector = name.endsWith(".connector"); + boolean isConfigured = false; + if ((isIncoming || isOutgoing) && isConnector) { + isConfigured = ConfigUtils.isPropertyPresent(name.replace(".connector", ".bootstrap.servers")); + } + if (!isConfigured) { + return true; + } + } + return false; + } + + private KafkaDevServiceCfg getConfiguration(KafkaBuildTimeConfig cfg) { + KafkaDevServicesBuildTimeConfig devServicesConfig = cfg.devservices; + boolean devServicesEnabled = devServicesConfig.enabled.orElse(true); + return new KafkaDevServiceCfg(devServicesEnabled, + devServicesConfig.imageName, + devServicesConfig.port.orElse(0)); + } + + private static class KafkaBroker { + private final String url; + private final Closeable closeable; + + public KafkaBroker(String url, Closeable closeable) { + this.url = url; + this.closeable = closeable; + } + + public String getBootstrapServers() { + return url; + } + + public Closeable getCloseable() { + return closeable; + } + } + + private static final class KafkaDevServiceCfg { + private final boolean devServicesEnabled; + private final String imageName; + private final Integer fixedExposedPort; + + public KafkaDevServiceCfg(boolean devServicesEnabled, String imageName, Integer fixedExposedPort) { + this.devServicesEnabled = devServicesEnabled; + this.imageName = imageName; + this.fixedExposedPort = fixedExposedPort; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KafkaDevServiceCfg that = (KafkaDevServiceCfg) o; + return devServicesEnabled == that.devServicesEnabled && Objects.equals(imageName, that.imageName) + && Objects.equals(fixedExposedPort, that.fixedExposedPort); + } + + @Override + public int hashCode() { + return Objects.hash(devServicesEnabled, imageName, fixedExposedPort); + } + } + + /** + * Container configuring and starting the Red Panda broker. + * See https://vectorized.io/docs/quick-start-docker/ + */ + private static final class RedPandaKafkaContainer extends GenericContainer { + + private final int port; + + private static final String STARTER_SCRIPT = "/redpanda.sh"; + + private RedPandaKafkaContainer(DockerImageName dockerImageName, int fixedExposedPort) { + super(dockerImageName); + this.port = fixedExposedPort; + withNetwork(Network.SHARED); + withExposedPorts(KAFKA_PORT); + // For red panda, we need to start the broker - see https://vectorized.io/docs/quick-start-docker/ + if (dockerImageName.getRepository().equals("vectorized/redpanda")) { + withCreateContainerCmdModifier(cmd -> { + cmd.withEntrypoint("sh"); + }); + withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT); + waitingFor(Wait.forLogMessage(".*Started Kafka API server.*", 1)); + } else { + throw new IllegalArgumentException("Only vectorized/redpanda images are supported"); + } + } + + @Override + protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { + super.containerIsStarting(containerInfo, reused); + + // Start and configure the advertised address + String command = "#!/bin/bash\n"; + command += "/usr/bin/rpk redpanda start --check=false --node-id 0 "; + command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 "; + command += "--advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092); + + //noinspection OctalInteger + copyFileToContainer( + Transferable.of(command.getBytes(StandardCharsets.UTF_8), 0777), + STARTER_SCRIPT); + } + + @Override + protected void configure() { + super.configure(); + if (port > 0) { + addFixedExposedPort(port, KAFKA_PORT); + } + } + + public String getBootstrapServers() { + return String.format("PLAINTEXT://%s:%s", getContainerIpAddress(), getMappedPort(KAFKA_PORT)); + } + } +} diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java index 00b5ef429c662..2ef777a7c5ad1 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java @@ -22,4 +22,10 @@ public class KafkaBuildTimeConfig { */ @ConfigItem(name = "snappy.enabled", defaultValue = "false") public boolean snappyEnabled; + + /** + * Configuration for DevServices. DevServices allows Quarkus to automatically start Kafka in dev and test mode. + */ + @ConfigItem + public KafkaDevServicesBuildTimeConfig devservices; } diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java new file mode 100644 index 0000000000000..4ddfe9b9e8317 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java @@ -0,0 +1,36 @@ +package io.quarkus.kafka.client.deployment; + +import java.util.Optional; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.quarkus.runtime.annotations.ConfigItem; + +@ConfigGroup +public class KafkaDevServicesBuildTimeConfig { + + /** + * If Dev Services for Kafka has been explicitly enabled or disabled. Dev Services are generally enabled + * by default, unless there is an existing configuration present. For Kafka, Dev Services starts a broker unless + * {@code kafka.bootstrap.servers} is set or if all the Reactive Messaging Kafka channel are configured with a + * {@code bootstrap.servers}. + */ + @ConfigItem + public Optional enabled = Optional.empty(); + + /** + * Optional fixed port the dev service will listen to. + *

+ * If not defined, the port will be chosen randomly. + */ + @ConfigItem + public Optional port; + + /** + * The Kafka image to use. + * Note that only Red Panda images are supported. + * See https://vectorized.io/docs/quick-start-docker/ and https://hub.docker.com/r/vectorized/redpanda + */ + @ConfigItem(defaultValue = "vectorized/redpanda:v21.5.5") + public String imageName; + +}