diff --git a/docs/src/main/asciidoc/kafka-streams.adoc b/docs/src/main/asciidoc/kafka-streams.adoc index fb239a02f6058..cc7bd9588e86c 100644 --- a/docs/src/main/asciidoc/kafka-streams.adoc +++ b/docs/src/main/asciidoc/kafka-streams.adoc @@ -468,6 +468,8 @@ e.g. via environment variables or system properties. `topics` is specific to Quarkus: the application will wait for all the given topics to exist before launching the Kafka Streams engine. This is to done to gracefully await the creation of topics that don't yet exist at application startup time. +TIP: Alternatively, you can use `kafka.bootstrap.servers` instead of `quarkus.kafka-streams.bootstrap-servers` as you did in the _generator_ project above. + All the properties within the `kafka-streams` namespace are passed through as-is to the Kafka Streams engine. Changing their values requires a rebuild of the application. @@ -554,6 +556,8 @@ networks: To launch all the containers, building the `producer` and `aggregator` container images, run `docker-compose up --build`. +TIP: Instead of `QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS`, you can use `KAFKA_BOOTSTRAP_SERVERS`. + You should see log statements from the `producer` application about messages being sent to the "temperature-values" topic. Now run an instance of the _debezium/tooling_ image, attaching to the same network all the other containers run in. diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java index bf90ed4a76dfb..0da408c03c29d 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java @@ -1,5 +1,7 @@ package io.quarkus.kafka.streams.runtime; +import static io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig.DEFAULT_KAFKA_BROKER; + import java.net.InetSocketAddress; import java.time.Duration; import java.util.Collection; @@ -36,6 +38,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.StateRestoreListener; +import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.logging.Logger; import io.quarkus.arc.Arc; @@ -79,6 +82,11 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream Properties buildTimeProperties = kafkaStreamsSupport.getProperties(); String bootstrapServersConfig = asString(runtimeConfig.bootstrapServers); + if (DEFAULT_KAFKA_BROKER.equalsIgnoreCase(bootstrapServersConfig)) { + // Try to see if kafka.bootstrap.servers is set, if so, use that value, if not, keep localhost:9092 + bootstrapServersConfig = ConfigProvider.getConfig().getOptionalValue("kafka.bootstrap.servers", String.class) + .orElse(bootstrapServersConfig); + } Properties kafkaStreamsProperties = getStreamsProperties(buildTimeProperties, bootstrapServersConfig, runtimeConfig); this.kafkaAdminClient = Admin.create(getAdminClientConfig(kafkaStreamsProperties)); diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java index 3fe3264717734..c42fc5a58f488 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java @@ -12,6 +12,11 @@ @ConfigRoot(name = "kafka-streams", phase = ConfigPhase.RUN_TIME) public class KafkaStreamsRuntimeConfig { + /** + * Default Kafka bootstrap server. + */ + public static final String DEFAULT_KAFKA_BROKER = "localhost:9012"; + /** * A unique identifier for this Kafka Streams application. * If not set, defaults to quarkus.application.name. @@ -20,9 +25,10 @@ public class KafkaStreamsRuntimeConfig { public String applicationId; /** - * A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s) + * A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s). + * If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9012}. */ - @ConfigItem(defaultValue = "localhost:9012") + @ConfigItem(defaultValue = DEFAULT_KAFKA_BROKER) public List bootstrapServers; /**