From d3d550bbee83c2e315e942e3848aa4090f5b9a85 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Mon, 24 Feb 2020 16:54:41 +0100 Subject: [PATCH] Handle #4961 - Kafka Streams sasl, ssl config. --- .../deployment/KafkaStreamsProcessor.java | 46 +------ .../runtime/KafkaStreamsPropertiesUtil.java | 72 +++++++++++ .../streams/runtime/KafkaStreamsRecorder.java | 11 +- .../runtime/KafkaStreamsRuntimeConfig.java | 40 +++++- .../runtime/KafkaStreamsTopologyManager.java | 122 +++++++++++++++--- .../kafka/streams/runtime/SaslConfig.java | 91 +++++++++++++ .../kafka/streams/runtime/SslConfig.java | 73 +++++++++++ .../kafka/streams/runtime/StoreConfig.java | 27 ++++ .../KafkaStreamsHotReplacementSetup.java | 17 ++- .../src/main/resources/application.properties | 14 ++ .../src/main/resources/ks-keystore.p12 | Bin 0 -> 5253 bytes .../src/main/resources/ks-truststore.p12 | Bin 0 -> 1490 bytes .../streams/KafkaStreamsPropertiesTest.java | 37 ++++++ .../it/kafka/streams/KafkaStreamsTest.java | 22 ++++ .../it/kafka/streams/KafkaTestResource.java | 60 ++++++++- 15 files changed, 558 insertions(+), 74 deletions(-) create mode 100644 extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java create mode 100644 extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java create mode 100644 extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SslConfig.java create mode 100644 extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/StoreConfig.java create mode 100644 integration-tests/kafka-streams/src/main/resources/ks-keystore.p12 create mode 100644 integration-tests/kafka-streams/src/main/resources/ks-truststore.p12 create mode 100644 integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsPropertiesTest.java diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java index 38ee19609cc32..79d5aff0c5053 100644 --- a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java +++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java @@ -1,9 +1,10 @@ package io.quarkus.kafka.streams.deployment; +import static io.quarkus.kafka.streams.runtime.KafkaStreamsPropertiesUtil.buildKafkaStreamsProperties; + import java.io.IOException; import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; @@ -11,8 +12,6 @@ import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; -import org.eclipse.microprofile.config.Config; -import org.eclipse.microprofile.config.ConfigProvider; import org.rocksdb.RocksDBException; import org.rocksdb.Status; import org.rocksdb.util.Environment; @@ -30,17 +29,13 @@ import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.builditem.nativeimage.RuntimeReinitializedClassBuildItem; import io.quarkus.deployment.pkg.NativeConfig; -import io.quarkus.kafka.streams.runtime.HotReplacementInterceptor; import io.quarkus.kafka.streams.runtime.KafkaStreamsRecorder; import io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig; import io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager; -import io.quarkus.runtime.LaunchMode; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; class KafkaStreamsProcessor { - private static final String STREAMS_OPTION_PREFIX = "kafka-streams."; - @BuildStep void build(BuildProducer feature, BuildProducer reflectiveClasses, @@ -149,43 +144,6 @@ BeanContainerListenerBuildItem processBuildTimeConfig(KafkaStreamsRecorder recor return new BeanContainerListenerBuildItem(recorder.configure(kafkaStreamsProperties)); } - private Properties buildKafkaStreamsProperties(LaunchMode launchMode) { - Config config = ConfigProvider.getConfig(); - Properties kafkaStreamsProperties = new Properties(); - for (String property : config.getPropertyNames()) { - if (isKafkaStreamsProperty(property)) { - includeKafkaStreamsProperty(config, kafkaStreamsProperties, property); - } - } - - if (launchMode == LaunchMode.DEVELOPMENT) { - addHotReplacementInterceptor(kafkaStreamsProperties); - } - - return kafkaStreamsProperties; - } - - private void addHotReplacementInterceptor(Properties kafkaStreamsProperties) { - String interceptorConfig = HotReplacementInterceptor.class.getName(); - Object originalInterceptorConfig = kafkaStreamsProperties - .get(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)); - - if (originalInterceptorConfig != null) { - interceptorConfig = interceptorConfig + "," + originalInterceptorConfig; - } - - kafkaStreamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), interceptorConfig); - } - - private boolean isKafkaStreamsProperty(String property) { - return property.startsWith(STREAMS_OPTION_PREFIX); - } - - private void includeKafkaStreamsProperty(Config config, Properties kafkaStreamsProperties, String property) { - kafkaStreamsProperties.setProperty(property.substring(STREAMS_OPTION_PREFIX.length()), - config.getValue(property, String.class)); - } - @BuildStep @Record(ExecutionTime.RUNTIME_INIT) void configureAndLoadRocksDb(KafkaStreamsRecorder recorder, KafkaStreamsRuntimeConfig runtimeConfig) { diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java new file mode 100644 index 0000000000000..fe70ba2eab0da --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java @@ -0,0 +1,72 @@ +package io.quarkus.kafka.streams.runtime; + +import java.util.Optional; +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; + +import io.quarkus.runtime.LaunchMode; + +public class KafkaStreamsPropertiesUtil { + + private static final String STREAMS_OPTION_PREFIX = "kafka-streams."; + private static final String QUARKUS_STREAMS_OPTION_PREFIX = "quarkus." + STREAMS_OPTION_PREFIX; + + private static boolean isKafkaStreamsProperty(String prefix, String property) { + return property.startsWith(prefix); + } + + private static void includeKafkaStreamsProperty(Config config, Properties kafkaStreamsProperties, String prefix, + String property) { + Optional value = config.getOptionalValue(property, String.class); + if (value.isPresent()) { + kafkaStreamsProperties.setProperty(property.substring(prefix.length()), value.get()); + } + } + + private static void addHotReplacementInterceptor(Properties kafkaStreamsProperties) { + String interceptorConfig = HotReplacementInterceptor.class.getName(); + Object originalInterceptorConfig = kafkaStreamsProperties + .get(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)); + + if (originalInterceptorConfig != null) { + interceptorConfig = interceptorConfig + "," + originalInterceptorConfig; + } + + kafkaStreamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), interceptorConfig); + } + + private static Properties kafkaStreamsProperties(String prefix) { + Properties kafkaStreamsProperties = new Properties(); + Config config = ConfigProvider.getConfig(); + for (String property : config.getPropertyNames()) { + if (isKafkaStreamsProperty(prefix, property)) { + includeKafkaStreamsProperty(config, kafkaStreamsProperties, prefix, property); + } + } + + return kafkaStreamsProperties; + } + + public static Properties appKafkaStreamsProperties() { + return kafkaStreamsProperties(STREAMS_OPTION_PREFIX); + } + + public static Properties quarkusKafkaStreamsProperties() { + return kafkaStreamsProperties(QUARKUS_STREAMS_OPTION_PREFIX); + } + + public static Properties buildKafkaStreamsProperties(LaunchMode launchMode) { + Properties kafkaStreamsProperties = appKafkaStreamsProperties(); + + if (launchMode == LaunchMode.DEVELOPMENT) { + addHotReplacementInterceptor(kafkaStreamsProperties); + } + + return kafkaStreamsProperties; + } + +} diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java index 23fb68c10b9ed..8e381f38a9906 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java @@ -5,6 +5,7 @@ import org.rocksdb.RocksDB; import io.quarkus.arc.Arc; +import io.quarkus.arc.runtime.BeanContainer; import io.quarkus.arc.runtime.BeanContainerListener; import io.quarkus.runtime.annotations.Recorder; @@ -20,9 +21,13 @@ public void configureRuntimeProperties(KafkaStreamsRuntimeConfig runtimeConfig) } public BeanContainerListener configure(Properties properties) { - return container -> { - KafkaStreamsTopologyManager instance = container.instance(KafkaStreamsTopologyManager.class); - instance.configure(properties); + return new BeanContainerListener() { + + @Override + public void created(BeanContainer container) { + KafkaStreamsTopologyManager instance = container.instance(KafkaStreamsTopologyManager.class); + instance.configure(properties); + } }; } } diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java index a18e3cec8a0ed..4e4442df60641 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java @@ -37,10 +37,46 @@ public class KafkaStreamsRuntimeConfig { @ConfigItem public List topics; + /** + * The schema registry key. + * + * e.g. to diff between different registry impls / instances + * as they have this registry url under different property key. + * + * Red Hat / Apicurio - apicurio.registry.url + * Confluent - schema.registry.url + */ + @ConfigItem(defaultValue = "schema.registry.url") + public String schemaRegistryKey; + + /** + * The schema registry url. + */ + @ConfigItem + public Optional schemaRegistryUrl; + + /** + * The SASL JAAS config. + */ + public SaslConfig sasl; + + /** + * Kafka SSL config + */ + public SslConfig ssl; + @Override public String toString() { - return "KafkaStreamsRuntimeConfig [applicationId=" + applicationId + ", bootstrapServers=" + bootstrapServers - + ", applicationServer=" + applicationServer + ", topics=" + topics + "]"; + return "KafkaStreamsRuntimeConfig{" + + "applicationId='" + applicationId + '\'' + + ", bootstrapServers=" + bootstrapServers + + ", applicationServer=" + applicationServer + + ", topics=" + topics + + ", schemaRegistryKey='" + schemaRegistryKey + '\'' + + ", schemaRegistryUrl=" + schemaRegistryUrl + + ", sasl=" + sasl + + ", ssl=" + ssl + + '}'; } public List getTrimmedTopics() { diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java index 7c82607b0f288..a0ea1eda84632 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java @@ -1,12 +1,13 @@ package io.quarkus.kafka.streams.runtime; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -14,6 +15,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import java.util.stream.Collectors; import javax.enterprise.context.ApplicationScoped; @@ -26,6 +28,8 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.StateListener; @@ -54,7 +58,7 @@ public class KafkaStreamsTopologyManager { private volatile KafkaStreamsRuntimeConfig runtimeConfig; private volatile Instance topology; private volatile Properties properties; - private volatile Map adminClientConfig; + private volatile Properties adminClientConfig; private volatile Instance kafkaClientSupplier; private volatile Instance stateListener; @@ -91,17 +95,90 @@ private static Properties getStreamsProperties(Properties properties, String boo // build-time options streamsProperties.putAll(properties); + // dynamic add -- back-compatibility + streamsProperties.putAll(KafkaStreamsPropertiesUtil.quarkusKafkaStreamsProperties()); + streamsProperties.putAll(KafkaStreamsPropertiesUtil.appKafkaStreamsProperties()); + // add runtime options streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, runtimeConfig.applicationId); + // app id if (runtimeConfig.applicationServer.isPresent()) { streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, runtimeConfig.applicationServer.get()); } + // schema registry + if (runtimeConfig.schemaRegistryUrl.isPresent()) { + streamsProperties.put(runtimeConfig.schemaRegistryKey, runtimeConfig.schemaRegistryUrl.get()); + } + + // sasl + SaslConfig sc = runtimeConfig.sasl; + if (sc != null) { + setProperty(sc.jaasConfig, streamsProperties, SaslConfigs.SASL_JAAS_CONFIG); + + setProperty(sc.clientCallbackHandlerClass, streamsProperties, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS); + + setProperty(sc.loginCallbackHandlerClass, streamsProperties, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS); + setProperty(sc.loginClass, streamsProperties, SaslConfigs.SASL_LOGIN_CLASS); + + setProperty(sc.kerberosServiceName, streamsProperties, SaslConfigs.SASL_KERBEROS_SERVICE_NAME); + setProperty(sc.kerberosKinitCmd, streamsProperties, SaslConfigs.SASL_KERBEROS_KINIT_CMD); + setProperty(sc.kerberosTicketRenewWindowFactor, streamsProperties, + SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR); + setProperty(sc.kerberosTicketRenewJitter, streamsProperties, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER); + setProperty(sc.kerberosMinTimeBeforeRelogin, streamsProperties, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN); + + setProperty(sc.loginRefreshWindowFactor, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR); + setProperty(sc.loginRefreshWindowJitter, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER); + + setProperty(sc.loginRefreshMinPeriod, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, + DurationToSecondsFunction.INSTANCE); + setProperty(sc.loginRefreshBuffer, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, + DurationToSecondsFunction.INSTANCE); + } + + // ssl + SslConfig ssl = runtimeConfig.ssl; + if (ssl != null) { + setProperty(ssl.protocol, streamsProperties, SslConfigs.SSL_PROTOCOL_CONFIG); + setProperty(ssl.provider, streamsProperties, SslConfigs.SSL_PROVIDER_CONFIG); + setProperty(ssl.cipherSuites, streamsProperties, SslConfigs.SSL_CIPHER_SUITES_CONFIG); + setProperty(ssl.enabledProtocols, streamsProperties, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG); + + setStoreConfig(ssl.truststore, streamsProperties, "ssl.truststore"); + setStoreConfig(ssl.keystore, streamsProperties, "ssl.keystore"); + setStoreConfig(ssl.key, streamsProperties, "ssl.key"); + + setProperty(ssl.keymanagerAlgorithm, streamsProperties, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG); + setProperty(ssl.trustmanagerAlgorithm, streamsProperties, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG); + Optional eia = Optional.of(ssl.endpointIdentificationAlgorithm.orElse("")); + setProperty(eia, streamsProperties, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); + setProperty(ssl.secureRandomImplementation, streamsProperties, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG); + } + return streamsProperties; } + private static void setStoreConfig(StoreConfig sc, Properties properties, String key) { + if (sc != null) { + setProperty(sc.type, properties, key + ".type"); + setProperty(sc.location, properties, key + ".location"); + setProperty(sc.password, properties, key + ".password"); + } + } + + private static void setProperty(Optional property, Properties properties, String key) { + setProperty(property, properties, key, Objects::toString); + } + + private static void setProperty(Optional property, Properties properties, String key, Function fn) { + if (property.isPresent()) { + properties.put(key, fn.apply(property.get())); + } + } + private static String asString(List addresses) { return addresses.stream() .map(InetSocketAddress::toString) @@ -130,17 +207,21 @@ void onStart(@Observes StartupEvent ev) { streams.setGlobalStateRestoreListener(globalStateRestoreListener.get()); } - adminClientConfig = getAdminClientConfig(bootstrapServersConfig); + adminClientConfig = getAdminClientConfig(streamsProperties); + + executor.execute(new Runnable() { - executor.execute(() -> { - try { - waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; + @Override + public void run() { + try { + waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + LOGGER.debug("Starting Kafka Streams pipeline"); + streams.start(); } - LOGGER.debug("Starting Kafka Streams pipeline"); - streams.start(); }); } @@ -191,7 +272,7 @@ public Set getMissingTopics(Collection topicsToCheck) Set topicNames = topics.names().get(10, TimeUnit.SECONDS); if (topicNames.containsAll(topicsToCheck)) { - return Collections.EMPTY_SET; + return Collections.emptySet(); } else { missing.removeAll(topicNames); } @@ -202,9 +283,8 @@ public Set getMissingTopics(Collection topicsToCheck) return missing; } - private Map getAdminClientConfig(String bootstrapServersConfig) { - Map adminClientConfig = new HashMap<>(); - adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + private Properties getAdminClientConfig(Properties properties) { + Properties adminClientConfig = new Properties(properties); // include other AdminClientConfig(s) that have been configured for (final String knownAdminClientConfig : AdminClientConfig.configNames()) { // give preference to admin. first @@ -225,4 +305,14 @@ public void setRuntimeConfig(KafkaStreamsRuntimeConfig runtimeConfig) { public void configure(Properties properties) { this.properties = properties; } + + private static final class DurationToSecondsFunction implements Function { + + private static final DurationToSecondsFunction INSTANCE = new DurationToSecondsFunction(); + + @Override + public String apply(Duration d) { + return String.valueOf(d.getSeconds()); + } + } } diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java new file mode 100644 index 0000000000000..66666957dff84 --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java @@ -0,0 +1,91 @@ +package io.quarkus.kafka.streams.runtime; + +import java.time.Duration; +import java.util.Optional; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.quarkus.runtime.annotations.ConfigItem; + +@ConfigGroup +public class SaslConfig { + + /** + * JAAS login context parameters for SASL connections in the format used by JAAS configuration files + */ + @ConfigItem + public Optional jaasConfig; + + /** + * The fully qualified name of a SASL client callback handler class + */ + @ConfigItem + public Optional clientCallbackHandlerClass; + + /** + * The fully qualified name of a SASL login callback handler class + */ + @ConfigItem + public Optional loginCallbackHandlerClass; + + /** + * The fully qualified name of a class that implements the Login interface + */ + @ConfigItem + public Optional loginClass; + + /** + * The Kerberos principal name that Kafka runs as + */ + @ConfigItem + public Optional kerberosServiceName; + + /** + * Kerberos kinit command path + */ + @ConfigItem + public Optional kerberosKinitCmd; + + /** + * Login thread will sleep until the specified window factor of time from last refresh + */ + @ConfigItem + public Optional kerberosTicketRenewWindowFactor; + + /** + * Percentage of random jitter added to the renewal time + */ + @ConfigItem + public Optional kerberosTicketRenewJitter; + + /** + * Percentage of random jitter added to the renewal time + */ + @ConfigItem + public Optional kerberosMinTimeBeforeRelogin; + + /** + * Login refresh thread will sleep until the specified window factor relative to the + * credential's lifetime has been reached- + */ + @ConfigItem + public Optional loginRefreshWindowFactor; + + /** + * The maximum amount of random jitter relative to the credential's lifetime + */ + @ConfigItem + public Optional loginRefreshWindowJitter; + + /** + * The desired minimum duration for the login refresh thread to wait before refreshing a credential + */ + @ConfigItem + public Optional loginRefreshMinPeriod; + + /** + * The amount of buffer duration before credential expiration to maintain when refreshing a credential + */ + @ConfigItem + public Optional loginRefreshBuffer; + +} diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SslConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SslConfig.java new file mode 100644 index 0000000000000..e2703c337fefe --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SslConfig.java @@ -0,0 +1,73 @@ +package io.quarkus.kafka.streams.runtime; + +import java.util.Optional; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.quarkus.runtime.annotations.ConfigItem; + +@ConfigGroup +public class SslConfig { + + /** + * The SSL protocol used to generate the SSLContext + */ + @ConfigItem + public Optional protocol; + + /** + * The name of the security provider used for SSL connections + */ + @ConfigItem + public Optional provider; + + /** + * A list of cipher suites + */ + @ConfigItem + public Optional cipherSuites; + + /** + * The list of protocols enabled for SSL connections + */ + @ConfigItem + public Optional enabledProtocols; + + /** + * Truststore config + */ + public StoreConfig truststore; + + /** + * Keystore config + */ + public StoreConfig keystore; + + /** + * Key config + */ + public StoreConfig key; + + /** + * The algorithm used by key manager factory for SSL connections + */ + @ConfigItem + public Optional keymanagerAlgorithm; + + /** + * The algorithm used by trust manager factory for SSL connections + */ + @ConfigItem + public Optional trustmanagerAlgorithm; + + /** + * The endpoint identification algorithm to validate server hostname using server certificate + */ + @ConfigItem(defaultValue = "https") + public Optional endpointIdentificationAlgorithm; + + /** + * The SecureRandom PRNG implementation to use for SSL cryptography operations + */ + @ConfigItem + public Optional secureRandomImplementation; +} diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/StoreConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/StoreConfig.java new file mode 100644 index 0000000000000..149c69705ec95 --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/StoreConfig.java @@ -0,0 +1,27 @@ +package io.quarkus.kafka.streams.runtime; + +import java.util.Optional; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.quarkus.runtime.annotations.ConfigItem; + +@ConfigGroup +public class StoreConfig { + /** + * Store type + */ + @ConfigItem + public Optional type; + + /** + * Store location + */ + @ConfigItem + public Optional location; + + /** + * Store password + */ + @ConfigItem + public Optional password; +} diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java index dc2374faea2a2..d5bbcac879bd1 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java @@ -28,13 +28,16 @@ public void run() { if (nextUpdate < System.currentTimeMillis()) { synchronized (this) { if (nextUpdate < System.currentTimeMillis()) { - executor.execute(() -> { - try { - context.doScan(true); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); + executor.execute(new Runnable() { + @Override + public void run() { + try { + context.doScan(true); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } } }); // we update at most once every 2s diff --git a/integration-tests/kafka-streams/src/main/resources/application.properties b/integration-tests/kafka-streams/src/main/resources/application.properties index d9f640c3c192c..32b766bd73b8b 100644 --- a/integration-tests/kafka-streams/src/main/resources/application.properties +++ b/integration-tests/kafka-streams/src/main/resources/application.properties @@ -6,8 +6,22 @@ quarkus.kafka-streams.bootstrap-servers=localhost:19092 quarkus.kafka-streams.application-id=streams-test-pipeline quarkus.kafka-streams.topics=streams-test-categories,streams-test-customers +quarkus.kafka-streams.schema-registry-key=apicurio.registry.url +quarkus.kafka-streams.schema-registry-url=http://localhost:8080 + +quarkus.kafka-streams.security.protocol=SSL +quarkus.kafka-streams.ssl.truststore.location=./target/classes/ks-truststore.p12 +quarkus.kafka-streams.ssl.truststore.password=Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L +quarkus.kafka-streams.ssl.truststore.type=PKCS12 +quarkus.kafka-streams.ssl.endpoint-identification-algorithm= + +quarkus.kafka-streams.sasl.kerberos-ticket-renew-jitter=0.06 +quarkus.kafka-streams.sasl.login-refresh-buffer=PT20S + # streams options kafka-streams.cache.max.bytes.buffering=10240 kafka-streams.commit.interval.ms=1000 kafka-streams.metadata.max.age.ms=500 kafka-streams.auto.offset.reset=earliest + +kafka-streams.some-property=dummy diff --git a/integration-tests/kafka-streams/src/main/resources/ks-keystore.p12 b/integration-tests/kafka-streams/src/main/resources/ks-keystore.p12 new file mode 100644 index 0000000000000000000000000000000000000000..abfdbcb2053a3ef5e92f9038eba3711c3b6a606f GIT binary patch literal 5253 zcmY+EWmFV^x9u5VXof~QqY@B&1tl2nl6q7-^91loAja8tLv(KvHTbX#^au z_pbN;@7)h)?X&hiXMg;GVdP=x01PmU9D;?*9i5pm%HP1>5^lcVG_*99!lh@X_!3_S zPEQ-%s^rko5C9nxZblzs4r&0A^hAHvbf9);2{g>^&se%pUZ{f12b;P+lZ%X*x2nkM z=(M%4RD;Dc$`66dO{A7*_UB_S9)~+h<4C=zPzEg~_eV*SziL9XSLnK{;*>@@Q4#1$ z9C<+Lm5hYh&7h;@s710INq?OwDHnY5xJ`ko=M!3y^_uwW_o2s`FeWT8Q^40IG(^XJAXJ?R zzmI9@*GL~4&7P{CXn)Mi*Iezq;7eomniH3_NZIdGUF?Jfo!)t&PlF3X+8JkfXX+#tk3+D@1`abHvzr&Firy~QPe{p@vNfA8 zq;q<%opuGlWZt~KpA`#v3+bDxZM^U7c1JhxPa!PxPclauh3(%y)1J;JTNk5TkQ}$Y zqu6FE*C9@LGKg-}1dcfl!l*(+lD57vSr%A0K;nKW&{C<*As_vefWZ+bek{FMd5nl( zuerIeU9oRNUOPRFC=yX=t+Xej*UP5KljI)8>*TaG84UHg^X@+t1te?!5YFdIJEx-% z<}$<5fN9r)T%s~_MYD0M)TR%*B|X{B528m zS63?VA(wKc5P#KO-cGpL!;L7{wp{on13L5%-!Zg?{;#Rbj@0(@Ek_(-r`cd`xL1$K zBSOt;dDcf1-AJ$fS6u~{uHjXsYe7q%Byu9`m2Rt&_vB@qUJZ*DWT01Tn{*tamKgdszF#Ze)?D9%~jBesc*+N51Fx=;JShTCQQ@BLeAM{Kk{H>(xV}jUjsOi9#gD;{GOz zR|Qwu>gtZaD_1>^Y^bmUdbH0tKjku;0gl<9G8PkSP!|cX#|aG#7P6|YuMtp zvEh6>F`%d;F+iIUyjZBSxn`D{Jltaau5YGZj#lH%#pICy!$Iq;d23&foF@8|N#|0Y z9F!(7ww}4lA%YDmeuCrFYXs}N({y`0fxzdt(F%scr*QwYyNa8AY!!%pVf|8G!D%cD zn@z=_%MS|Nd)Px`_yuZrTev-R7$n6Q)Y}6H&wla+VquH+$+)imc@8?loX|&(RWv#B z&WZCHqXOH$Kxb)G@0Cjlc%yc)S7N3_fJP}LT)Cf!6HN0X83zp{tqH?&1XaMy|3M`# zxga?MDViIaC)yh{n}6!`Z}UO({r^uW1j#W;4V~TXnFK||!C)Z~A#q`G2{AAXRP&!6 zY^*{UDDR(@g$_XbCn^6g0sJq{V*NK~jXe1lQy<3Rxzt1M!dZ+tXR;pL|HIi2Fp%l0 zzrd?l`5I=iwKKX>W29pUynvjSf*e@}aI#3}zH6`Vy$n8&MVguALk-*~w2#0(0!P~V z6utfF3+%qKK!3T)T$LJ!_q%mFg6h-$Jhp)iwh?x4_DzB=H-6vm81vk4PMMOf^HC1g zCRtOjdDpGmIOHK7lG8{Paq2iw@KamW16hpBkmnS-EDQVd6FEb75rIz|uDb%3tGj;IpBYP9^sELU~&f&4B$t(A}t)0ox9!EY|C74(Nr^3pn-hAr}l zXfMN6A<^HgWf2+rD281sO3*y>^(WOe2&wS3mS%ST$tRmyvC0 zh^doElXieJU9fYcndRVT(;{_M&Ns8$L8J1VB{b{2o{i9mKLVyo&)_E{LP9ql(%*;iY&J&6Fd#L zZou*6UA2G4>9O~zC^TC=xcoS>=|`ZsibxSP8DyG^6bH{GdX?{Hz&tFbgxOMs|L*O0 z@7IDM7rRDD+F&#)J; zKKr3|$lA>>XES%$2st#f< z?ezARU)oV&ZQ=~4#N^iJz6zB@Xm?h#XqeA2KJf*!Vghdv(FXwZ)~UBUfxXrhUiu z-A^AJ+1AONNSW1%3D&2X{<{H-)Vz=m{zf0&(4NMzODN~nn6?|-wOn);EeC3qEN~#7 zsA-l@6DdvT@gxJBMl&e=urKyanF4Z`)`qG`_>s-v0MSOs{kX02AA*|tb>~^9&7e#z z*7Xn`JD>m;BgF#ecU{oY~c%LZ@-jAgx{E<+OVTK|%NYlDH_&P9agO;4FbzL-%QlQ347^qkTOYIG# zhl2pyFlR*lZ6EnbBYz8Fm=9%kj`Oms98_H)(?vN<()HqmljJ!pc}X{Nd3jdx?z%2~ zI;}f-Eus;hW$x{B9-HoKPl*g>8J{A(XR*e=XL}H0BS&9%+%dDL(;0X&vd5nm?LgP< zfY9@eGjeOB<^Dm(57hzNec1zrrdF-1l<)cOgh|?1wX7>YTmdg7y&>~W%7M@R%mt3@ zX_He~YT!pE9}T3Q{Ma0$SqXIK`9ir>xG)p}to=ZPg+jqMw9J2~Ok`Fb*9Bc#By2j4nnx2l7Nr)32m`4&2^PziVR{cfY6?iVxscgNoxCMc zs?RVDb~y{Qba#_S@03f6CX79UQugFT^;M(?PqVT!|G04l-5$qxCDHVrg|y1WIa9M1 zau^D4^KS#k-~=%LR%$-x>fqxe>X!JkO=fO=Xut=qgQ~3t%PMVw?_purB|nHK@Y-jC zUMmavQxt^J=lC$*mn4~M2u?yk_>b<+jkWf-SvpG-et$(@mLRlTsXp*j1=-5b#$`JU z;wCQ;&iulzw+ci*nKoG=K7v$0e!OO98%`)J^r#V)G(#tVw%X$7u*OJBqo)x zXwm+iV)UyOa_H^(Fcel15~1OQ4*_m!1nLj+MP4KLkgz(+&YU>0Z`$2k{qiGp!(Xsd z!eu)K&P8G>Yu(^;exp9iFV;;MVlmX-mCZ>-T~|-Od8usZ#jCO|sYtx-9L85{)z^q! z)#EmA-8?uTS`JTVF6$@D6yY89{^G+I8%7xZO52FFU?s;i`(ahKD#4#co$betHhea5 z;@QnxE|!#uXmyIK7270+Oz3K>D1LzlJ12p;j_+HBm-v_>8AV6-=;9kHIry0XCS`Ch z`jsO_`9er|p*YE*@)OaJE8a@A#ixW{>|yW}7W(o@l9NovOqOUv>=V0!+4bZ-M*vg( zyHaqCsm|i(4ji%_Pb?fh%UC*Iz$t=A8d{<~@{W8@y|a^tdNb$eby799FM}Lbdee~K zLCMy(e!O+s3*D1V?pBdn&Xiaw0u~8{v6u0VCN9Q>5?Qzy8EHn{H#M-b2#+EtWIyl& z^SBsMH@2{DKr~uR;@{(eNNxBJww#x>B@Ky{+X088&vVf7Qx(FC;yNkb9%!odCeW;L z*0c^NooabrMeo*v5>qO)v}s-58CUC!mno~~E}fLq7iwNBzUD}R%4=^ch=q`+ei$*s zd|;ErNwced%}2E49W!Qw9kNk^D&1##=Gjg7{9{nOb(j4yLbcraz+ynSq3f;{q=9#X zsdqljh^69Nree1rY@JJe0pWBz=f=ZDHyJ8G^Lh40bG(0frLiS{<9*83O?)Cji??B` zE})JKkIss&py96LMXtdlEQz;q50}zY7k(w5koLG5_HJ~w>Zqv72hJOV>XRY=>!&R;kjXi7^I^Z`+w#bZ zHkwmefp@AMusnUZ$98>)gyj}kQaA%oJMe#YlrkkF?M6;JUrcHmU-Su~rxQOBk-ie(pE5fXYw4j>UzrS^XDB)>KO*snZ zGjqWxcbFmqK~$|jFEC+d+2C}&Rw@-{Vb>4u z2#F^jOJi-*W)nLT1D$P$y<=#qW0)u;CAqTCytSGS#@p*e9D=vZIVsw5wGeGj^KNAy z!bG1z=}X}I%oTa|AVitsxDFTNO_1pGB(0p!7z|AWRFaFl7p@v9B_PY%H7~q-h$^;8 zBC5^Zn8ihpY{X?af?nOwYb3@^7AH*XqR%F>Rn~Ug=+;F6Kx8dD%coijpn3Ko_EG%- zY!k_G2Y+gcFY+37s|k|G0@%dORKAc&f`=3r5-%InKRU)6TGK=hThnK~;|Rdo$UluE z2}3=+`RKfUkj0pQlgkFEJ)7Has-NA#D4;5Jd7-`;guEfkGmRky9pU|%AI{??;nb{zQoP$i0NFvTUZM@ zWLzoG{wjXAE5}!KLaa~y^VqCVs>gHWSY7>a0~PQ%5Ll9TmC)H(Z^1T^ePmyto}v6> z{cOh#e^-Y&$FBUhsOYn$GSvZ?l=4hEewQe315dT+7M)m{=-zfe%dT;# z8%CUaEB47y4l9>ltyTLHSZ}7Y67rl7F~IN|9&Sc53f!Os}jCny`tqZL%7JY3)4II?^S z9wU9!-qtKRY{Jn>xP?pQ(kVMO)m=Jc+zdzTm8V=x=Sel`8L{bstf)TmK~~M=SQpbU z8P4S^W&vnSi-x%&wb!?Spf|uGm)_9H=kF3@3$w1AeJvfu#FBW~qEm&B0<)N#|I};f zG{nY<-ktvGO0N_R(O2uZU^;C!MyjWp3RlHm4HQ(-$0k#*9LL_qMoJiISM9MkZi8qk z$Cj9~=`R)0rcZ`w;eygXcE01ZAxG39Ct~#2l@4T9kQsKq(4~HpxQ8fGvQm6Dqzz7W zmHA>Q4v;2umb02pgox)$dN5f^f#WUPAb{X+mR&tq2Fwh`#RBqxFwkj$Xqe=eYz7G`hDe6@ z4FLxRpn?TmFoFeH0s#Opf(23r2`Yw2hW8Bt2LUiC1_~;MNQUI{e>8#P7aS0nu#-+vo5#AtL&>jvC9&PJ1Bb{K56*D1K2dG33LmQ^ z*!|_b;n%>^d-reol%)0TuXndq$c8!C7eR=MF*i;}Ffw6_0{fscd$pE1PDEYR8*-h+ z<0>QSIF+x66;W{Lb7z{)=sLJ14uC-v$c25`XmIb66lZFlTFki!dv+X?_qwamtYtN! zV2ZKRM#!icAX77G)9MfWsRyCi;41}zM=+uF6qfkgH(5-`lTk4oVTvZ%*(sCJhl=p+ z`$p1>Vxk*06S;cfnIK(hQK9mY?!Q0ixC-P_;5>|AfX zxpS@eMbI=mpG~B>7FJtC(L1=fhf^nIK)J|TI3uKDG5Tp_I4$j`G?LsM1**cB?s%mj zGXsTrklUfbn^Z8C+S*I4E4l1#`J?(beRWj^+z{FBmaf;4hvF-zhq-#Xl1zE@L* zX*q(y`=2}S|Hbq$i-pOlHd8_5=5RA=&s52BdtlzuwozTMwk+pF-}~EL0+7#u!%szo zK@&@1LcHd}A$0Ug3U1ueMfFf7*?IS0q@BRLHB=zXy`>Cv*I+Xxx!Hp5Z$2{t1GQue zJ0P@DlXs!;Vd0yt{bHXRba6^h1z$)s3YRNIYIG1lPH^4?pY^I55N7a?mIRBBRlWU& zAU1X~iP1{EI(qNvqs58{N}wOy1%LzhCUPJcKmqweuHW3%H%aI>Wh7?Al(U(+dgwd_ z__iCQ(%hY+eUf)9EPyo7ntc4jpgTePpFGI?;*O z_btaSC}pG5h{6iLd@l~wa5W#hpfK+03}cyTPj&XS{Z`geUaIKpa}vbqb(GaE=8O|c z%-$n^ChN){R1gNB^G-#?-8`N5LK+PEskOiN7K?H&EksV-iJ`|_*4cv3xC29j`0xXC z{taaZQMh$9D_&&c=qW0-?*W2;SvJ*nFLf`%tB!5-A{;(7&GJvg7@ZSQio)=ZdWX`u zmX{WTpC0p8sFxfO!Rf#ie@3-Bvg`Ej&-&Ys8B3=nAnYq;!iWx_%LwZ~|2^RyRf}tO zi7voRJP3XqEYzuNsBHXl@uGj>N6)6Js^sxav-+h(-Mh(GcN0$u*m!_e>J81H+fPZ1 zR3=&7<4QJ7dPhs(MzvJTz|2#d9sr)lch6j<6+z!&6Cu~2Q}a5PM8#G!3QI5O2jrP? zd`;)7hvBXZTym4gUUJNHNBPk8ia*UBFC@w&p9|YE8LY-=A86cuE!P{hJoaF9db_}-(Azls&u`c* originals = config.originals(); + + Assertions.assertEquals("20", originals.get(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS)); + Assertions.assertEquals("http://localhost:8080", originals.get("apicurio.registry.url")); + Assertions.assertEquals("dummy", originals.get("some-property")); + } +} diff --git a/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java b/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java index e4203d401632e..7c68a740cc959 100644 --- a/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java +++ b/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java @@ -1,5 +1,8 @@ package io.quarkus.it.kafka.streams; +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -7,6 +10,7 @@ import java.util.Properties; import org.apache.http.HttpStatus; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -16,6 +20,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.hamcrest.CoreMatchers; @@ -32,12 +37,27 @@ @QuarkusTest public class KafkaStreamsTest { + private static void addSSL(Properties props) { + try { + File sslDir = KafkaTestResource.sslDir(null, false); + File tsFile = new File(sslDir, "ks-truststore.p12"); + props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsFile.getPath()); + props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L"); + props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12"); + props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private static Producer createCustomerProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectMapperSerializer.class.getName()); + addSSL(props); return new KafkaProducer<>(props); } @@ -48,6 +68,7 @@ private static Producer createCategoryProducer() { props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-category-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectMapperSerializer.class.getName()); + addSSL(props); return new KafkaProducer<>(props); } @@ -60,6 +81,7 @@ private static KafkaConsumer createConsumer() { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EnrichedCustomerDeserializer.class.getName()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + addSSL(props); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("streams-test-customers-processed")); diff --git a/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaTestResource.java b/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaTestResource.java index bdafe00a01d04..75c1702a71394 100644 --- a/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaTestResource.java +++ b/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaTestResource.java @@ -1,10 +1,17 @@ package io.quarkus.it.kafka.streams; import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.Collections; import java.util.Map; import java.util.Properties; +import org.apache.kafka.common.config.SslConfigs; + import io.debezium.kafka.KafkaCluster; import io.debezium.util.Testing; import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; @@ -13,13 +20,62 @@ public class KafkaTestResource implements QuarkusTestResourceLifecycleManager { private KafkaCluster kafka; + public static File sslDir(File directory, boolean removeExistingContent) throws IOException { + if (directory == null) { + directory = Testing.Files.createTestingDirectory("kafka-data", removeExistingContent); + } + + File targetDir = directory.getParentFile().getParentFile(); + File sslDir = new File(targetDir, "ssl_test"); + if (sslDir.exists() == false) { + //noinspection ResultOfMethodCallIgnored + sslDir.mkdir(); + } + return sslDir; + } + @Override public Map start() { try { + File directory = Testing.Files.createTestingDirectory("kafka-data", true); + File sslDir = sslDir(directory, true); + + Path ksPath = new File(sslDir, "ks-keystore.p12").toPath(); + try (InputStream ksStream = getClass().getResourceAsStream("/ks-keystore.p12")) { + Files.copy( + ksStream, + ksPath, + StandardCopyOption.REPLACE_EXISTING); + } + + Path tsPath = new File(sslDir, "ks-truststore.p12").toPath(); + try (InputStream tsStream = getClass().getResourceAsStream("/ks-truststore.p12")) { + Files.copy( + tsStream, + tsPath, + StandardCopyOption.REPLACE_EXISTING); + } + String password = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L"; + String type = "PKCS12"; + Properties props = new Properties(); props.setProperty("zookeeper.connection.timeout.ms", "45000"); - File directory = Testing.Files.createTestingDirectory("kafka-data", true); - kafka = new KafkaCluster().withPorts(2182, 19092) + + // http://kafka.apache.org/documentation.html#security_ssl + props.setProperty("listener.security.protocol.map", "CLIENT:SSL"); + props.setProperty("listeners", "CLIENT://:19092"); + props.setProperty("inter.broker.listener.name", "CLIENT"); + props.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ksPath.toString()); + props.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password); + props.setProperty(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, type); + props.setProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password); + props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsPath.toString()); + props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password); + props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, type); + props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + + kafka = new KafkaCluster() + .withPorts(2182, 19092) .addBrokers(1) .usingDirectory(directory) .deleteDataUponShutdown(true)