Skip to content

Commit

Permalink
Merge pull request #4775 from lhauspie/feature/4706_kafka_stream_conf…
Browse files Browse the repository at this point in the history
…ig_topics_should_be_a_list

Change Kafka Stream config topics type from String to List<String>
  • Loading branch information
gsmet authored Oct 24, 2019
2 parents 7e5a87a + abc20ed commit f1651c3
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
Expand Down Expand Up @@ -30,15 +31,19 @@ public class KafkaStreamsRuntimeConfig {
public Optional<String> applicationServer;

/**
* A comma-separated list of topic names processed by this stream processing application.
* A comma-separated list of topic names.
* The pipeline will only be started once all these topics are present in the Kafka cluster.
*/
@ConfigItem
public Optional<String> topics;
public List<String> topics;

@Override
public String toString() {
return "KafkaStreamsRuntimeConfig [applicationId=" + applicationId + ", bootstrapServers=" + bootstrapServers
+ ", applicationServer=" + applicationServer + ", topics=" + topics + "]";
}

public List<String> getTrimmedTopics() {
return topics.stream().map(String::trim).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package io.quarkus.kafka.streams.runtime;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -14,7 +13,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import javax.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -46,7 +44,6 @@
public class KafkaStreamsTopologyManager {

private static final Logger LOGGER = Logger.getLogger(KafkaStreamsTopologyManager.class.getName());
private static final Pattern COMMA_PATTERN = Pattern.compile(",");

private final ExecutorService executor;
private KafkaStreams streams;
Expand Down Expand Up @@ -107,18 +104,11 @@ void onStart(@Observes StartupEvent ev) {

Properties streamsProperties = getStreamsProperties(properties, bootstrapServersConfig, runtimeConfig);

Set<String> topicsToAwait = runtimeConfig.topics
.map(n -> COMMA_PATTERN.split(n))
.map(Arrays::asList)
.map(HashSet::new)
.map(Collections::unmodifiableSet)
.orElseGet(Collections::emptySet);

streams = new KafkaStreams(topology.get(), streamsProperties);

executor.execute(() -> {
try {
waitForTopicsToBeCreated(topicsToAwait, bootstrapServersConfig);
waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics(), bootstrapServersConfig);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
Expand All @@ -141,7 +131,7 @@ public KafkaStreams getStreams() {
return streams;
}

private void waitForTopicsToBeCreated(Set<String> topicsToAwait, String bootstrapServersConfig)
private void waitForTopicsToBeCreated(Collection<String> topicsToAwait, String bootstrapServersConfig)
throws InterruptedException {
final Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
Expand Down

0 comments on commit f1651c3

Please sign in to comment.