diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 1278109aded1e..c7326ab056019 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -37,6 +37,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -311,13 +313,9 @@ protected static boolean isTlsChannel(Channel channel) { return channel.pipeline().get(ServiceChannelInitializer.TLS_HANDLER) != null; } - private synchronized void completeConnect(AuthData clientData) throws PulsarClientException { + private synchronized void completeConnect() throws PulsarClientException { Supplier clientCnxSupplier; if (service.getConfiguration().isAuthenticationEnabled()) { - if (service.getConfiguration().isForwardAuthorizationCredentials()) { - this.clientAuthData = clientData; - this.clientAuthMethod = authMethod; - } clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersionToAdvertise, service.getConfiguration().isForwardAuthorizationCredentials(), this); @@ -423,31 +421,65 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec // According to auth result, send newConnected or newAuthChallenge command. private void doAuthentication(AuthData clientData) throws Exception { - AuthData brokerData = authState.authenticate(clientData); - // authentication has completed, will send newConnected command. - if (authState.isComplete()) { - clientAuthRole = authState.getAuthRole(); - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Client successfully authenticated with {} role {}", - remoteAddress, authMethod, clientAuthRole); + CompletableFuture authChallengeFuture = authState.authenticateAsync(clientData); + if (authChallengeFuture.isDone()) { + if (!authChallengeFuture.isCompletedExceptionally()) { + authChallengeSuccessCallback(authChallengeFuture.get()); + } else { + try { + authChallengeFuture.get(); + } catch (ExecutionException e) { + authenticationFailedCallback(e.getCause()); + } } + } else { + state = State.Connecting; + authChallengeFuture.whenCompleteAsync((authChallenge, throwable) -> { + if (throwable == null) { + authChallengeSuccessCallback(authChallenge); + } else { + authenticationFailedCallback(throwable); + } + }, ctx.executor()); + } + } + + protected void authenticationFailedCallback(Throwable t) { + LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t); + final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"); + writeAndFlushAndClose(msg); + } + + // Always run in this class's event loop. + protected void authChallengeSuccessCallback(AuthData authChallenge) { + try { + // authentication has completed, will send newConnected command. + if (authChallenge == null) { + clientAuthRole = authState.getAuthRole(); + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] Client successfully authenticated with {} role {}", + remoteAddress, authMethod, clientAuthRole); + } - // First connection - if (this.connectionPool == null || state == State.Connecting) { - // authentication has completed, will send newConnected command. - completeConnect(clientData); + // First connection + if (this.connectionPool == null || state == State.Connecting) { + // authentication has completed, will send newConnected command. + completeConnect(); + } + return; } - return; - } - // auth not complete, continue auth with client side. - final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise); - writeAndFlush(msg); - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Authentication in progress client by method {}.", - remoteAddress, authMethod); + // auth not complete, continue auth with client side. + final ByteBuf msg = Commands.newAuthChallenge(authMethod, authChallenge, protocolVersionToAdvertise); + writeAndFlush(msg); + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] Authentication in progress client by method {}.", + remoteAddress, authMethod); + } + state = State.Connecting; + } catch (Exception e) { + authenticationFailedCallback(e); } - state = State.Connecting; } @Override @@ -479,7 +511,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), // authn not enabled, complete if (!service.getConfiguration().isAuthenticationEnabled()) { - completeConnect(null); + completeConnect(); return; } @@ -493,6 +525,14 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), authMethod = "none"; } + if (service.getConfiguration().isForwardAuthorizationCredentials()) { + // We store the first clientData here. Before this commit, we stored the last clientData. + // Since this only works when forwarding single staged authentication, first == last is true. + // Here is an issue to fix the protocol: https://github.com/apache/pulsar/issues/19291. + this.clientAuthData = clientData; + this.clientAuthMethod = authMethod; + } + authenticationProvider = service .getAuthenticationService() .getAuthenticationProvider(authMethod); @@ -504,7 +544,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), .orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured")); - completeConnect(clientData); + completeConnect(); return; } @@ -518,9 +558,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession); doAuthentication(clientData); } catch (Exception e) { - LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e); - final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"); - writeAndFlushAndClose(msg); + authenticationFailedCallback(e); } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index eea5c26e66728..8229d929ee5e3 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -31,6 +31,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import javax.naming.AuthenticationException; @@ -136,7 +137,7 @@ public String getAuthMethodName() { } @Override - public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { + public CompletableFuture authenticateAsync(AuthenticationDataSource authData) { String commandData = null; if (authData.hasDataFromCommand()) { commandData = authData.getCommandData(); @@ -150,9 +151,12 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat long currentTimeInMillis = System.currentTimeMillis(); if (expiryTimeInMillis < currentTimeInMillis) { log.warn("Auth failed due to timeout"); - throw new AuthenticationException("Authentication data has been expired"); + return CompletableFuture + .failedFuture(new AuthenticationException("Authentication data has been expired")); } - return element.get("entityType").getAsString(); + final String result = element.get("entityType").getAsString(); + // Run in another thread to attempt to test the async logic + return CompletableFuture.supplyAsync(() -> result); } }