Skip to content

Commit

Permalink
Add OAuth support for Kafka client in native mode
Browse files Browse the repository at this point in the history
  • Loading branch information
dejanb committed Nov 4, 2020
1 parent ce8c1b6 commit 25f5178
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import org.apache.kafka.common.security.authenticator.AbstractLogin;
import org.apache.kafka.common.security.authenticator.DefaultLogin;
import org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
Expand Down Expand Up @@ -118,6 +121,12 @@ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer<Reflectiv
collectImplementors(toRegister, indexBuildItem, ConsumerInterceptor.class);
collectImplementors(toRegister, indexBuildItem, ProducerInterceptor.class);

reflectiveClass.produce(new ReflectiveClassBuildItem(false, false,
OAuthBearerSaslClient.class,
OAuthBearerSaslClient.OAuthBearerSaslClientFactory.class,
OAuthBearerToken.class,
OAuthBearerRefreshingLogin.class));

for (Class<?> i : BUILT_INS) {
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, i.getName()));
collectSubclasses(toRegister, indexBuildItem, i);
Expand Down Expand Up @@ -227,6 +236,28 @@ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer<Reflectiv
}
}

try {
Class.forName("io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler", false,
Thread.currentThread().getContextClassLoader());

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"org.keycloak.jose.jws.JWSHeader",
"org.keycloak.representations.AccessToken",
"org.keycloak.representations.AccessToken$Access",
"org.keycloak.representations.AccessTokenResponse",
"org.keycloak.representations.IDToken",
"org.keycloak.representations.JsonWebToken",
"org.keycloak.jose.jwk.JSONWebKeySet",
"org.keycloak.jose.jwk.JWK",
"org.keycloak.json.StringOrArrayDeserializer",
"org.keycloak.json.StringListMapDeserializer"));
} catch (ClassNotFoundException e) {
//ignore, Strimzi OAuth Client is not on the classpath
}

}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.quarkus.kafka.client.runtime.graal;

import java.security.AccessControlContext;
import java.security.AccessController;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;

import javax.security.auth.AuthPermission;
import javax.security.auth.Subject;
import javax.security.auth.SubjectDomainCombiner;

import com.oracle.svm.core.annotate.Substitute;
import com.oracle.svm.core.annotate.TargetClass;

@TargetClass(className = "javax.security.auth.Subject")
final class Target_javax_security_auth_Subject {

@Substitute
public static <T> T doAs(final Subject subject,
final java.security.PrivilegedExceptionAction<T> action)
throws java.security.PrivilegedActionException {

java.lang.SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(AuthPermissionHolder.DO_AS_PERMISSION);
}
if (action == null) {
throw new NullPointerException("Invalid null action provided");
}

final AccessControlContext currentAcc = AccessController.getContext();

SubjectHolder.subjects.put(currentAcc, subject);

return java.security.AccessController.doPrivileged(action,
createContext(subject, currentAcc));
}

@Substitute
public static Subject getSubject(final AccessControlContext acc) {

java.lang.SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(AuthPermissionHolder.GET_SUBJECT_PERMISSION);
}

if (acc == null) {
throw new NullPointerException("Invalid null AccessControlContext provided");
}

// return the Subject from the DomainCombiner of the provided context
return AccessController.doPrivileged(new java.security.PrivilegedAction<Subject>() {
public Subject run() {
return SubjectHolder.subjects.remove(acc);
}
});
}

@Substitute
private static AccessControlContext createContext(final Subject subject,
final AccessControlContext acc) {

return java.security.AccessController.doPrivileged(new java.security.PrivilegedAction<AccessControlContext>() {
public AccessControlContext run() {
if (subject == null)
return new AccessControlContext(acc, null);
else
return new AccessControlContext(acc,
new SubjectDomainCombiner(subject));
}
});
}

final static class AuthPermissionHolder {
static final AuthPermission DO_AS_PERMISSION = new AuthPermission("doAs");

static final AuthPermission DO_AS_PRIVILEGED_PERMISSION = new AuthPermission("doAsPrivileged");

static final AuthPermission SET_READ_ONLY_PERMISSION = new AuthPermission("setReadOnly");

static final AuthPermission GET_SUBJECT_PERMISSION = new AuthPermission("getSubject");

static final AuthPermission MODIFY_PRINCIPALS_PERMISSION = new AuthPermission("modifyPrincipals");

static final AuthPermission MODIFY_PUBLIC_CREDENTIALS_PERMISSION = new AuthPermission("modifyPublicCredentials");

static final AuthPermission MODIFY_PRIVATE_CREDENTIALS_PERMISSION = new AuthPermission("modifyPrivateCredentials");
}

final static class SubjectHolder {
static final Map<AccessControlContext, Subject> subjects = Collections.synchronizedMap(new WeakHashMap<>());
}
}

0 comments on commit 25f5178

Please sign in to comment.