Skip to content

Commit

Permalink
Get rid of runtime lambdas in Kafka Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
gsmet committed Apr 28, 2020
1 parent 9805549 commit 16df4e9
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ private static boolean isKafkaStreamsProperty(String prefix, String property) {
private static void includeKafkaStreamsProperty(Config config, Properties kafkaStreamsProperties, String prefix,
String property) {
Optional<String> value = config.getOptionalValue(property, String.class);
value.ifPresent(s -> kafkaStreamsProperties.setProperty(property.substring(prefix.length()), s));
if (value.isPresent()) {
kafkaStreamsProperties.setProperty(property.substring(prefix.length()), value.get());
}
}

private static void addHotReplacementInterceptor(Properties kafkaStreamsProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.rocksdb.RocksDB;

import io.quarkus.arc.Arc;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.arc.runtime.BeanContainerListener;
import io.quarkus.runtime.annotations.Recorder;

Expand All @@ -20,9 +21,13 @@ public void configureRuntimeProperties(KafkaStreamsRuntimeConfig runtimeConfig)
}

public BeanContainerListener configure(Properties properties) {
return container -> {
KafkaStreamsTopologyManager instance = container.instance(KafkaStreamsTopologyManager.class);
instance.configure(properties);
return new BeanContainerListener() {

@Override
public void created(BeanContainer container) {
KafkaStreamsTopologyManager instance = container.instance(KafkaStreamsTopologyManager.class);
instance.configure(properties);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ private static Properties getStreamsProperties(Properties properties, String boo
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, runtimeConfig.applicationId);

// app id
runtimeConfig.applicationServer.ifPresent(s -> streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s));
if (runtimeConfig.applicationServer.isPresent()) {
streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, runtimeConfig.applicationServer.get());
}

// schema registry
runtimeConfig.schemaRegistryUrl.ifPresent(s -> streamsProperties.put(runtimeConfig.schemaRegistryKey, s));
if (runtimeConfig.schemaRegistryUrl.isPresent()) {
streamsProperties.put(runtimeConfig.schemaRegistryKey, runtimeConfig.schemaRegistryUrl.get());
}

// sasl
SaslConfig sc = runtimeConfig.sasl;
Expand All @@ -118,9 +122,10 @@ private static Properties getStreamsProperties(Properties properties, String boo
setProperty(sc.loginRefreshWindowFactor, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR);
setProperty(sc.loginRefreshWindowJitter, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER);

Function<Duration, String> d2s = d -> String.valueOf(d.getSeconds());
setProperty(sc.loginRefreshMinPeriod, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, d2s);
setProperty(sc.loginRefreshBuffer, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, d2s);
setProperty(sc.loginRefreshMinPeriod, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
DurationToSecondsFunction.INSTANCE);
setProperty(sc.loginRefreshBuffer, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS,
DurationToSecondsFunction.INSTANCE);
}

// ssl
Expand Down Expand Up @@ -158,7 +163,9 @@ private static <T> void setProperty(Optional<T> property, Properties properties,
}

private static <T> void setProperty(Optional<T> property, Properties properties, String key, Function<T, String> fn) {
property.ifPresent(p -> properties.put(key, fn.apply(p)));
if (property.isPresent()) {
properties.put(key, fn.apply(property.get()));
}
}

private static String asString(List<InetSocketAddress> addresses) {
Expand All @@ -179,15 +186,19 @@ void onStart(@Observes StartupEvent ev) {
streams = new KafkaStreams(topology.get(), streamsProperties);
adminClientConfig = getAdminClientConfig(streamsProperties);

executor.execute(() -> {
try {
waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
executor.execute(new Runnable() {

@Override
public void run() {
try {
waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
LOGGER.debug("Starting Kafka Streams pipeline");
streams.start();
}
LOGGER.debug("Starting Kafka Streams pipeline");
streams.start();
});
}

Expand Down Expand Up @@ -271,4 +282,14 @@ public void setRuntimeConfig(KafkaStreamsRuntimeConfig runtimeConfig) {
public void configure(Properties properties) {
this.properties = properties;
}

private static final class DurationToSecondsFunction implements Function<Duration, String> {

private static final DurationToSecondsFunction INSTANCE = new DurationToSecondsFunction();

@Override
public String apply(Duration d) {
return String.valueOf(d.getSeconds());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ public void run() {
if (nextUpdate < System.currentTimeMillis()) {
synchronized (this) {
if (nextUpdate < System.currentTimeMillis()) {
executor.execute(() -> {
try {
context.doScan(true);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
executor.execute(new Runnable() {
@Override
public void run() {
try {
context.doScan(true);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
// we update at most once every 2s
Expand Down

0 comments on commit 16df4e9

Please sign in to comment.