Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Kafka Streams to be configured using "kafka.bootstrap.servers" #14095

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/src/main/asciidoc/kafka-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,8 @@ e.g. via environment variables or system properties.
`topics` is specific to Quarkus: the application will wait for all the given topics to exist before launching the Kafka Streams engine.
This is to done to gracefully await the creation of topics that don't yet exist at application startup time.

TIP: Alternatively, you can use `kafka.bootstrap.servers` instead of `quarkus.kafka-streams.bootstrap-servers` as you did in the _generator_ project above.

All the properties within the `kafka-streams` namespace are passed through as-is to the Kafka Streams engine.
Changing their values requires a rebuild of the application.

Expand Down Expand Up @@ -554,6 +556,8 @@ networks:
To launch all the containers, building the `producer` and `aggregator` container images,
run `docker-compose up --build`.

TIP: Instead of `QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS`, you can use `KAFKA_BOOTSTRAP_SERVERS`.

You should see log statements from the `producer` application about messages being sent to the "temperature-values" topic.

Now run an instance of the _debezium/tooling_ image, attaching to the same network all the other containers run in.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.kafka.streams.runtime;

import static io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig.DEFAULT_KAFKA_BROKER;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -36,6 +38,7 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;

import io.quarkus.arc.Arc;
Expand Down Expand Up @@ -79,6 +82,11 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
Properties buildTimeProperties = kafkaStreamsSupport.getProperties();

String bootstrapServersConfig = asString(runtimeConfig.bootstrapServers);
if (DEFAULT_KAFKA_BROKER.equalsIgnoreCase(bootstrapServersConfig)) {
// Try to see if kafka.bootstrap.servers is set, if so, use that value, if not, keep localhost:9092
bootstrapServersConfig = ConfigProvider.getConfig().getOptionalValue("kafka.bootstrap.servers", String.class)
.orElse(bootstrapServersConfig);
}
Properties kafkaStreamsProperties = getStreamsProperties(buildTimeProperties, bootstrapServersConfig, runtimeConfig);
this.kafkaAdminClient = Admin.create(getAdminClientConfig(kafkaStreamsProperties));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
@ConfigRoot(name = "kafka-streams", phase = ConfigPhase.RUN_TIME)
public class KafkaStreamsRuntimeConfig {

/**
* Default Kafka bootstrap server.
*/
public static final String DEFAULT_KAFKA_BROKER = "localhost:9012";

/**
* A unique identifier for this Kafka Streams application.
* If not set, defaults to quarkus.application.name.
Expand All @@ -20,9 +25,10 @@ public class KafkaStreamsRuntimeConfig {
public String applicationId;

/**
* A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s)
* A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s).
* If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9012}.
*/
@ConfigItem(defaultValue = "localhost:9012")
@ConfigItem(defaultValue = DEFAULT_KAFKA_BROKER)
public List<InetSocketAddress> bootstrapServers;

/**
Expand Down