Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compatibility with Kafka >= 3.8 #194

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;

import io.aiven.kafka.auth.audit.AuditorAPI;
import io.aiven.kafka.auth.audit.Session;
import io.aiven.kafka.auth.json.AivenAcl;
import io.aiven.kafka.auth.json.reader.AclJsonReader;
import io.aiven.kafka.auth.json.reader.JsonReaderException;
import io.aiven.kafka.auth.nameformatters.LegacyOperationNameFormatter;
import io.aiven.kafka.auth.nameformatters.LegacyResourceTypeNameFormatter;
import io.aiven.kafka.auth.nativeacls.AclAivenToNativeConverter;

import kafka.network.RequestChannel.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
1 change: 0 additions & 1 deletion src/main/java/io/aiven/kafka/auth/audit/Auditor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import kafka.network.RequestChannel.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/aiven/kafka/auth/audit/AuditorAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.ResourcePattern;

import kafka.network.RequestChannel;

public interface AuditorAPI extends Configurable {
void addActivity(final RequestChannel.Session session,
void addActivity(final Session session,
final AclOperation operation,
final ResourcePattern resource,
final boolean hasAccess);
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/aiven/kafka/auth/audit/NoAuditor.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.ResourcePattern;

import kafka.network.RequestChannel;

/**
* A no-op {@link AuditorAPI}.
*/
Expand All @@ -33,7 +31,7 @@ public NoAuditor() {
}

@Override
public void addActivity(final RequestChannel.Session session,
public void addActivity(final Session session,
final AclOperation operation,
final ResourcePattern resource,
final boolean hasAccess) {
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/io/aiven/kafka/auth/audit/Session.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 Aiven Oy https://aiven.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.auth.audit;

import java.net.InetAddress;

import org.apache.kafka.common.security.auth.KafkaPrincipal;


public class Session {
private final KafkaPrincipal principal;
private final InetAddress clientAddress;

public Session(final KafkaPrincipal principal, final InetAddress clientAddress) {
this.principal = principal;
this.clientAddress = clientAddress;
}

public KafkaPrincipal getPrincipal() {
return principal;
}

public InetAddress getClientAddress() {
return clientAddress;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.resource.ResourcePattern;

import kafka.network.RequestChannel;
import org.slf4j.Logger;

public class UserActivityAuditor extends Auditor {
Expand All @@ -46,11 +45,11 @@ protected UserActivityAuditor(final Logger logger) {
}

@Override
protected void addActivity0(final RequestChannel.Session session,
protected void addActivity0(final Session session,
final AclOperation operation,
final ResourcePattern resource,
final boolean hasAccess) {
final AuditKey auditKey = new AuditKey(session.principal(), session.clientAddress());
final AuditKey auditKey = new AuditKey(session.getPrincipal(), session.getClientAddress());

auditStorage.compute(auditKey, (key, userActivity) -> Objects.isNull(userActivity)
? new UserActivity.UserActivityOperations()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.ResourcePattern;

import kafka.network.RequestChannel;
import org.slf4j.Logger;

public class UserOperationsActivityAuditor extends Auditor {
Expand All @@ -35,7 +34,7 @@ protected UserOperationsActivityAuditor(final Logger logger) {
}

@Override
protected void addActivity0(final RequestChannel.Session session,
protected void addActivity0(final Session session,
final AclOperation operation,
final ResourcePattern resource,
final boolean hasAccess) {
Expand All @@ -46,18 +45,18 @@ protected void addActivity0(final RequestChannel.Session session,
} else {
ua = userActivity;
}
ua.addOperation(new UserOperation(session.clientAddress(), operation, resource, hasAccess));
ua.addOperation(new UserOperation(session.getClientAddress(), operation, resource, hasAccess));
return ua;
});
}

private AuditKey createAuditKey(final RequestChannel.Session session) {
private AuditKey createAuditKey(final Session session) {
final var grouping = auditorConfig.getAggregationGrouping();
switch (grouping) {
case USER:
return new AuditKey(session.principal(), null);
return new AuditKey(session.getPrincipal(), null);
case USER_AND_IP:
return new AuditKey(session.principal(), session.clientAddress());
return new AuditKey(session.getPrincipal(), session.getClientAddress());
default:
throw new IllegalArgumentException("Unknown aggregation grouping type: " + grouping);
}
Expand Down
20 changes: 9 additions & 11 deletions src/test/java/io/aiven/kafka/auth/audit/FormatterTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import kafka.network.RequestChannel;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class FormatterTestBase {

protected RequestChannel.Session session;
protected Session session;

protected AclOperation operation;

Expand All @@ -48,7 +46,7 @@ public class FormatterTestBase {

protected ResourcePattern anotherResource;

protected RequestChannel.Session anotherSession;
protected Session anotherSession;

protected InetAddress anotherInetAddress;

Expand All @@ -60,9 +58,9 @@ protected FormatterTestBase(final AuditorConfig.AggregationGrouping aggregationG

void setUp() throws Exception {
final KafkaPrincipal principal = new KafkaPrincipal("PRINCIPAL_TYPE", "PRINCIPAL_NAME");
session = new RequestChannel.Session(principal, InetAddress.getLocalHost());
session = new Session(principal, InetAddress.getLocalHost());
anotherInetAddress = InetAddress.getByName("192.168.0.1");
anotherSession = new RequestChannel.Session(principal, anotherInetAddress);
anotherSession = new Session(principal, anotherInetAddress);
resource =
new ResourcePattern(
ResourceType.CLUSTER,
Expand All @@ -88,20 +86,20 @@ protected void zeroOperations(final ZonedDateTime now, final String expected) {
protected void twoOperations(final ZonedDateTime now, final String expected) {
final Map<Auditor.AuditKey, UserActivity> dump = new HashMap<>();
final UserActivity userActivity = createUserActivity(now);
userActivity.addOperation(new UserOperation(session.clientAddress(), operation, resource, false));
userActivity.addOperation(new UserOperation(session.getClientAddress(), operation, resource, false));
userActivity.addOperation(
new UserOperation(session.clientAddress(), anotherOperation, anotherResource, true));
new UserOperation(session.getClientAddress(), anotherOperation, anotherResource, true));
dump.put(createAuditKey(session), userActivity);

formatAndAssert(dump, expected);
}

protected Auditor.AuditKey createAuditKey(final RequestChannel.Session session) {
protected Auditor.AuditKey createAuditKey(final Session session) {
switch (aggregationGrouping) {
case USER:
return new Auditor.AuditKey(session.principal(), null);
return new Auditor.AuditKey(session.getPrincipal(), null);
case USER_AND_IP:
return new Auditor.AuditKey(session.principal(), session.clientAddress());
return new Auditor.AuditKey(session.getPrincipal(), session.getClientAddress());
default:
throw new IllegalArgumentException("Unknown aggregation grouping: " + aggregationGrouping);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ protected void twoOperationsTwoIpAddresses(final ZonedDateTime now, final String
final Map<Auditor.AuditKey, UserActivity> dump = new HashMap<>();

final UserActivity userActivity = createUserActivity(now);
userActivity.addOperation(new UserOperation(session.clientAddress(), operation, resource, false));
userActivity.addOperation(new UserOperation(session.getClientAddress(), operation, resource, false));
dump.put(createAuditKey(session), userActivity);

final UserActivity anotherUserActivity = createUserActivity(now);
anotherUserActivity.addOperation(
new UserOperation(anotherSession.clientAddress(), anotherOperation, anotherResource, true));
new UserOperation(anotherSession.getClientAddress(), anotherOperation, anotherResource, true));
dump.put(createAuditKey(anotherSession), anotherUserActivity);

formatAndAssert(dump, expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ protected void twoOperationsTwoIpAddresses(final ZonedDateTime now, final String

final UserActivity userActivity = createUserActivity(now);
userActivity.addOperation(
new UserOperation(session.clientAddress(), operation, resource, false));
new UserOperation(session.getClientAddress(), operation, resource, false));
userActivity.addOperation(
new UserOperation(anotherSession.clientAddress(), anotherOperation, anotherResource, true));
new UserOperation(anotherSession.getClientAddress(), anotherOperation, anotherResource, true));
dump.put(createAuditKey(session), userActivity);

formatAndAssert(dump, expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import kafka.network.RequestChannel.Session;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import kafka.network.RequestChannel.Session;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -115,16 +114,16 @@ void shouldAggregateOperationsForSameUser() throws Exception {
2,
cast(auditor.auditStorage.get(
new Auditor.AuditKey(
session.principal(),
session.clientAddress())
session.getPrincipal(),
session.getClientAddress())
), UserActivity.UserActivityOperations.class).operations.size()
);
assertEquals(
1,
cast(auditor.auditStorage.get(
new Auditor.AuditKey(
anotherSession.principal(),
anotherSession.clientAddress())
anotherSession.getPrincipal(),
anotherSession.getClientAddress())
), UserActivity.UserActivityOperations.class).operations.size()
);
auditor.dump();
Expand Down Expand Up @@ -152,7 +151,7 @@ void shouldAggregateOperationsForSameUserAndPrincipalGrouping() throws Exception
2,
cast(auditor.auditStorage.get(
new Auditor.AuditKey(
session.principal(),
session.getPrincipal(),
null)
), UserActivity.UserActivityOperationsGropedByIP.class).operations.size()
);
Expand Down
Loading