Skip to content

Commit

Permalink
Introduce a new authorizer class KeycloakAuthorizer
Browse files Browse the repository at this point in the history
+ Add authorization tests for Kraft mode

Signed-off-by: Marko Strukelj <[email protected]>
  • Loading branch information
mstruk committed Apr 12, 2023
1 parent e81749d commit 1179723
Show file tree
Hide file tree
Showing 42 changed files with 2,686 additions and 194 deletions.
8 changes: 4 additions & 4 deletions .travis/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,24 @@ elif [[ "$arch" != 'ppc64le' ]]; then
exitIfError

clearDockerEnv
mvn -e -V -B clean install -f testsuite -Pkafka-3_2_3
mvn -e -V -B clean install -f testsuite -Pkafka-3_2_3 -DfailIfNoTests=false -Dtest=\!KeycloakRaftAuthorizationTests
EXIT=$?
exitIfError

# Excluded by default to not exceed Travis job timeout
if [ "SKIP_DISABLED" == "false" ]; then
clearDockerEnv
mvn -e -V -B clean install -f testsuite -Pkafka-3_1_2
mvn -e -V -B clean install -f testsuite -Pkafka-3_1_2 -DfailIfNoTests=false -Dtest=\!KeycloakRaftAuthorizationTests
EXIT=$?
exitIfError

clearDockerEnv
mvn -e -V -B clean install -f testsuite -Pkafka-3_0_0
mvn -e -V -B clean install -f testsuite -Pkafka-3_0_0 -DfailIfNoTests=false -Dtest=\!KeycloakRaftAuthorizationTests
EXIT=$?
exitIfError

clearDockerEnv
mvn -e -V -B clean install -f testsuite -Pkafka-2_8_1
mvn -e -V -B clean install -f testsuite -Pkafka-2_8_1 -DfailIfNoTests=false -Dtest=\!KeycloakRaftAuthorizationTests
EXIT=$?
exitIfError
fi
Expand Down
81 changes: 81 additions & 0 deletions examples/docker/kafka-oauth-strimzi/compose-kraft.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
version: '3.5'

services:

#################################### KAFKA BROKER ####################################
kafka:
image: strimzi/example-kafka
build: kafka-oauth-strimzi/kafka/target
container_name: kafka
ports:
- 9091:9091
- 9092:9092

# javaagent debug port
#- 5005:5005
command:
- /bin/bash
- -c
- cd /opt/kafka && ./start.sh --kraft

environment:

# Java Debug
#KAFKA_DEBUG: y
#DEBUG_SUSPEND_FLAG: y
#JAVA_DEBUG_PORT: 5005

#
# KAFKA Configuration
#
LOG_DIR: /home/kafka/logs
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_NODE_ID: "1"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9091"
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL: PLAIN

KAFKA_LISTENERS: "CONTROLLER://kafka:9091,CLIENT://kafka:9092"
KAFKA_ADVERTISED_LISTENERS: "CLIENT://kafka:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT"

KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER

KAFKA_PRINCIPAL_BUILDER_CLASS: "io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder"

KAFKA_LISTENER_NAME_CONTROLLER_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-password\" user_admin=\"admin-password\" user_bobby=\"bobby-secret\" ;"

KAFKA_LISTENER_NAME_CLIENT_SASL_ENABLED_MECHANISMS: OAUTHBEARER
KAFKA_LISTENER_NAME_CLIENT_OAUTHBEARER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
KAFKA_LISTENER_NAME_CLIENT_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
KAFKA_LISTENER_NAME_CLIENT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler

KAFKA_SUPER_USERS: "User:admin,User:service-account-kafka-broker"

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1


#
# Strimzi OAuth Configuration
#

# Authentication config
OAUTH_CLIENT_ID: "kafka-broker"
OAUTH_CLIENT_SECRET: "kafka-broker-secret"
OAUTH_TOKEN_ENDPOINT_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-demo}/protocol/openid-connect/token"

# Validation config
OAUTH_VALID_ISSUER_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-demo}"
OAUTH_JWKS_ENDPOINT_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-demo}/protocol/openid-connect/certs"
#OAUTH_INTROSPECTION_ENDPOINT_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-demo}/protocol/openid-connect/token/introspect"


# username extraction from JWT token claim
OAUTH_USERNAME_CLAIM: preferred_username
OAUTH_CONNECT_TIMEOUT_SECONDS: "20"

# For start.sh script to know where the keycloak is listening
KEYCLOAK_HOST: ${KEYCLOAK_HOST:-keycloak}
REALM: ${REALM:-demo}
70 changes: 49 additions & 21 deletions examples/docker/kafka-oauth-strimzi/kafka/simple_kafka_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,57 @@ done
#
# Generate output
#
echo "#"
echo "# strimzi.properties"
echo "#"

