diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java index fb67f911be..c24371f99d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java @@ -38,6 +38,8 @@ import java.util.Base64; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.naming.AuthenticationException; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -99,8 +101,11 @@ public String authenticate(FullHttpRequest request) throws SchemaStorageExceptio throw new SchemaStorageException("Pulsar is not configured for Token auth"); } try { + AuthData authData = AuthData.of(password.getBytes(StandardCharsets.UTF_8)); final AuthenticationState authState = authenticationProvider - .newAuthState(AuthData.of(password.getBytes(StandardCharsets.UTF_8)), null, null); + .newAuthState(authData, null, null); + // TODO: Use the configurable timeout + authState.authenticateAsync(authData).get(10, TimeUnit.SECONDS); final String role = authState.getAuthRole(); final String tenant; @@ -118,7 +123,7 @@ public String authenticate(FullHttpRequest request) throws SchemaStorageExceptio performAuthorizationValidation(username, role, tenant); return tenant; - } catch (AuthenticationException err) { + } catch (ExecutionException | InterruptedException | TimeoutException | AuthenticationException err) { throw new SchemaStorageException(err); }