From 16df4e9ac65e8caf28c7f92ddfed01161fbb2c69 Mon Sep 17 00:00:00 2001 From: Guillaume Smet Date: Tue, 28 Apr 2020 14:18:39 +0200 Subject: [PATCH] Get rid of runtime lambdas in Kafka Streams --- .../runtime/KafkaStreamsPropertiesUtil.java | 4 +- .../streams/runtime/KafkaStreamsRecorder.java | 11 +++-- .../runtime/KafkaStreamsTopologyManager.java | 49 +++++++++++++------ .../KafkaStreamsHotReplacementSetup.java | 17 ++++--- 4 files changed, 56 insertions(+), 25 deletions(-) diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java index eb7037f1b0e6d7..fe70ba2eab0da3 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java @@ -22,7 +22,9 @@ private static boolean isKafkaStreamsProperty(String prefix, String property) { private static void includeKafkaStreamsProperty(Config config, Properties kafkaStreamsProperties, String prefix, String property) { Optional value = config.getOptionalValue(property, String.class); - value.ifPresent(s -> kafkaStreamsProperties.setProperty(property.substring(prefix.length()), s)); + if (value.isPresent()) { + kafkaStreamsProperties.setProperty(property.substring(prefix.length()), value.get()); + } } private static void addHotReplacementInterceptor(Properties kafkaStreamsProperties) { diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java index 23fb68c10b9ed9..8e381f38a99064 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java @@ -5,6 +5,7 @@ import org.rocksdb.RocksDB; import io.quarkus.arc.Arc; +import io.quarkus.arc.runtime.BeanContainer; import io.quarkus.arc.runtime.BeanContainerListener; import io.quarkus.runtime.annotations.Recorder; @@ -20,9 +21,13 @@ public void configureRuntimeProperties(KafkaStreamsRuntimeConfig runtimeConfig) } public BeanContainerListener configure(Properties properties) { - return container -> { - KafkaStreamsTopologyManager instance = container.instance(KafkaStreamsTopologyManager.class); - instance.configure(properties); + return new BeanContainerListener() { + + @Override + public void created(BeanContainer container) { + KafkaStreamsTopologyManager instance = container.instance(KafkaStreamsTopologyManager.class); + instance.configure(properties); + } }; } } diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java index 2d490452b80806..3a0bb8212a59c5 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java @@ -93,10 +93,14 @@ private static Properties getStreamsProperties(Properties properties, String boo streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, runtimeConfig.applicationId); // app id - runtimeConfig.applicationServer.ifPresent(s -> streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s)); + if (runtimeConfig.applicationServer.isPresent()) { + streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, runtimeConfig.applicationServer.get()); + } // schema registry - runtimeConfig.schemaRegistryUrl.ifPresent(s -> streamsProperties.put(runtimeConfig.schemaRegistryKey, s)); + if (runtimeConfig.schemaRegistryUrl.isPresent()) { + streamsProperties.put(runtimeConfig.schemaRegistryKey, runtimeConfig.schemaRegistryUrl.get()); + } // sasl SaslConfig sc = runtimeConfig.sasl; @@ -118,9 +122,10 @@ private static Properties getStreamsProperties(Properties properties, String boo setProperty(sc.loginRefreshWindowFactor, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR); setProperty(sc.loginRefreshWindowJitter, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER); - Function d2s = d -> String.valueOf(d.getSeconds()); - setProperty(sc.loginRefreshMinPeriod, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, d2s); - setProperty(sc.loginRefreshBuffer, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, d2s); + setProperty(sc.loginRefreshMinPeriod, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, + DurationToSecondsFunction.INSTANCE); + setProperty(sc.loginRefreshBuffer, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, + DurationToSecondsFunction.INSTANCE); } // ssl @@ -158,7 +163,9 @@ private static void setProperty(Optional property, Properties properties, } private static void setProperty(Optional property, Properties properties, String key, Function fn) { - property.ifPresent(p -> properties.put(key, fn.apply(p))); + if (property.isPresent()) { + properties.put(key, fn.apply(property.get())); + } } private static String asString(List addresses) { @@ -179,15 +186,19 @@ void onStart(@Observes StartupEvent ev) { streams = new KafkaStreams(topology.get(), streamsProperties); adminClientConfig = getAdminClientConfig(streamsProperties); - executor.execute(() -> { - try { - waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; + executor.execute(new Runnable() { + + @Override + public void run() { + try { + waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + LOGGER.debug("Starting Kafka Streams pipeline"); + streams.start(); } - LOGGER.debug("Starting Kafka Streams pipeline"); - streams.start(); }); } @@ -271,4 +282,14 @@ public void setRuntimeConfig(KafkaStreamsRuntimeConfig runtimeConfig) { public void configure(Properties properties) { this.properties = properties; } + + private static final class DurationToSecondsFunction implements Function { + + private static final DurationToSecondsFunction INSTANCE = new DurationToSecondsFunction(); + + @Override + public String apply(Duration d) { + return String.valueOf(d.getSeconds()); + } + } } diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java index dc2374faea2a2d..d5bbcac879bd15 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java @@ -28,13 +28,16 @@ public void run() { if (nextUpdate < System.currentTimeMillis()) { synchronized (this) { if (nextUpdate < System.currentTimeMillis()) { - executor.execute(() -> { - try { - context.doScan(true); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); + executor.execute(new Runnable() { + @Override + public void run() { + try { + context.doScan(true); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } } }); // we update at most once every 2s