diff --git a/docs/configuration.md b/docs/configuration.md
index 74fdfafffe..33a4390335 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -181,6 +181,8 @@ This section lists configurations about the authorization.
| Name | Description | Range | Default |
|-------------------------------------------|--------------------------------------------------------------------------------------------------------|-------------|---------|
| kafkaEnableAuthorizationForceGroupIdCheck | Whether to enable authorization force group ID check. Note: It only support for OAuth2 authentication. | true, false | false |
+| kopAuthorizationCacheRefreshMs | If it's configured with a positive value N, each connection will cache the authorization results of PRODUCE and FETCH requests for at least N ms. It can help improve performance when authorization is enabled, but permission revocation will also take N ms to take effect. | 1 .. 2147483647 | 30000 |
+| kopAuthorizationCacheMaxCountPerConnection | If it's configured with a positive value N, each connection will cache at most N entries for PRODUCE or FETCH requests. If it's negative, the default value (100) will be used. | 1 .. 2147483647 | 100 |
## SSL encryption
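For illustration, here is a minimal sketch (not part of the patch) of how the two new settings interact, using the `KafkaServiceConfiguration` setters that the tests in this patch rely on; the values are arbitrary:

```java
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;

public class AuthorizationCacheConfigExample {
    public static void main(String[] args) {
        KafkaServiceConfiguration conf = new KafkaServiceConfiguration();
        // Cache PRODUCE/FETCH authorization results for 30 seconds per connection.
        conf.setKopAuthorizationCacheRefreshMs(30000);
        // Keep at most 100 cached entries per connection.
        conf.setKopAuthorizationCacheMaxCountPerConnection(100);

        // Setting the refresh interval to a non-positive value disables the cache,
        // because getAuthorizationCacheBuilder() then returns a builder with maximumSize(0).
        conf.setKopAuthorizationCacheRefreshMs(0);
    }
}
```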
diff --git a/kafka-impl/pom.xml b/kafka-impl/pom.xml
index 694dacf2ff..eaef9656d5 100644
--- a/kafka-impl/pom.xml
+++ b/kafka-impl/pom.xml
@@ -117,6 +117,12 @@
test-listener
test
+
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
index 25fe5f3be0..30931431d0 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
@@ -13,12 +13,14 @@
*/
package io.streamnative.pulsar.handlers.kop;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
@@ -564,6 +566,24 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private boolean skipMessagesWithoutIndex = false;
+ @FieldContext(
+ category = CATEGORY_KOP,
+ doc = "If it's configured with a positive value N, each connection will cache the authorization results "
+ + "of PRODUCE and FETCH requests for at least N ms.\n"
+ + "It could help improve the performance when authorization is enabled, but the permission revoke "
+ + "will also take N ms to take effect.\nDefault: 30000 (30 seconds)"
+ )
+ private int kopAuthorizationCacheRefreshMs = 30000;
+
+ @FieldContext(
+ category = CATEGORY_KOP,
+ doc = "If it's configured with a positive value N, each connection will cache at most N "
+ + "entries for PRODUCE or FETCH requests.\n"
+ + "Default: 100\n"
+ + "If it's non-positive, the cache size will be the default value."
+ )
+ private int kopAuthorizationCacheMaxCountPerConnection = 100;
+
private String checkAdvertisedListeners(String advertisedListeners) {
StringBuilder listenersReBuilder = new StringBuilder();
for (String listener : advertisedListeners.split(EndPoint.END_POINT_SEPARATOR)) {
@@ -629,4 +649,14 @@ public String getListeners() {
return kopAllowedNamespaces;
}
+ public Caffeine<Object, Object> getAuthorizationCacheBuilder() {
+ if (kopAuthorizationCacheRefreshMs <= 0) {
+ return Caffeine.newBuilder().maximumSize(0);
+ } else {
+ int maximumSize = (kopAuthorizationCacheMaxCountPerConnection >= 0)
+ ? kopAuthorizationCacheMaxCountPerConnection : 100;
+ return Caffeine.newBuilder().maximumSize(maximumSize)
+ .expireAfterWrite(Duration.ofMillis(kopAuthorizationCacheRefreshMs));
+ }
+ }
}
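As a rough usage sketch (not part of the patch), the builder returned by `getAuthorizationCacheBuilder()` produces a cache whose entries expire `kopAuthorizationCacheRefreshMs` milliseconds after they are written, which is what the unit test further down verifies; the key/value types here are arbitrary:

```java
import com.github.benmanes.caffeine.cache.Cache;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;

public class AuthorizationCacheBuilderSketch {
    public static void main(String[] args) throws InterruptedException {
        KafkaServiceConfiguration conf = new KafkaServiceConfiguration();
        conf.setKopAuthorizationCacheRefreshMs(500);
        conf.setKopAuthorizationCacheMaxCountPerConnection(5);

        Cache<String, Boolean> cache = conf.getAuthorizationCacheBuilder().build();
        cache.put("topic|role", true);
        // Within ~500 ms the cached result is returned without consulting the authorizer.
        System.out.println(cache.getIfPresent("topic|role")); // true
        Thread.sleep(600);
        // After expireAfterWrite(500 ms) the entry is gone, so the next check re-authorizes.
        System.out.println(cache.getIfPresent("topic|role")); // null
    }
}
```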
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java
index cf2337e9ab..eaeb61f2e4 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java
@@ -13,12 +13,15 @@
*/
package io.streamnative.pulsar.handlers.kop.security.auth;
+import com.github.benmanes.caffeine.cache.Cache;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -39,11 +42,18 @@ public class SimpleAclAuthorizer implements Authorizer {
private final AuthorizationService authorizationService;
private final boolean forceCheckGroupId;
+ // Cache the authorization results to avoid authorizing PRODUCE or FETCH requests each time.
+ // key is (topic, role)
+ private final Cache<Pair<TopicName, String>, Boolean> produceCache;
+ // key is (topic, role, group)
+ private final Cache<Triple<TopicName, String, String>, Boolean> fetchCache;
public SimpleAclAuthorizer(PulsarService pulsarService, KafkaServiceConfiguration config) {
this.pulsarService = pulsarService;
this.authorizationService = pulsarService.getBrokerService().getAuthorizationService();
this.forceCheckGroupId = config.isKafkaEnableAuthorizationForceGroupIdCheck();
+ this.produceCache = config.getAuthorizationCacheBuilder().build();
+ this.fetchCache = config.getAuthorizationCacheBuilder().build();
}
protected PulsarService getPulsarService() {
@@ -151,7 +161,16 @@ public CompletableFuture<Boolean> canGetTopicList(KafkaPrincipal principal, Resource resource) {
public CompletableFuture<Boolean> canProduceAsync(KafkaPrincipal principal, Resource resource) {
checkResourceType(resource, ResourceType.TOPIC);
TopicName topicName = TopicName.get(resource.getName());
- return authorizationService.canProduceAsync(topicName, principal.getName(), principal.getAuthenticationData());
+ final Pair<TopicName, String> key = Pair.of(topicName, principal.getName());
+ final Boolean authorized = produceCache.getIfPresent(key);
+ if (authorized != null) {
+ return CompletableFuture.completedFuture(authorized);
+ }
+ return authorizationService.canProduceAsync(topicName, principal.getName(), principal.getAuthenticationData())
+ .thenApply(__ -> {
+ produceCache.put(key, __);
+ return __;
+ });
}
@Override
@@ -161,8 +180,17 @@ public CompletableFuture<Boolean> canConsumeAsync(KafkaPrincipal principal, Resource resource) {
if (forceCheckGroupId && StringUtils.isBlank(principal.getGroupId())) {
return CompletableFuture.completedFuture(false);
}
+ final Triple<TopicName, String, String> key = Triple.of(topicName, principal.getName(), principal.getGroupId());
+ final Boolean authorized = fetchCache.getIfPresent(key);
+ if (authorized != null) {
+ return CompletableFuture.completedFuture(authorized);
+ }
return authorizationService.canConsumeAsync(
- topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId());
+ topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId())
+ .thenApply(__ -> {
+ fetchCache.put(key, __);
+ return __;
+ });
}
@Override
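Both methods above follow the same cache-aside pattern: check `getIfPresent`, fall back to the real authorization check on a miss, then store the result. A generic sketch of that pattern follows; the helper name and the `Supplier` parameter are illustrative and not part of the patch:

```java
import com.github.benmanes.caffeine.cache.Cache;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class CachedAuthorizationSketch {
    // Returns the cached decision if present; otherwise runs the real check and caches the result.
    static <K> CompletableFuture<Boolean> authorizeCached(Cache<K, Boolean> cache, K key,
            Supplier<CompletableFuture<Boolean>> authorize) {
        Boolean cached = cache.getIfPresent(key);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        return authorize.get().thenApply(result -> {
            // Both positive and negative results are cached until the entry expires.
            cache.put(key, result);
            return result;
        });
    }
}
```

Note that concurrent misses on the same key can still invoke the authorizer more than once before the first result is cached; since the check is read-only and the cache only exists to reduce load, this is harmless.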
diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java
index 55af1c9ea6..989846e9e8 100644
--- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java
+++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java
@@ -19,8 +19,10 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import java.io.File;
@@ -31,10 +33,13 @@
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.PulsarService;
@@ -42,6 +47,7 @@
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
+import org.awaitility.Awaitility;
import org.testng.annotations.Test;
/**
@@ -283,4 +289,36 @@ public void testKopMigrationServiceConfiguration() {
assertTrue(configuration.isKopMigrationEnable());
assertEquals(port, configuration.getKopMigrationServicePort());
}
+
+ @Test(timeOut = 10000)
+ public void testKopAuthorizationCache() throws InterruptedException {
+ KafkaServiceConfiguration configuration = new KafkaServiceConfiguration();
+ configuration.setKopAuthorizationCacheRefreshMs(500);
+ configuration.setKopAuthorizationCacheMaxCountPerConnection(5);
+ Cache<Integer, Integer> cache = configuration.getAuthorizationCacheBuilder().build();
+ for (int i = 0; i < 5; i++) {
+ assertNull(cache.getIfPresent(1));
+ }
+ for (int i = 0; i < 10; i++) {
+ cache.put(i, i + 100);
+ }
+ Awaitility.await().atMost(Duration.ofMillis(100)).pollInterval(Duration.ofMillis(1))
+ .until(() -> IntStream.range(0, 10).mapToObj(cache::getIfPresent)
+ .filter(Objects::nonNull).count() <= 5);
+ IntStream.range(0, 10).mapToObj(cache::getIfPresent).filter(Objects::nonNull).map(i -> i - 100).forEach(key ->
+ assertEquals(cache.getIfPresent(key), Integer.valueOf(key + 100)));
+
+ Thread.sleep(600); // wait until the cache expired
+ for (int i = 0; i < 10; i++) {
+ assertNull(cache.getIfPresent(i));
+ }
+
+ configuration.setKopAuthorizationCacheRefreshMs(0);
+ Cache<Integer, Integer> cache2 = configuration.getAuthorizationCacheBuilder().build();
+ for (int i = 0; i < 5; i++) {
+ cache2.put(i, i);
+ }
+ Awaitility.await().atMost(Duration.ofMillis(10)).pollInterval(Duration.ofMillis(1))
+ .until(() -> IntStream.range(0, 5).mapToObj(cache2::getIfPresent).noneMatch(Objects::nonNull));
+ }
}
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java
index edeada2ce6..e6aa506721 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java
@@ -13,130 +13,25 @@
*/
package io.streamnative.pulsar.handlers.kop.security.auth;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import com.google.common.collect.Sets;
-import io.jsonwebtoken.SignatureAlgorithm;
-import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import javax.crypto.SecretKey;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
-import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-/**
- * Unit test for Authorization with `entryFormat=pulsar`.
- */
-public class KafkaAuthorizationMockTest extends KopProtocolHandlerTestBase {
-
- protected static final String TENANT = "KafkaAuthorizationTest";
- protected static final String NAMESPACE = "ns1";
- private static final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
-
- protected static final String ADMIN_USER = "pass.pass";
+public class KafkaAuthorizationMockTest extends KafkaAuthorizationMockTestBase {
@BeforeClass
- @Override
- protected void setup() throws Exception {
- Properties properties = new Properties();
- properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));
-
- String adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_USER, Optional.empty());
-
- conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN"));
- conf.setKafkaMetadataTenant("internal");
- conf.setKafkaMetadataNamespace("__kafka");
- conf.setKafkaTenant(TENANT);
- conf.setKafkaNamespace(NAMESPACE);
-
- conf.setClusterName(super.configClusterName);
- conf.setAuthorizationEnabled(true);
- conf.setAuthenticationEnabled(true);
- conf.setAuthorizationAllowWildcardsMatching(true);
- conf.setAuthorizationProvider(KafkaMockAuthorizationProvider.class.getName());
- conf.setAuthenticationProviders(
- Sets.newHashSet(AuthenticationProviderToken.class.getName()));
- conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
- conf.setBrokerClientAuthenticationParameters("token:" + adminToken);
- conf.setProperties(properties);
-
- super.internalSetup();
+ public void setup() throws Exception {
+ super.setup();
}
- @AfterClass
- @Override
- protected void cleanup() throws Exception {
- super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
- .authentication(this.conf.getBrokerClientAuthenticationPlugin(),
- this.conf.getBrokerClientAuthenticationParameters()).build());
+ @AfterClass(alwaysRun = true)
+ public void cleanup() throws Exception {
+ super.cleanup();
}
- @Override
- protected void createAdmin() throws Exception {
- super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
- .authentication(this.conf.getBrokerClientAuthenticationPlugin(),
- this.conf.getBrokerClientAuthenticationParameters()).build());
- }
-
-
- @Test(timeOut = 30 * 1000)
+ @Test(timeOut = 30000)
public void testSuperUserProduceAndConsume() throws PulsarAdminException {
- String superUserToken = AuthTokenUtils.createToken(secretKey, "pass.pass", Optional.empty());
- String topic = "testSuperUserProduceAndConsumeTopic";
- String fullNewTopicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + topic;
- KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(),
- TENANT + "/" + NAMESPACE, "token:" + superUserToken);
- int totalMsgs = 10;
- String messageStrPrefix = topic + "_message_";
-
- for (int i = 0; i < totalMsgs; i++) {
- String messageStr = messageStrPrefix + i;
- kProducer.getProducer().send(new ProducerRecord<>(topic, i, messageStr));
- }
- KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false,
- TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer");
- kConsumer.getConsumer().subscribe(Collections.singleton(topic));
-
- int i = 0;
- while (i < totalMsgs) {
- ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(1));
- for (ConsumerRecord<Integer, String> record : records) {
- Integer key = record.key();
- assertEquals(messageStrPrefix + key.toString(), record.value());
- i++;
- }
- }
- assertEquals(i, totalMsgs);
-
- // no more records
- ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofMillis(200));
- assertTrue(records.isEmpty());
-
- // ensure that we can list the topic
- Map<String, List<PartitionInfo>> result = kConsumer.getConsumer().listTopics(Duration.ofSeconds(1));
- assertEquals(result.size(), 1);
- assertTrue(result.containsKey(topic),
- "list of topics " + result.keySet() + " does not contains " + topic);
-
- // Cleanup
- kProducer.close();
- kConsumer.close();
- admin.topics().deletePartitionedTopic(fullNewTopicName);
+ super.testSuperUserProduceAndConsume();
}
}
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java
new file mode 100644
index 0000000000..032c538e77
--- /dev/null
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.security.auth;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import javax.crypto.SecretKey;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+
+/**
+ * Unit test for Authorization with `entryFormat=pulsar`.
+ */
+public class KafkaAuthorizationMockTestBase extends KopProtocolHandlerTestBase {
+
+ protected static final String TENANT = "KafkaAuthorizationTest";
+ protected static final String NAMESPACE = "ns1";
+ protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+ protected static final String ADMIN_USER = "pass.pass";
+ protected String authorizationProviderClassName = KafkaMockAuthorizationProvider.class.getName();
+
+ @Override
+ protected void setup() throws Exception {
+ Properties properties = new Properties();
+ properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+
+ String adminToken = AuthTokenUtils.createToken(SECRET_KEY, ADMIN_USER, Optional.empty());
+
+ conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN"));
+ conf.setKafkaMetadataTenant("internal");
+ conf.setKafkaMetadataNamespace("__kafka");
+ conf.setKafkaTenant(TENANT);
+ conf.setKafkaNamespace(NAMESPACE);
+
+ conf.setClusterName(super.configClusterName);
+ conf.setAuthorizationEnabled(true);
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationAllowWildcardsMatching(true);
+ conf.setAuthorizationProvider(authorizationProviderClassName);
+ conf.setAuthenticationProviders(
+ Sets.newHashSet(AuthenticationProviderToken.class.getName()));
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters("token:" + adminToken);
+ conf.setProperties(properties);
+
+ super.internalSetup();
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(this.conf.getBrokerClientAuthenticationPlugin(),
+ this.conf.getBrokerClientAuthenticationParameters()).build());
+ }
+
+ @Override
+ protected void createAdmin() throws Exception {
+ super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(this.conf.getBrokerClientAuthenticationPlugin(),
+ this.conf.getBrokerClientAuthenticationParameters()).build());
+ }
+
+ public void testSuperUserProduceAndConsume() throws PulsarAdminException {
+ String superUserToken = AuthTokenUtils.createToken(SECRET_KEY, "pass.pass", Optional.empty());
+ String topic = "testSuperUserProduceAndConsumeTopic";
+ String fullNewTopicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + topic;
+ KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(),
+ TENANT + "/" + NAMESPACE, "token:" + superUserToken);
+ int totalMsgs = 10;
+ String messageStrPrefix = topic + "_message_";
+
+ for (int i = 0; i < totalMsgs; i++) {
+ String messageStr = messageStrPrefix + i;
+ kProducer.getProducer().send(new ProducerRecord<>(topic, i, messageStr));
+ }
+ KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false,
+ TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer");
+ kConsumer.getConsumer().subscribe(Collections.singleton(topic));
+
+ int i = 0;
+ while (i < totalMsgs) {
+ ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(1));
+ for (ConsumerRecord<Integer, String> record : records) {
+ Integer key = record.key();
+ assertEquals(messageStrPrefix + key.toString(), record.value());
+ i++;
+ }
+ }
+ assertEquals(i, totalMsgs);
+
+ // no more records
+ ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofMillis(200));
+ assertTrue(records.isEmpty());
+
+ // ensure that we can list the topic
+ Map<String, List<PartitionInfo>> result = kConsumer.getConsumer().listTopics(Duration.ofSeconds(1));
+ assertEquals(result.size(), 1);
+ assertTrue(result.containsKey(topic),
+ "list of topics " + result.keySet() + " does not contains " + topic);
+
+ // Cleanup
+ kProducer.close();
+ kConsumer.close();
+ admin.topics().deletePartitionedTopic(fullNewTopicName);
+ }
+}
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java
new file mode 100644
index 0000000000..c1b1f9202f
--- /dev/null
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.security.auth;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class SlowAuthorizationTest extends KafkaAuthorizationMockTestBase {
+
+ @BeforeClass
+ public void setup() throws Exception {
+ super.authorizationProviderClassName = SlowMockAuthorizationProvider.class.getName();
+ super.setup();
+ }
+
+ @AfterClass
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @Test(timeOut = 60000)
+ public void testManyMessages() throws Exception {
+ String superUserToken = AuthTokenUtils.createToken(SECRET_KEY, "normal-user", Optional.empty());
+ final String topic = "test-many-messages";
+ @Cleanup
+ final KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(),
+ TENANT + "/" + NAMESPACE, "token:" + superUserToken);
+ long start = System.currentTimeMillis();
+ log.info("Before send");
+ for (int i = 0; i < 1000; i++) {
+ kProducer.getProducer().send(new ProducerRecord<>(topic, "msg-" + i)).get();
+ }
+ log.info("After send ({} ms)", System.currentTimeMillis() - start);
+ @Cleanup
+ KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false,
+ TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer");
+ kConsumer.getConsumer().subscribe(Collections.singleton(topic));
+ int i = 0;
+ start = System.currentTimeMillis();
+ log.info("Before poll");
+ while (i < 1000) {
+ final ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(1));
+ i += records.count();
+ }
+ log.info("After poll ({} ms)", System.currentTimeMillis() - start);
+ }
+
+ public static class SlowMockAuthorizationProvider extends KafkaMockAuthorizationProvider {
+
+ @Override
+ public CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration serviceConfiguration) {
+ return CompletableFuture.completedFuture(role.equals("pass.pass"));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isSuperUser(String role, AuthenticationDataSource authenticationData,
+ ServiceConfiguration serviceConfiguration) {
+ return CompletableFuture.completedFuture(role.equals("pass.pass"));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ return authorizeSlowly();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canConsumeAsync(
+ TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription) {
+ return authorizeSlowly();
+ }
+
+ private static CompletableFuture<Boolean> authorizeSlowly() {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ CompletableFuture<Boolean> roleAuthorizedAsync(String role) {
+ return CompletableFuture.completedFuture(true);
+ }
+ }
+}
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java
index d261d9c8fb..a0293e5264 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersTest.java
@@ -103,6 +103,7 @@ protected void setup() throws Exception {
conf.setSaslAllowedMechanisms(Sets.newHashSet("OAUTHBEARER"));
conf.setKopOauth2AuthenticateCallbackHandler(OauthValidatorCallbackHandler.class.getName());
conf.setKopOauth2ConfigFile("src/test/resources/kop-handler-oauth2.properties");
+ conf.setKopAuthorizationCacheRefreshMs(0);
super.internalSetup();
}
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java
index f8632a712c..ae6a92e94d 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/SaslOAuthKopHandlersWithMultiTenantTest.java
@@ -75,6 +75,7 @@ protected void setup() throws Exception {
conf.setSaslAllowedMechanisms(Sets.newHashSet("OAUTHBEARER"));
conf.setKopOauth2AuthenticateCallbackHandler(OauthValidatorCallbackHandler.class.getName());
conf.setKopOauth2ConfigFile("src/test/resources/kop-handler-oauth2.properties");
+ conf.setKopAuthorizationCacheRefreshMs(0);
super.internalSetup();
}