Skip to content

Commit

Permalink
Fix KeycloakRBACAuthorizer to work with StandardAuthorizer when in …
Browse files Browse the repository at this point in the history
…KRAFT mode

Signed-off-by: Marko Strukelj <[email protected]>
  • Loading branch information
mstruk committed Mar 7, 2023
1 parent 6f997c1 commit 3fc6d8f
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 21 deletions.
5 changes: 5 additions & 0 deletions oauth-keycloak-authorizer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-metadata</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -160,8 +162,8 @@
* This authorizer honors the <em>super.users</em> configuration. Super users are automatically granted any authorization request.
* </p>
*/
@SuppressWarnings("deprecation")
public class KeycloakRBACAuthorizer extends AclAuthorizer {
@SuppressWarnings({"deprecation", "checkstyle:ClassFanOutComplexity"})
public class KeycloakRBACAuthorizer implements Authorizer {

private static final String PRINCIPAL_BUILDER_CLASS = OAuthKafkaPrincipalBuilder.class.getName();
private static final String DEPRECATED_PRINCIPAL_BUILDER_CLASS = JwtKafkaPrincipalBuilder.class.getName();
Expand Down Expand Up @@ -195,12 +197,8 @@ public class KeycloakRBACAuthorizer extends AclAuthorizer {
private SensorKeyProducer grantsSensorKeyProducer;
private final Semaphores<JsonNode> semaphores = new Semaphores<>();

/**
* Create a new instance
*/
public KeycloakRBACAuthorizer() {
super();
}
private Authorizer delegate;


@Override
public void configure(Map<String, ?> configs) {
Expand Down Expand Up @@ -234,6 +232,9 @@ public void configure(Map<String, ?> configs) {
}

delegateToKafkaACL = config.getValueAsBoolean(AuthzConfig.STRIMZI_AUTHORIZATION_DELEGATE_TO_KAFKA_ACL, false);
if (delegateToKafkaACL) {
setupDelegateAuthorizer(configs);
}

configureSuperUsers(configs);

Expand All @@ -260,8 +261,8 @@ public void configure(Map<String, ?> configs) {
authzSensorKeyProducer = new KeycloakAuthorizationSensorKeyProducer("keycloak-authorizer", tokenEndpointUrl);
grantsSensorKeyProducer = new GrantsHttpSensorKeyProducer("keycloak-authorizer", tokenEndpointUrl);

if (delegateToKafkaACL) {
super.configure(configs);
if (delegate != null) {
delegate.configure(configs);
}

if (log.isDebugEnabled()) {
Expand All @@ -283,6 +284,25 @@ public void configure(Map<String, ?> configs) {
}
}

private void setupDelegateAuthorizer(Map<String, ?> configs) {
// auto-detect KRAFT mode
Object prop = configs.get("process.roles");
String processRoles = prop != null ? String.valueOf(prop) : null;
if (processRoles != null && processRoles.length() > 0) {
try {
log.debug("Detected Kraft mode ('process.roles' configured)");
delegate = new StandardAuthorizer();
log.debug("Using StandardAuthorizer (Kraft based) as a delegate");
} catch (Exception e) {
throw new ConfigException("Kraft mode detected ('process.roles' configured), but failed to instantiate org.apache.kafka.metadata.authorizer.StandardAuthorizer", e);
}
}
if (delegate == null) {
log.debug("Using AclAuthorizer (ZooKeeper based) as a delegate");
delegate = new AclAuthorizer();
}
}

private void configureHttpTimeouts(AuthzConfig config) {
connectTimeoutSeconds = ConfigUtil.getTimeoutConfigWithFallbackLookup(config, AuthzConfig.STRIMZI_AUTHORIZATION_CONNECT_TIMEOUT_SECONDS, ClientConfig.OAUTH_CONNECT_TIMEOUT_SECONDS);
readTimeoutSeconds = ConfigUtil.getTimeoutConfigWithFallbackLookup(config, AuthzConfig.STRIMZI_AUTHORIZATION_READ_TIMEOUT_SECONDS, ClientConfig.OAUTH_READ_TIMEOUT_SECONDS);
Expand Down Expand Up @@ -655,8 +675,8 @@ static List<ScopesSpec.AuthzScope> validateScopes(List<String> scopes) {

private List<AuthorizationResult> delegateIfRequested(AuthorizableRequestContext context, List<Action> actions, JsonNode authz) {
String nonAuthMessageFragment = context.principal() instanceof OAuthKafkaPrincipal ? "" : " non-oauth";
if (delegateToKafkaACL) {
List<AuthorizationResult> results = super.authorize(context, actions);
if (delegate != null) {
List<AuthorizationResult> results = delegate.authorize(context, actions);

int i = 0;
for (AuthorizationResult result: results) {
Expand Down Expand Up @@ -907,40 +927,46 @@ public void close() {
} catch (Exception e) {
log.error("Failed to shutdown the worker pool", e);
}
super.close();
if (delegate != null) {
try {
delegate.close();
} catch (Exception e) {
log.error("Failed to close the delegate authorizer", e);
}
}
}

@Override
public java.util.Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (!delegateToKafkaACL) {
if (delegate == null) {
return serverInfo.endpoints().stream().collect(Collectors.toMap(Function.identity(), e -> future));
}
return super.start(serverInfo);
return delegate.start(serverInfo);
}

@Override
public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
if (!delegateToKafkaACL) {
if (delegate == null) {
throw new UnsupportedOperationException("Simple ACL delegation not enabled");
}
return super.createAcls(requestContext, aclBindings);
return delegate.createAcls(requestContext, aclBindings);
}

@Override
public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
if (!delegateToKafkaACL) {
if (delegate == null) {
throw new UnsupportedOperationException("Simple ACL delegation not enabled");
}
return super.deleteAcls(requestContext, aclBindingFilters);
return delegate.deleteAcls(requestContext, aclBindingFilters);
}

@Override
public Iterable<AclBinding> acls(AclBindingFilter filter) {
if (!delegateToKafkaACL) {
if (delegate == null) {
throw new UnsupportedOperationException("Simple ACL delegation not enabled");
}
return super.acls(filter);
return delegate.acls(filter);
}

private void addAuthzMetricSuccessTime(long startTimeMs) {
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-metadata</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>kafka-oauth-common</artifactId>
Expand Down

0 comments on commit 3fc6d8f

Please sign in to comment.