Skip to content

Commit

Permalink
Allow Kafka Streams to be configured using "kafka.bootstrap.servers"
Browse files Browse the repository at this point in the history
"kafka.bootstrap.servers" is the property used by the Reactive Messaging Kafka connector.

Related to quarkusio#13889.
  • Loading branch information
cescoffier authored and cem.nura committed Jan 20, 2021
1 parent 6a80002 commit ac80ac3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/src/main/asciidoc/kafka-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,8 @@ e.g. via environment variables or system properties.
`topics` is specific to Quarkus: the application will wait for all the given topics to exist before launching the Kafka Streams engine.
This is to done to gracefully await the creation of topics that don't yet exist at application startup time.

TIP: Alternatively, you can use `kafka.bootstrap.servers` instead of `quarkus.kafka-streams.bootstrap-servers` as you did in the _generator_ project above.

All the properties within the `kafka-streams` namespace are passed through as-is to the Kafka Streams engine.
Changing their values requires a rebuild of the application.

Expand Down Expand Up @@ -554,6 +556,8 @@ networks:
To launch all the containers, building the `producer` and `aggregator` container images,
run `docker-compose up --build`.

TIP: Instead of `QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS`, you can use `KAFKA_BOOTSTRAP_SERVERS`.

You should see log statements from the `producer` application about messages being sent to the "temperature-values" topic.

Now run an instance of the _debezium/tooling_ image, attaching to the same network all the other containers run in.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.kafka.streams.runtime;

import static io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig.DEFAULT_KAFKA_BROKER;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -36,6 +38,7 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;

import io.quarkus.arc.Arc;
Expand Down Expand Up @@ -79,6 +82,11 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
Properties buildTimeProperties = kafkaStreamsSupport.getProperties();

String bootstrapServersConfig = asString(runtimeConfig.bootstrapServers);
if (DEFAULT_KAFKA_BROKER.equalsIgnoreCase(bootstrapServersConfig)) {
// Try to see if kafka.bootstrap.servers is set, if so, use that value, if not, keep localhost:9092
bootstrapServersConfig = ConfigProvider.getConfig().getOptionalValue("kafka.bootstrap.servers", String.class)
.orElse(bootstrapServersConfig);
}
Properties kafkaStreamsProperties = getStreamsProperties(buildTimeProperties, bootstrapServersConfig, runtimeConfig);
this.kafkaAdminClient = Admin.create(getAdminClientConfig(kafkaStreamsProperties));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
@ConfigRoot(name = "kafka-streams", phase = ConfigPhase.RUN_TIME)
public class KafkaStreamsRuntimeConfig {

/**
* Default Kafka bootstrap server.
*/
public static final String DEFAULT_KAFKA_BROKER = "localhost:9012";

/**
* A unique identifier for this Kafka Streams application.
* If not set, defaults to quarkus.application.name.
Expand All @@ -20,9 +25,10 @@ public class KafkaStreamsRuntimeConfig {
public String applicationId;

/**
* A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s)
* A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s).
* If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9012}.
*/
@ConfigItem(defaultValue = "localhost:9012")
@ConfigItem(defaultValue = DEFAULT_KAFKA_BROKER)
public List<InetSocketAddress> bootstrapServers;

/**
Expand Down

0 comments on commit ac80ac3

Please sign in to comment.