echo broker.id=`pop_value broker.id 0`
echo num.network.threads=`pop_value num.network.threads 3`
echo num.io.threads=`pop_value num.io.threads 8`
echo socket.send.buffer.bytes=`pop_value socket.send.buffer.bytes 102400`
echo socket.receive.buffer.bytes=`pop_value socket.receive.buffer.bytes 102400`
echo socket.request.max.bytes=`pop_value socket.request.max.bytes 104857600`
echo log.dirs=`pop_value log.dirs /tmp/kafka-logs`
echo num.partitions=`pop_value num.partitions 1`
echo num.recovery.threads.per.data.dir=`pop_value num.recovery.threads.per.data.dir 1`
echo offsets.topic.replication.factor=`pop_value offsets.topic.replication.factor 1`
echo transaction.state.log.replication.factor=`pop_value transaction.state.log.replication.factor 1`
echo transaction.state.log.min.isr=`pop_value transaction.state.log.min.isr 1`
echo log.retention.hours=`pop_value log.retention.hours 168`
echo log.segment.bytes=`pop_value log.segment.bytes 1073741824`
echo log.retention.check.interval.ms=`pop_value log.retention.check.interval.ms 300000`
echo zookeeper.connect=`pop_value zookeeper.connect localhost:2181`
echo zookeeper.connection.timeout.ms=`pop_value zookeeper.connection.timeout.ms 6000`
echo group.initial.rebalance.delay.ms=`pop_value group.initial.rebalance.delay.ms 0`
if [[ "$1" == "--kraft" ]]; then
#
# Output kraft version of server.properties
#
echo "#"
echo "# strimzi.properties (kraft)"
echo "#"

echo process.roles=`pop_value process.roles broker,controller`
echo node.id=`pop_value node.id 1`
echo num.network.threads=`pop_value num.network.threads 3`
echo num.io.threads=`pop_value num.io.threads 8`
echo socket.send.buffer.bytes=`pop_value socket.send.buffer.bytes 102400`
echo socket.receive.buffer.bytes=`pop_value socket.receive.buffer.bytes 102400`
echo socket.request.max.bytes=`pop_value socket.request.max.bytes 104857600`
echo log.dirs=`pop_value log.dirs /tmp/kraft-combined-logs`
echo num.partitions=`pop_value num.partitions 1`
echo num.recovery.threads.per.data.dir=`pop_value num.recovery.threads.per.data.dir 1`
echo offsets.topic.replication.factor=`pop_value offsets.topic.replication.factor 1`
echo transaction.state.log.replication.factor=`pop_value transaction.state.log.replication.factor 1`
echo transaction.state.log.min.isr=`pop_value transaction.state.log.min.isr 1`
echo log.retention.hours=`pop_value log.retention.hours 168`
echo log.segment.bytes=`pop_value log.segment.bytes 1073741824`
echo log.retention.check.interval.ms=`pop_value log.retention.check.interval.ms 300000`

elif [[ "$1" == "" ]]; then
echo "#"
echo "# strimzi.properties"
echo "#"

echo broker.id=`pop_value broker.id 0`
echo num.network.threads=`pop_value num.network.threads 3`
echo num.io.threads=`pop_value num.io.threads 8`
echo socket.send.buffer.bytes=`pop_value socket.send.buffer.bytes 102400`
echo socket.receive.buffer.bytes=`pop_value socket.receive.buffer.bytes 102400`
echo socket.request.max.bytes=`pop_value socket.request.max.bytes 104857600`
echo log.dirs=`pop_value log.dirs /tmp/kafka-logs`
echo num.partitions=`pop_value num.partitions 1`
echo num.recovery.threads.per.data.dir=`pop_value num.recovery.threads.per.data.dir 1`
echo offsets.topic.replication.factor=`pop_value offsets.topic.replication.factor 1`
echo transaction.state.log.replication.factor=`pop_value transaction.state.log.replication.factor 1`
echo transaction.state.log.min.isr=`pop_value transaction.state.log.min.isr 1`
echo log.retention.hours=`pop_value log.retention.hours 168`
echo log.segment.bytes=`pop_value log.segment.bytes 1073741824`
echo log.retention.check.interval.ms=`pop_value log.retention.check.interval.ms 300000`
echo group.initial.rebalance.delay.ms=`pop_value group.initial.rebalance.delay.ms 0`
else
echo "Unsupported argument: $1"
exit 1
fi
#
# Add what remains of KAFKA_* env vars
#
Expand Down
7 changes: 6 additions & 1 deletion examples/docker/kafka-oauth-strimzi/kafka/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ wait_for_url "$URI/realms/${REALM:-demo}" "Waiting for realm '${REALM}' to be av

