From daf56bf1ba6a633a8e803b7964167d881f6f0661 Mon Sep 17 00:00:00 2001 From: Kevin Viet Date: Thu, 18 Jun 2020 14:28:28 +0200 Subject: [PATCH] Fix "SSL Handshake failed" issue - Add security.protocol as quarkus runtime options and allow replaying the option as runtime init - Enable enableAllSecurityServices when building the native image for the integration test --- .../kafka/streams/runtime/KafkaStreamsRuntimeConfig.java | 7 +++++++ .../kafka/streams/runtime/KafkaStreamsTopologyManager.java | 4 ++++ integration-tests/kafka-streams/pom.xml | 1 + 3 files changed, 12 insertions(+) diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java index 178c06f66ba1e..3fe3264717734 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java @@ -56,6 +56,13 @@ public class KafkaStreamsRuntimeConfig { @ConfigItem public Optional schemaRegistryUrl; + /** + * The security protocol to use + * See https://docs.confluent.io/current/streams/developer-guide/security.html#security-example + */ + @ConfigItem(name = "security.protocol") + public Optional securityProtocol; + /** * The SASL JAAS config. */ diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java index 3c1e31e67253b..3d3661e561f65 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java @@ -25,6 +25,7 @@ import javax.inject.Inject; import javax.inject.Singleton; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ListTopicsResult; @@ -113,6 +114,9 @@ private static Properties getStreamsProperties(Properties properties, String boo streamsProperties.put(runtimeConfig.schemaRegistryKey, runtimeConfig.schemaRegistryUrl.get()); } + // set the security protocol (in case we are doing PLAIN_TEXT) + setProperty(runtimeConfig.securityProtocol, streamsProperties, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); + // sasl SaslConfig sc = runtimeConfig.sasl; if (sc != null) { diff --git a/integration-tests/kafka-streams/pom.xml b/integration-tests/kafka-streams/pom.xml index 17c6f3789a530..af68da89c0aab 100644 --- a/integration-tests/kafka-streams/pom.xml +++ b/integration-tests/kafka-streams/pom.xml @@ -135,6 +135,7 @@ true true ${graalvmHome} + true