Skip to content

Commit

Permalink
[feat][proxy] PIP 97: Implement for ProxyConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall committed Jan 19, 2023
1 parent 86205a9 commit 6db2232
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -311,13 +313,9 @@ protected static boolean isTlsChannel(Channel channel) {
return channel.pipeline().get(ServiceChannelInitializer.TLS_HANDLER) != null;
}

private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
private synchronized void completeConnect() throws PulsarClientException {
Supplier<ClientCnx> clientCnxSupplier;
if (service.getConfiguration().isAuthenticationEnabled()) {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
service.getConfiguration().isForwardAuthorizationCredentials(), this);
Expand Down Expand Up @@ -423,31 +421,65 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData)
throws Exception {
AuthData brokerData = authState.authenticate(clientData);
// authentication has completed, will send newConnected command.
if (authState.isComplete()) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
CompletableFuture<AuthData> authChallengeFuture = authState.authenticateAsync(clientData);
if (authChallengeFuture.isDone()) {
if (!authChallengeFuture.isCompletedExceptionally()) {
authChallengeSuccessCallback(authChallengeFuture.get());
} else {
try {
authChallengeFuture.get();
} catch (ExecutionException e) {
authenticationFailedCallback(e.getCause());
}
}
} else {
state = State.Connecting;
authChallengeFuture.whenCompleteAsync((authChallenge, throwable) -> {
if (throwable == null) {
authChallengeSuccessCallback(authChallenge);
} else {
authenticationFailedCallback(throwable);
}
}, ctx.executor());
}
}

protected void authenticationFailedCallback(Throwable t) {
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t);
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate");
writeAndFlushAndClose(msg);
}

// Always run in this class's event loop.
protected void authChallengeSuccessCallback(AuthData authChallenge) {
try {
// authentication has completed, will send newConnected command.
if (authChallenge == null) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}

// First connection
if (this.connectionPool == null || state == State.Connecting) {
// authentication has completed, will send newConnected command.
completeConnect(clientData);
// First connection
if (this.connectionPool == null || state == State.Connecting) {
// authentication has completed, will send newConnected command.
completeConnect();
}
return;
}
return;
}

// auth not complete, continue auth with client side.
final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise);
writeAndFlush(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
// auth not complete, continue auth with client side.
final ByteBuf msg = Commands.newAuthChallenge(authMethod, authChallenge, protocolVersionToAdvertise);
writeAndFlush(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
}
state = State.Connecting;
} catch (Exception e) {
authenticationFailedCallback(e);
}
state = State.Connecting;
}

@Override
Expand Down Expand Up @@ -479,7 +511,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),

// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
completeConnect(null);
completeConnect();
return;
}

Expand All @@ -493,6 +525,14 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
authMethod = "none";
}

if (service.getConfiguration().isForwardAuthorizationCredentials()) {
// We store the first clientData here. Before this commit, we stored the last clientData.
// Since this only works when forwarding single staged authentication, first == last is true.
// Here is an issue to fix the protocol: https://github.com/apache/pulsar/issues/19291.
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}

authenticationProvider = service
.getAuthenticationService()
.getAuthenticationProvider(authMethod);
Expand All @@ -504,7 +544,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));

completeConnect(clientData);
completeConnect();
return;
}

Expand All @@ -518,9 +558,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
doAuthentication(clientData);
} catch (Exception e) {
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate");
writeAndFlushAndClose(msg);
authenticationFailedCallback(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import javax.naming.AuthenticationException;

Expand Down Expand Up @@ -136,7 +137,7 @@ public String getAuthMethodName() {
}

@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
String commandData = null;
if (authData.hasDataFromCommand()) {
commandData = authData.getCommandData();
Expand All @@ -150,9 +151,12 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
long currentTimeInMillis = System.currentTimeMillis();
if (expiryTimeInMillis < currentTimeInMillis) {
log.warn("Auth failed due to timeout");
throw new AuthenticationException("Authentication data has been expired");
return CompletableFuture
.failedFuture(new AuthenticationException("Authentication data has been expired"));
}
return element.get("entityType").getAsString();
final String result = element.get("entityType").getAsString();
// Run in another thread to attempt to test the async logic
return CompletableFuture.supplyAsync(() -> result);
}
}

Expand Down

0 comments on commit 6db2232

Please sign in to comment.