if [ "$SERVER_PROPERTIES_FILE" == "" ]; then
echo "Generating a new strimzi.properties file using ENV vars"
./simple_kafka_config.sh | tee /tmp/strimzi.properties
./simple_kafka_config.sh $1 | tee /tmp/strimzi.properties
else
echo "Using provided server.properties file: $SERVER_PROPERTIES_FILE"
cp $SERVER_PROPERTIES_FILE /tmp/strimzi.properties
fi

if [[ "$1" == "--kraft" ]]; then
KAFKA_CLUSTER_ID="$(/opt/kafka/bin/kafka-storage.sh random-uuid)"
/opt/kafka/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /tmp/strimzi.properties
fi

# add Strimzi kafka-oauth-* jars and their dependencies to classpath
export CLASSPATH="/opt/kafka/libs/strimzi/*:$CLASSPATH"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package io.strimzi.kafka.oauth.common;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Set;
Expand All @@ -24,18 +25,18 @@
public interface BearerTokenWithPayload extends OAuthBearerToken {

/**
* Get the usage dependent object previously associated with this instance by calling {@link BearerTokenWithPayload#setPayload(Object)}
* Get the usage dependent object previously associated with this instance by calling {@link BearerTokenWithPayload#setPayload(com.fasterxml.jackson.databind.JsonNode)}
*
* @return The associated object
*/
Object getPayload();
JsonNode getPayload();

/**
* Associate a usage dependent object with this instance
*
* @param payload The object to associate with this instance
*/
void setPayload(Object payload);
void setPayload(JsonNode payload);

/**
* Get groups associated with this token (principal).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class Config {

private Map<String, ?> defaults;

Config delegate;

/**
* Use this construtor if you only want to lookup configuration in system properties and env
* without any default configuration.
Expand All @@ -104,6 +106,15 @@ public Config(Properties p) {
));
}

/**
* Use this constructor if you want to wrap another Config object and override some functionality that way
*
* @param delegate The Config object to delegate to
*/
public Config(Config delegate) {
this.delegate = delegate;
}

/**
* Use this constructor to provide default values in case some configuration is not set through system properties or ENV.
*
Expand Down Expand Up @@ -140,6 +151,9 @@ public void validate() {}
* @return Configuration value for specified key
*/
public String getValue(String key, String fallback) {
if (delegate != null) {
return delegate.getValue(key, fallback);
}

// try system properties first
String result = System.getProperty(key, null);
Expand Down Expand Up @@ -174,7 +188,7 @@ public String getValue(String key, String fallback) {
* @param key Config key
* @return Config value
*/
public String getValue(String key) {
public final String getValue(String key) {
return getValue(key, null);
}

Expand All @@ -185,7 +199,7 @@ public String getValue(String key) {
* @param fallback Fallback value
* @return Config value
*/
public int getValueAsInt(String key, int fallback) {
public final int getValueAsInt(String key, int fallback) {
String result = getValue(key);
return result != null ? Integer.parseInt(result) : fallback;
}
Expand All @@ -197,7 +211,7 @@ public int getValueAsInt(String key, int fallback) {
* @param fallback Fallback value
* @return Config value
*/
public long getValueAsLong(String key, long fallback) {
public final long getValueAsLong(String key, long fallback) {
String result = getValue(key);
return result != null ? Long.parseLong(result) : fallback;
}
Expand All @@ -211,7 +225,7 @@ public long getValueAsLong(String key, long fallback) {
* @param fallback Fallback value
* @return Config value
*/
public boolean getValueAsBoolean(String key, boolean fallback) {
public final boolean getValueAsBoolean(String key, boolean fallback) {
String result = getValue(key);
try {
return result != null ? isTrue(result) : fallback;
Expand All @@ -226,7 +240,7 @@ public boolean getValueAsBoolean(String key, boolean fallback) {
* @param key Config key
* @return Config value
*/
public URI getValueAsURI(String key) {
public final URI getValueAsURI(String key) {
String result = getValue(key);
try {
return URI.create(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.ByteArrayOutputStream;
Expand All @@ -14,6 +15,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -166,4 +168,27 @@ public static List<String> asListOfString(JsonNode arrayOrString, String delimit

return result;
}

/**
* Set an array attribute on a JSON object to a collection of Strings
*
* @param object The target JSON object
* @param attrName An attribute name
* @param elements The collection of strings
* @return Newly create ArrayNode
*/
public static ArrayNode setArrayOfStringsIfNotNull(JsonNode object, String attrName, Collection<String> elements) {
if (elements == null) {
return null;
}
if (!(object instanceof ObjectNode)) {
throw new IllegalArgumentException("Unexpected JSON Node type (not ObjectNode): " + object.getClass());
}

ArrayNode list = ((ObjectNode) object).putArray(attrName);
for (String g: elements) {
list.add(g);
}
return list;
}
}
Loading

0 comments on commit 1179723

Please sign in to comment.