Skip to content

Commit

Permalink
YARN-11225. [Federation] Add postDelegationToken postDelegationTokenE…
Browse files Browse the repository at this point in the history
…xpiration cancelDelegationToken REST APIs for Router.
  • Loading branch information
slfan1989 committed Dec 10, 2022
1 parent 8a9bdb1 commit 2b67668
Show file tree
Hide file tree
Showing 9 changed files with 548 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -611,4 +611,16 @@ protected RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecret
public RouterDelegationTokenSecretManager getRouterDTSecretManager() {
return routerDTSecretManager;
}

@VisibleForTesting
public void setRouterDTSecretManager(RouterDelegationTokenSecretManager routerDTSecretManager) {
this.routerDTSecretManager = routerDTSecretManager;
}

@VisibleForTesting
public void initUserPipelineMap(Configuration conf) {
int maxCacheSize = conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,14 @@ public synchronized Map<RMDelegationTokenIdentifier, Long> getAllTokens() {
}
return allTokens;
}

public long getRenewDate(RMDelegationTokenIdentifier ident)
throws InvalidToken {
DelegationTokenInformation info = currentTokens.get(ident);
if (info == null) {
throw new InvalidToken("token (" + ident.toString()
+ ") can't be found in cache");
}
return info.getRenewDate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;

import java.io.IOException;

Expand All @@ -34,6 +35,7 @@ public abstract class AbstractRESTRequestInterceptor
private Configuration conf;
private RESTRequestInterceptor nextInterceptor;
private UserGroupInformation user = null;
private RouterClientRMService routerClientRMService = null;

/**
* Sets the {@link RESTRequestInterceptor} in the chain.
Expand Down Expand Up @@ -123,4 +125,14 @@ private void setupUser(final String userName) {
public UserGroupInformation getUser() {
return user;
}

@Override
public RouterClientRMService getRouterClientRMService() {
return routerClientRMService;
}

@Override
public void setRouterClientRMService(RouterClientRMService routerClientRMService) {
this.routerClientRMService = routerClientRMService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -46,11 +47,15 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.*;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
Expand All @@ -59,6 +64,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
Expand Down Expand Up @@ -104,8 +110,11 @@
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand All @@ -121,6 +130,9 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.extractToken;
import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.getKerberosUserGroupInformation;

/**
* Extends the {@code AbstractRESTRequestInterceptor} class and provides an
* implementation for federation of YARN RM and scaling an application across
Expand Down Expand Up @@ -1556,25 +1568,178 @@ public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
throw new RuntimeException("updateAppQueue Failed.");
}

/**
* This method posts a delegation token from the client.
*
* @param tokenData the token to delegate. It is a content param.
* @param hsr the servlet request.
* @return Response containing the status code.
* @throws AuthorizationException if Kerberos auth failed.
* @throws IOException if the delegation failed.
* @throws InterruptedException if interrupted.
* @throws Exception in case of bad request.
*/
@Override
public Response postDelegationToken(DelegationToken tokenData,
HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException, Exception {
throw new NotImplementedException("Code is not implemented");
public Response postDelegationToken(DelegationToken tokenData, HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException, Exception {

if (tokenData == null || hsr == null) {
throw new IllegalArgumentException("Parameter error, the tokenData or hsr is null.");
}

try {
// get Caller UserGroupInformation
Configuration conf = federationFacade.getConf();
UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);

// create a delegation token
return createDelegationToken(tokenData, callerUGI);
} catch (YarnException e) {
LOG.error("Create delegation token request failed.", e);
return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
}
}

/**
* Create DelegationToken.
*
* @param dtoken DelegationToken Data.
* @param callerUGI UserGroupInformation.
* @return Response.
* @throws Exception An exception occurred when creating a delegationToken.
*/
private Response createDelegationToken(DelegationToken dtoken, UserGroupInformation callerUGI)
throws IOException, InterruptedException {

String renewer = dtoken.getRenewer();

GetDelegationTokenResponse resp = callerUGI.doAs(
(PrivilegedExceptionAction<GetDelegationTokenResponse>) () -> {
GetDelegationTokenRequest createReq = GetDelegationTokenRequest.newInstance(renewer);
return this.getRouterClientRMService().getDelegationToken(createReq);
});

org.apache.hadoop.yarn.api.records.Token token = resp.getRMDelegationToken();
byte[] identifier = token.getIdentifier().array();
byte[] password = token.getPassword().array();
Text kind = new Text(token.getKind());
Text service = new Text(token.getService());
Token<RMDelegationTokenIdentifier> tk = new Token<>(identifier, password, kind, service);

RMDelegationTokenIdentifier tokenIdentifier = tk.decodeIdentifier();
RouterClientRMService clientRMService = this.getRouterClientRMService();
RouterDelegationTokenSecretManager tokenSecretManager =
clientRMService.getRouterDTSecretManager();
long currentExpiration = tokenSecretManager.getRenewDate(tokenIdentifier);

String owner = tokenIdentifier.getOwner().toString();
String tokenKind = tk.getKind().toString();
long maxDate = tokenIdentifier.getMaxDate();
DelegationToken respToken = new DelegationToken(tk.encodeToUrlString(),
renewer, owner, tokenKind, currentExpiration, maxDate);
return Response.status(Status.OK).entity(respToken).build();
}

/**
* This method updates the expiration for a delegation token from the client.
*
* @param hsr the servlet request
* @return Response containing the status code.
* @throws AuthorizationException if Kerberos auth failed.
* @throws IOException if the delegation failed.
* @throws InterruptedException if interrupted.
* @throws Exception in case of bad request.
*/
@Override
public Response postDelegationTokenExpiration(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
throw new NotImplementedException("Code is not implemented");
throws AuthorizationException, IOException, InterruptedException, Exception {

if (hsr == null) {
throw new IllegalArgumentException("Parameter error, the hsr is null.");
}

try {
// get Caller UserGroupInformation
Configuration conf = federationFacade.getConf();
UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);

// renew Delegation Token
DelegationToken requestToken = new DelegationToken();
String token = extractToken(hsr).encodeToUrlString();
requestToken.setToken(token);
return renewDelegationToken(requestToken, callerUGI);
} catch (YarnException e) {
LOG.error("Renew delegation token request failed.", e);
return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
}
}

/**
* Renew DelegationToken.
*
* @param tokenData DelegationToken.
* @param callerUGI UserGroupInformation.
* @return Response
* @throws IOException if there are I/O errors.
* @throws InterruptedException if any thread has interrupted.
*/
private Response renewDelegationToken(DelegationToken tokenData, UserGroupInformation callerUGI)
throws IOException, InterruptedException {
// Parse token data
Token<RMDelegationTokenIdentifier> token = extractToken(tokenData.getToken());
org.apache.hadoop.yarn.api.records.Token dToken =
BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());

// Renew token
RenewDelegationTokenResponse resp = callerUGI.doAs(
(PrivilegedExceptionAction<RenewDelegationTokenResponse>) () -> {
RenewDelegationTokenRequest req = RenewDelegationTokenRequest.newInstance(dToken);
return this.getRouterClientRMService().renewDelegationToken(req);
});

// return DelegationToken
long renewTime = resp.getNextExpirationTime();
DelegationToken respToken = new DelegationToken();
respToken.setNextExpirationTime(renewTime);
return Response.status(Status.OK).entity(respToken).build();
}

/**
* Cancel DelegationToken.
*
* @param hsr the servlet request
* @return Response containing the status code.
* @throws AuthorizationException if Kerberos auth failed.
* @throws IOException if the delegation failed.
* @throws InterruptedException if interrupted.
* @throws Exception in case of bad request.
*/
@Override
public Response cancelDelegationToken(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
throw new NotImplementedException("Code is not implemented");
throws AuthorizationException, IOException, InterruptedException, Exception {
try {
// get Caller UserGroupInformation
Configuration conf = federationFacade.getConf();
UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);

// parse Token Data
Token<RMDelegationTokenIdentifier> token = extractToken(hsr);
org.apache.hadoop.yarn.api.records.Token dToken = BuilderUtils
.newDelegationToken(token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());

// cancelDelegationToken
callerUGI.doAs((PrivilegedExceptionAction<CancelDelegationTokenResponse>) () -> {
CancelDelegationTokenRequest req = CancelDelegationTokenRequest.newInstance(dToken);
return this.getRouterClientRMService().cancelDelegationToken(req);
});

return Response.status(Status.OK).build();
} catch (YarnException e) {
LOG.error("Cancel delegation token request failed.", e);
return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
Expand Down Expand Up @@ -122,4 +123,18 @@ ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse res,
*/
ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId, String containerId);

/**
* Set RouterClientRMService.
*
* @param routerClientRMService routerClientRMService.
*/
void setRouterClientRMService(RouterClientRMService routerClientRMService);

/**
* Get RouterClientRMService.
*
* @return RouterClientRMService
*/
RouterClientRMService getRouterClientRMService();
}
Loading

0 comments on commit 2b67668

Please sign in to comment.