From abc20edaf8dc1ef272786c08b04f093af58422bd Mon Sep 17 00:00:00 2001 From: Logan HAUSPIE Date: Thu, 24 Oct 2019 00:38:00 +0200 Subject: [PATCH] change topics type from string to list --- .../runtime/KafkaStreamsRuntimeConfig.java | 9 +++++++-- .../runtime/KafkaStreamsTopologyManager.java | 16 +++------------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java index 4259f88634c3f..a18e3cec8a0ed 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java @@ -3,6 +3,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import io.quarkus.runtime.annotations.ConfigItem; import io.quarkus.runtime.annotations.ConfigPhase; @@ -30,15 +31,19 @@ public class KafkaStreamsRuntimeConfig { public Optional applicationServer; /** - * A comma-separated list of topic names processed by this stream processing application. + * A comma-separated list of topic names. * The pipeline will only be started once all these topics are present in the Kafka cluster. */ @ConfigItem - public Optional topics; + public List topics; @Override public String toString() { return "KafkaStreamsRuntimeConfig [applicationId=" + applicationId + ", bootstrapServers=" + bootstrapServers + ", applicationServer=" + applicationServer + ", topics=" + topics + "]"; } + + public List getTrimmedTopics() { + return topics.stream().map(String::trim).collect(Collectors.toList()); + } } diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java index e7ffbcb6a9b5a..b1ae0f5af5ea6 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java @@ -1,8 +1,7 @@ package io.quarkus.kafka.streams.runtime; import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.Collections; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -14,7 +13,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.enterprise.context.ApplicationScoped; @@ -46,7 +44,6 @@ public class KafkaStreamsTopologyManager { private static final Logger LOGGER = Logger.getLogger(KafkaStreamsTopologyManager.class.getName()); - private static final Pattern COMMA_PATTERN = Pattern.compile(","); private final ExecutorService executor; private KafkaStreams streams; @@ -107,18 +104,11 @@ void onStart(@Observes StartupEvent ev) { Properties streamsProperties = getStreamsProperties(properties, bootstrapServersConfig, runtimeConfig); - Set topicsToAwait = runtimeConfig.topics - .map(n -> COMMA_PATTERN.split(n)) - .map(Arrays::asList) - .map(HashSet::new) - .map(Collections::unmodifiableSet) - .orElseGet(Collections::emptySet); - streams = new KafkaStreams(topology.get(), streamsProperties); executor.execute(() -> { try { - waitForTopicsToBeCreated(topicsToAwait, bootstrapServersConfig); + waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics(), bootstrapServersConfig); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; @@ -141,7 +131,7 @@ public KafkaStreams getStreams() { return streams; } - private void waitForTopicsToBeCreated(Set topicsToAwait, String bootstrapServersConfig) + private void waitForTopicsToBeCreated(Collection topicsToAwait, String bootstrapServersConfig) throws InterruptedException { final Map config = new HashMap<>(); config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);