From 388938ef41360e0c49f6c8c929884af49b36c064 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 29 Sep 2022 18:48:38 +0800 Subject: [PATCH] [fix][proxy] Fix refresh client auth (#17831) * [fix][proxy] Fix refresh client auth Signed-off-by: Zixuan Liu * Fix style Signed-off-by: Zixuan Liu (cherry picked from commit c952f3c9f891f85ff4b6cee6e28b6f68db3b5bcd) Signed-off-by: Zixuan Liu --- .../apache/pulsar/client/impl/ClientCnx.java | 17 +- .../pulsar/client/impl/ConnectionPool.java | 8 + pulsar-proxy/pom.xml | 5 + .../pulsar/proxy/server/ProxyClientCnx.java | 77 ++++++-- .../pulsar/proxy/server/ProxyConnection.java | 73 +++++-- .../proxy/server/ProxyRefreshAuthTest.java | 186 ++++++++++++++++++ 6 files changed, 338 insertions(+), 28 deletions(-) create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 7fbe1faa38faa..4ce291c9d0038 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -100,7 +100,7 @@ public class ClientCnx extends PulsarHandler { protected final Authentication authentication; - private State state; + protected State state; @Getter private final ConcurrentLongHashMap> pendingRequests = @@ -145,7 +145,7 @@ public class ClientCnx extends PulsarHandler { private final int maxNumberOfRejectedRequestPerConnection; private final int rejectedRequestResetTimeSec = 60; - private final int protocolVersion; + protected final int protocolVersion; private final long operationTimeoutMs; protected String proxyToTargetBrokerAddress = null; @@ -161,7 +161,10 @@ public class ClientCnx extends PulsarHandler { protected AuthenticationDataProvider authenticationDataProvider; private TransactionBufferHandler transactionBufferHandler; - enum State { + @Getter + private long lastDisconnectedTimestamp; + + protected enum State { None, SentConnectFrame, Ready, Failed, Connecting } @@ -265,6 +268,7 @@ protected ByteBuf newConnectCommand() throws Exception { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); + lastDisconnectedTimestamp = System.currentTimeMillis(); log.info("{} Disconnected", ctx.channel()); if (!connectionFuture.isDone()) { connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed")); @@ -1160,6 +1164,13 @@ public void close() { } } + protected void closeWithException(Throwable e) { + if (ctx != null) { + connectionFuture.completeExceptionally(e); + ctx.close(); + } + } + private void checkRequestTimeout() { while (!requestTimeoutQueue.isEmpty()) { RequestTime request = requestTimeoutQueue.peek(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 1e3331f66c22f..753354e49cea6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -33,15 +33,18 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; @@ -355,6 +358,11 @@ int getPoolSize() { return pool.values().stream().mapToInt(Map::size).sum(); } + public Set> getConnections() { + return Collections.unmodifiableSet( + pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet())); + } + private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class); } diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml index a8159e48f207d..80699e1b0948c 100644 --- a/pulsar-proxy/pom.xml +++ b/pulsar-proxy/pom.xml @@ -179,6 +179,11 @@ ipaddress ${seancfoley.ipaddress.version} + + org.awaitility + awaitility + test + diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java index 50a77d33683b4..283b835fff54f 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java @@ -18,30 +18,35 @@ */ package org.apache.pulsar.proxy.server; +import static com.google.common.base.Preconditions.checkArgument; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; +import java.util.Arrays; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.AuthData; +import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.protocol.Commands; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +@Slf4j public class ProxyClientCnx extends ClientCnx { - - String clientAuthRole; - AuthData clientAuthData; - String clientAuthMethod; - int protocolVersion; + private final boolean forwardClientAuthData; + private final String clientAuthMethod; + private final String clientAuthRole; + private final AuthData clientAuthData; + private final ProxyConnection proxyConnection; public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole, - AuthData clientAuthData, String clientAuthMethod, int protocolVersion) { - super(conf, eventLoopGroup); + AuthData clientAuthData, String clientAuthMethod, int protocolVersion, + boolean forwardClientAuthData, ProxyConnection proxyConnection) { + super(conf, eventLoopGroup, protocolVersion); this.clientAuthRole = clientAuthRole; this.clientAuthData = clientAuthData; this.clientAuthMethod = clientAuthMethod; - this.protocolVersion = protocolVersion; + this.forwardClientAuthData = forwardClientAuthData; + this.proxyConnection = proxyConnection; } @Override @@ -54,10 +59,54 @@ protected ByteBuf newConnectCommand() throws Exception { authenticationDataProvider = authentication.getAuthData(remoteHostName); AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); - return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, - PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, - clientAuthMethod); + return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, + PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, + clientAuthMethod); } - private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class); + @Override + protected void handleAuthChallenge(CommandAuthChallenge authChallenge) { + checkArgument(authChallenge.hasChallenge()); + checkArgument(authChallenge.getChallenge().hasAuthData()); + + boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData()); + if (!forwardClientAuthData || !isRefresh) { + super.handleAuthChallenge(authChallenge); + return; + } + + try { + if (log.isDebugEnabled()) { + log.debug("Proxy {} request to refresh the original client authentication data for " + + "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel()); + } + + proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA, + protocolVersion)) + .addListener(writeFuture -> { + if (writeFuture.isSuccess()) { + if (log.isDebugEnabled()) { + log.debug("Proxy {} sent the auth challenge to original client to refresh credentials " + + "with method {} for the proxy client {}", + proxyConnection.ctx().channel(), clientAuthMethod, ctx.channel()); + } + } else { + log.error("Failed to send the auth challenge to original client by the proxy {} " + + "for the proxy client {}", + proxyConnection.ctx().channel(), + ctx.channel(), + writeFuture.cause()); + closeWithException(writeFuture.cause()); + } + }); + + if (state == State.SentConnectFrame) { + state = State.Connecting; + } + } catch (Exception e) { + log.error("Failed to send the auth challenge to origin client by the proxy {} for the proxy client {}", + proxyConnection.ctx().channel(), ctx.channel(), e); + closeWithException(e); + } + } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 6ae0ac5e02deb..a01e3bce02d57 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -41,6 +41,7 @@ import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; import lombok.Getter; +import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; @@ -266,12 +267,11 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie this.clientAuthData = clientData; this.clientAuthMethod = authMethod; } - clientCnxSupplier = - () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, - clientAuthMethod, protocolVersionToAdvertise); + clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, + clientAuthData, clientAuthMethod, protocolVersionToAdvertise, + service.getConfiguration().isForwardAuthorizationCredentials(), this); } else { - clientCnxSupplier = - () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise); + clientCnxSupplier = () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise); } if (this.connectionPool == null) { @@ -372,16 +372,22 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec } // According to auth result, send newConnected or newAuthChallenge command. - private void doAuthentication(AuthData clientData) throws Exception { + private void doAuthentication(AuthData clientData) + throws Exception { AuthData brokerData = authState.authenticate(clientData); // authentication has completed, will send newConnected command. if (authState.isComplete()) { clientAuthRole = authState.getAuthRole(); if (LOG.isDebugEnabled()) { LOG.debug("[{}] Client successfully authenticated with {} role {}", - remoteAddress, authMethod, clientAuthRole); + remoteAddress, authMethod, clientAuthRole); + } + + // First connection + if (this.connectionPool == null || state == State.Connecting) { + // authentication has completed, will send newConnected command. + completeConnect(clientData); } - completeConnect(clientData); return; } @@ -390,7 +396,7 @@ private void doAuthentication(AuthData clientData) throws Exception { .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); if (LOG.isDebugEnabled()) { LOG.debug("[{}] Authentication in progress client by method {}.", - remoteAddress, authMethod); + remoteAddress, authMethod); } state = State.Connecting; } @@ -472,18 +478,63 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), @Override protected void handleAuthResponse(CommandAuthResponse authResponse) { - checkArgument(state == State.Connecting); checkArgument(authResponse.hasResponse()); checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName()); if (LOG.isDebugEnabled()) { LOG.debug("Received AuthResponse from {}, auth method: {}", - remoteAddress, authResponse.getResponse().getAuthMethodName()); + remoteAddress, authResponse.getResponse().getAuthMethodName()); } try { AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData()); doAuthentication(clientData); + if (service.getConfiguration().isForwardAuthorizationCredentials() + && connectionPool != null && state == State.ProxyLookupRequests) { + connectionPool.getConnections().forEach(toBrokerCnxFuture -> { + String clientVersion; + if (authResponse.hasClientVersion()) { + clientVersion = authResponse.getClientVersion(); + } else { + clientVersion = PulsarVersion.getVersion(); + } + int protocolVersion; + if (authResponse.hasProtocolVersion()) { + protocolVersion = authResponse.getProtocolVersion(); + } else { + protocolVersion = Commands.getCurrentProtocolVersion(); + } + + ByteBuf cmd = + Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion); + toBrokerCnxFuture.thenAccept(toBrokerCnx -> toBrokerCnx.ctx().writeAndFlush(cmd) + .addListener(writeFuture -> { + if (writeFuture.isSuccess()) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} authentication is refreshed successfully by {}, " + + "auth method: {} ", + toBrokerCnx.ctx().channel(), ctx.channel(), clientAuthMethod); + } + } else { + LOG.error("Failed to forward the auth response " + + "from the proxy to the broker through the proxy client, " + + "proxy: {}, proxy client: {}", + ctx.channel(), + toBrokerCnx.ctx().channel(), + writeFuture.cause()); + toBrokerCnx.ctx().channel().pipeline() + .fireExceptionCaught(writeFuture.cause()); + } + })) + .whenComplete((__, ex) -> { + if (ex != null) { + LOG.error("Failed to forward the auth response from the proxy to " + + "the broker through the proxy client, proxy: {}", + ctx().channel(), ex); + } + }); + }); + } } catch (Exception e) { String msg = "Unable to handleAuthResponse"; LOG.warn("[{}] {} ", remoteAddress, msg, e); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java new file mode 100644 index 0000000000000..9ccb067adbf10 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertTrue; +import com.google.common.collect.Sets; +import io.jsonwebtoken.SignatureAlgorithm; +import java.util.Calendar; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import javax.crypto.SecretKey; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +public class ProxyRefreshAuthTest extends ProducerConsumerBase { + private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + private ProxyService proxyService; + private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + + // enable tls and auth&auth at broker + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(false); + conf.setTopicLevelPoliciesEnabled(false); + conf.setProxyRoles(Collections.singleton("Proxy")); + conf.setAdvertisedAddress(null); + conf.setAuthenticateOriginalAuthData(true); + conf.setBrokerServicePort(Optional.of(0)); + conf.setWebServicePort(Optional.of(0)); + + Set superUserRoles = new HashSet<>(); + superUserRoles.add("superUser"); + conf.setSuperUserRoles(superUserRoles); + + conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName())); + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); + conf.setProperties(properties); + + conf.setClusterName("proxy-authorization"); + conf.setNumExecutorThreadPoolSize(5); + + conf.setAuthenticationRefreshCheckSeconds(1); + } + + @BeforeClass + @Override + protected void setup() throws Exception { + super.init(); + + admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress()) + .authentication(new AuthenticationToken( + () -> AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.empty()))).build(); + String namespaceName = "my-tenant/my-ns"; + admin.clusters().createCluster("proxy-authorization", + ClusterData.builder().serviceUrlTls(brokerUrlTls.toString()).build()); + admin.tenants().createTenant("my-tenant", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization"))); + admin.namespaces().createNamespace(namespaceName); + + // start proxy service + proxyConfig.setAuthenticationEnabled(true); + proxyConfig.setAuthorizationEnabled(false); + proxyConfig.setForwardAuthorizationCredentials(true); + proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyConfig.setAdvertisedAddress(null); + + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setWebServicePort(Optional.of(0)); + + proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + proxyConfig.setBrokerClientAuthenticationParameters( + AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty())); + proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName())); + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); + proxyConfig.setProperties(properties); + + proxyService = Mockito.spy(new ProxyService(proxyConfig, + new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig)))); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + proxyService.close(); + } + + private void startProxy(boolean forwardAuthData) throws Exception { + pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData); + proxyConfig.setForwardAuthorizationCredentials(forwardAuthData); + proxyService.start(); + } + + @DataProvider + Object[] forwardAuthDataProvider() { + return new Object[]{true, false}; + } + + @Test(dataProvider = "forwardAuthDataProvider") + public void testAuthDataRefresh(boolean forwardAuthData) throws Exception { + log.info("-- Starting {} test --", methodName); + + startProxy(forwardAuthData); + + AuthenticationToken authenticationToken = new AuthenticationToken(() -> { + Calendar calendar = Calendar.getInstance(); + calendar.add(Calendar.SECOND, 1); + return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime())); + }); + + pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) + .authentication(authenticationToken) + .build(); + + String topic = "persistent://my-tenant/my-ns/my-topic1"; + @Cleanup + Producer ignored = spy(pulsarClient.newProducer() + .topic(topic).create()); + + PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient; + Set> connections = pulsarClientImpl.getCnxPool().getConnections(); + + Awaitility.await().during(4, SECONDS).untilAsserted(() -> { + pulsarClient.getPartitionsForTopic(topic).get(); + assertTrue(connections.stream().allMatch(n -> { + try { + ClientCnx clientCnx = n.get(); + long timestamp = clientCnx.getLastDisconnectedTimestamp(); + return timestamp == 0; + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + }); + } +}