Skip to content

Commit

Permalink
Add ScramSaslClientFactory to the reflective classes. Fixes quarkusio…
Browse files Browse the repository at this point in the history
…#18026

Enable all security services when kafka security protocol is set.
  • Loading branch information
ozangunalp committed Jun 24, 2021
1 parent 659ef7a commit 2d44d17
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.AbstractLogin;
import org.apache.kafka.common.security.authenticator.DefaultLogin;
import org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient;
import org.apache.kafka.common.security.scram.internals.ScramSaslClient;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
Expand All @@ -44,6 +46,7 @@
import org.apache.kafka.common.serialization.ShortSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.Type;
Expand All @@ -63,6 +66,7 @@
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.AdditionalIndexedClassesBuildItem;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.EnableAllSecurityServicesBuildItem;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.IndexDependencyBuildItem;
Expand Down Expand Up @@ -408,12 +412,21 @@ public AdditionalBeanBuildItem runtimeConfig() {

@BuildStep
public void withSasl(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<ReflectiveHierarchyBuildItem> reflectiveHierarchy) {
BuildProducer<ReflectiveHierarchyBuildItem> reflectiveHierarchy,
BuildProducer<EnableAllSecurityServicesBuildItem> allsecurityServices) {

reflectiveClass
.produce(new ReflectiveClassBuildItem(false, false, AbstractLogin.DefaultLoginCallbackHandler.class));
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, SaslClientCallbackHandler.class));
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, DefaultLogin.class));
reflectiveClass
.produce(new ReflectiveClassBuildItem(true, false, false, ScramSaslClient.ScramSaslClientFactory.class));

// Enable SSL support if kafka.security.protocol is set to something other than PLAINTEXT, which is the default
String securityProtocol = ConfigProvider.getConfig().getConfigValue("kafka.security.protocol").getValue();
if (securityProtocol != null && SecurityProtocol.forName(securityProtocol) != SecurityProtocol.PLAINTEXT) {
allsecurityServices.produce(new EnableAllSecurityServicesBuildItem());
}

final Type loginModuleType = Type
.create(DotName.createSimple(LoginModule.class.getName()), Kind.CLASS);
Expand Down

0 comments on commit 2d44d17

Please sign in to comment.