diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java index 5e797c4b83c9e..c05cd1ba098cb 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java @@ -18,12 +18,14 @@ import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.authenticator.AbstractLogin; import org.apache.kafka.common.security.authenticator.DefaultLogin; import org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler; import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin; import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient; +import org.apache.kafka.common.security.scram.internals.ScramSaslClient; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteBufferDeserializer; @@ -44,6 +46,7 @@ import org.apache.kafka.common.serialization.ShortSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.jandex.ClassInfo; import org.jboss.jandex.DotName; import org.jboss.jandex.Type; @@ -63,6 +66,7 @@ import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.AdditionalIndexedClassesBuildItem; import io.quarkus.deployment.builditem.CombinedIndexBuildItem; +import io.quarkus.deployment.builditem.EnableAllSecurityServicesBuildItem; import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.IndexDependencyBuildItem; @@ -408,12 +412,21 @@ public AdditionalBeanBuildItem runtimeConfig() { @BuildStep public void withSasl(BuildProducer reflectiveClass, - BuildProducer reflectiveHierarchy) { + BuildProducer reflectiveHierarchy, + BuildProducer allsecurityServices) { reflectiveClass .produce(new ReflectiveClassBuildItem(false, false, AbstractLogin.DefaultLoginCallbackHandler.class)); reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, SaslClientCallbackHandler.class)); reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, DefaultLogin.class)); + reflectiveClass + .produce(new ReflectiveClassBuildItem(true, false, false, ScramSaslClient.ScramSaslClientFactory.class)); + + // Enable SSL support if kafka.security.protocol is set to something other than PLAINTEXT, which is the default + String securityProtocol = ConfigProvider.getConfig().getConfigValue("kafka.security.protocol").getValue(); + if (securityProtocol != null && SecurityProtocol.forName(securityProtocol) != SecurityProtocol.PLAINTEXT) { + allsecurityServices.produce(new EnableAllSecurityServicesBuildItem()); + } final Type loginModuleType = Type .create(DotName.createSimple(LoginModule.class.getName()), Kind.CLASS);