Skip to content

Commit

Permalink
KUDU-2543 pt 3 java: pass around authz tokens
Browse files Browse the repository at this point in the history
Adds handling of authz tokens to the Java client. The Java client will
now cache tokens upon opening a table, and use them for RPCs that need
them (e.g. Writes and Scans), reacquiring them when receiving word that
they are expired.

This is tested as follows:
- TestAuthnTokenReacquire's test for scans and writes is repurposed to
  also test for reacquisition of authz tokens when they expire
- basic tests are added to test the token cache
- a test is added to test authz reacquisition in the case that a
  multi-master deployment undergoes a leadership change
- a test is added to test authz reacquisition upon invalid or expired
  tokens during prolonged workloads against a multi-master deployment

Change-Id: Iadd5f7709b45628d7ddd9e2b100d0dfaabbf15af
Reviewed-on: http://gerrit.cloudera.org:8080/12279
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
Reviewed-by: Hao Hao <[email protected]>
  • Loading branch information
andrwng committed Mar 11, 2019
1 parent 9d4ed7a commit 7645f5b
Show file tree
Hide file tree
Showing 15 changed files with 872 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.google.protobuf.Message;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import org.apache.kudu.security.Token;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
Expand All @@ -68,6 +69,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.kudu.security.Token.SignedTokenPB;
import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.master.Master;
Expand Down Expand Up @@ -116,8 +118,8 @@
* Authentication and Authorization Service</em> (JAAS) API provided by the JDK.
* JAAS provides a common way for applications to initialize Kerberos
* credentials, store these credentials in a {@link javax.security.auth.Subject}
* instance, and associate the Subject the current thread of execution. The Kudu
* client then accesses the Kerberos credentials in the
* instance, and associate the Subject with the current thread of execution.
* The Kudu client then accesses the Kerberos credentials in the
* {@link javax.security.auth.Subject} and uses them to authenticate to the
* remote cluster as necessary.
* <p>
Expand Down Expand Up @@ -355,6 +357,9 @@ public class AsyncKuduClient implements AutoCloseable {
/** A helper to facilitate re-acquiring of authentication token if current one expires. */
private final AuthnTokenReacquirer tokenReacquirer;

/** A helper to facilitate retrieving authz tokens */
private final AuthzTokenCache authzTokenCache;

private volatile boolean closed;

private AsyncKuduClient(AsyncKuduClientBuilder b) {
Expand All @@ -373,6 +378,7 @@ private AsyncKuduClient(AsyncKuduClientBuilder b) {
this.connectionCache = new ConnectionCache(
securityContext, timer, channelFactory);
this.tokenReacquirer = new AuthnTokenReacquirer(this);
this.authzTokenCache = new AuthzTokenCache(this);
}

/**
Expand Down Expand Up @@ -756,11 +762,14 @@ private Deferred<KuduTable> getTableSchema(
Preconditions.checkNotNull(tableName);

// Prefer a lookup by table ID over name, since the former is immutable.
// For backwards compatibility with older tservers, we don't require authz
// token support.
GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable,
tableId,
tableId != null ? null : tableName,
timer,
defaultAdminOperationTimeoutMs);
defaultAdminOperationTimeoutMs,
/*requiresAuthzTokenSupport=*/false);

rpc.setParentRpc(parent);
return sendRpcToTablet(rpc).addCallback(new Callback<KuduTable, GetTableSchemaResponse>() {
Expand All @@ -773,6 +782,10 @@ public KuduTable call(GetTableSchemaResponse resp) throws Exception {
if (cache != null) {
cache.clearNonCoveredRangeEntries();
}
SignedTokenPB authzToken = resp.getAuthzToken();
if (authzToken != null) {
authzTokenCache.put(resp.getTableId(), authzToken);
}

LOG.debug("Opened table {}", resp.getTableId());
return new KuduTable(AsyncKuduClient.this,
Expand Down Expand Up @@ -898,6 +911,11 @@ public void run(final Timeout ignored) {
}));
}

@InterfaceAudience.LimitedPrivate("Test")
public AuthzTokenCache getAuthzTokenCache() {
return this.authzTokenCache;
}

/**
* Get the Hive Metastore configuration of the most recently connected-to leader master, or
* {@code null} if the Hive Metastore integration is not enabled.
Expand Down Expand Up @@ -1955,16 +1973,39 @@ <R> void handleRetryableErrorNoDelay(final KuduRpc<R> rpc, KuduException ex) {
}

/**
* Handle a RPC failed due to invalid authn token error. In short, connect to the Kudu cluster
* Handle an RPC failed due to invalid authn token error. In short, connect to the Kudu cluster
* to acquire a new authentication token and retry the RPC once a new authentication token
* is put into the {@link #securityContext}.
*
* @param rpc the RPC which failed do to invalid authn token
* @param rpc the RPC which failed with an invalid authn token
*/
<R> void handleInvalidToken(KuduRpc<R> rpc) {
<R> void handleInvalidAuthnToken(KuduRpc<R> rpc) {
// TODO(awong): plumb the offending KuduException into the reacquirer.
tokenReacquirer.handleAuthnTokenExpiration(rpc);
}

/**
* Handle an RPC that failed due to an invalid authorization token error. The
* RPC will be retried after fetching a new authz token.
*
* @param rpc the RPC that failed with an invalid authz token
* @param ex the KuduException that led to this handling
*/
<R> void handleInvalidAuthzToken(KuduRpc<R> rpc, KuduException ex) {
authzTokenCache.retrieveAuthzToken(rpc, ex);
}

/**
* Gets an authorization token for the given table from the cache, or nullptr
* if none exists.
*
* @param tableId the table ID for which to get an authz token
* @return a signed authz token for the table
*/
SignedTokenPB getAuthzToken(String tableId) {
return authzTokenCache.get(tableId);
}

/**
* This methods enable putting RPCs on hold for a period of time determined by
* {@link #getSleepTimeForRpcMillis(KuduRpc)}. If the RPC is out of time/retries, its errback will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.protobuf.UnsafeByteOperations;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import org.apache.kudu.security.Token;
import org.apache.kudu.tserver.Tserver.ScannerKeepAliveRequestPB;
import org.apache.kudu.tserver.Tserver.ScannerKeepAliveResponsePB;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -886,6 +887,9 @@ final class ScanRequest extends KuduRpc<Response> {

State state;

/** The token with which to authorize this RPC. */
private Token.SignedTokenPB authzToken;

ScanRequest(KuduTable table, State state, RemoteTablet tablet) {
super(table, client.getTimer(), scanRequestTimeout);
setTablet(tablet);
Expand Down Expand Up @@ -916,6 +920,16 @@ ReplicaSelection getReplicaSelection() {
return replicaSelection;
}

@Override
boolean needsAuthzToken() {
return true;
}

@Override
void bindAuthzToken(Token.SignedTokenPB token) {
authzToken = token;
}

/** Serializes this request. */
@Override
Message createRequestPB() {
Expand Down Expand Up @@ -968,6 +982,9 @@ Message createRequestPB() {
for (KuduPredicate pred : predicates.values()) {
newBuilder.addColumnPredicates(pred.toPB());
}
if (authzToken != null) {
newBuilder.setAuthzToken(authzToken);
}
builder.setNewScanRequest(newBuilder.build())
.setBatchSizeBytes(batchSizeBytes);
break;
Expand Down
Loading

0 comments on commit 7645f5b

Please sign in to comment.