Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11225. [Federation] Add postDelegationToken postDelegationTokenExpiration cancelDelegationToken REST APIs for Router. #5185

Merged
merged 11 commits into from
Jan 3, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -611,4 +611,16 @@ protected RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecret
public RouterDelegationTokenSecretManager getRouterDTSecretManager() {
return routerDTSecretManager;
}

@VisibleForTesting
public void setRouterDTSecretManager(RouterDelegationTokenSecretManager routerDTSecretManager) {
this.routerDTSecretManager = routerDTSecretManager;
}

@VisibleForTesting
public void initUserPipelineMap(Configuration conf) {
int maxCacheSize = conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,16 @@ public synchronized Map<RMDelegationTokenIdentifier, Long> getAllTokens() {
return allTokens;
}

public long getRenewDate(RMDelegationTokenIdentifier ident)
throws InvalidToken {
DelegationTokenInformation info = currentTokens.get(ident);
if (info == null) {
throw new InvalidToken("token (" + ident.toString()
+ ") can't be found in cache");
}
return info.getRenewDate();
}

@Override
protected synchronized int incrementDelegationTokenSeqNum() {
return federationFacade.incrementDelegationTokenSeqNum();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;

import org.apache.hadoop.yarn.server.router.RouterServerUtil;

/**
Expand All @@ -32,6 +34,7 @@ public abstract class AbstractRESTRequestInterceptor
private Configuration conf;
private RESTRequestInterceptor nextInterceptor;
private UserGroupInformation user = null;
private RouterClientRMService routerClientRMService = null;

/**
* Sets the {@link RESTRequestInterceptor} in the chain.
Expand Down Expand Up @@ -93,4 +96,14 @@ public RESTRequestInterceptor getNextInterceptor() {
public UserGroupInformation getUser() {
return user;
}

@Override
public RouterClientRMService getRouterClientRMService() {
return routerClientRMService;
}

@Override
public void setRouterClientRMService(RouterClientRMService routerClientRMService) {
this.routerClientRMService = routerClientRMService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -46,11 +47,20 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
Expand All @@ -61,6 +71,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
Expand Down Expand Up @@ -107,8 +118,11 @@
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand All @@ -124,6 +138,9 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.extractToken;
import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.getKerberosUserGroupInformation;

/**
* Extends the {@code AbstractRESTRequestInterceptor} class and provides an
* implementation for federation of YARN RM and scaling an application across
Expand Down Expand Up @@ -1567,25 +1584,209 @@ public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
throw new RuntimeException("updateAppQueue Failed.");
}

/**
* This method posts a delegation token from the client.
*
* @param tokenData the token to delegate. It is a content param.
* @param hsr the servlet request.
* @return Response containing the status code.
* @throws AuthorizationException if Kerberos auth failed.
* @throws IOException if the delegation failed.
* @throws InterruptedException if interrupted.
* @throws Exception in case of bad request.
*/
@Override
public Response postDelegationToken(DelegationToken tokenData,
HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException, Exception {
throw new NotImplementedException("Code is not implemented");
public Response postDelegationToken(DelegationToken tokenData, HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException, Exception {

if (tokenData == null || hsr == null) {
throw new IllegalArgumentException("Parameter error, the tokenData or hsr is null.");
}

try {
// get Caller UserGroupInformation
Configuration conf = federationFacade.getConf();
UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);

// create a delegation token
return createDelegationToken(tokenData, callerUGI);
} catch (YarnException e) {
LOG.error("Create delegation token request failed.", e);
return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
}
}

/**
* Create DelegationToken.
*
* @param dtoken DelegationToken Data.
* @param callerUGI UserGroupInformation.
* @return Response.
* @throws Exception An exception occurred when creating a delegationToken.
*/
private Response createDelegationToken(DelegationToken dtoken, UserGroupInformation callerUGI)
throws IOException, InterruptedException {

String renewer = dtoken.getRenewer();

GetDelegationTokenResponse resp = callerUGI.doAs(
(PrivilegedExceptionAction<GetDelegationTokenResponse>) () -> {
GetDelegationTokenRequest createReq = GetDelegationTokenRequest.newInstance(renewer);
return this.getRouterClientRMService().getDelegationToken(createReq);
});

DelegationToken respToken = getDelegationToken(renewer, resp);
return Response.status(Status.OK).entity(respToken).build();
}

/**
* Get DelegationToken.
*
* @param renewer renewer.
* @param resp GetDelegationTokenResponse.
* @return DelegationToken.
* @throws IOException if there are I/O errors.
*/
private DelegationToken getDelegationToken(String renewer, GetDelegationTokenResponse resp)
throws IOException {
// Step1. Parse token from GetDelegationTokenResponse.
Token<RMDelegationTokenIdentifier> tk = getToken(resp);
String tokenKind = tk.getKind().toString();
RMDelegationTokenIdentifier tokenIdentifier = tk.decodeIdentifier();
String owner = tokenIdentifier.getOwner().toString();
long maxDate = tokenIdentifier.getMaxDate();

// Step2. Call the interface to get the expiration time of Token.
RouterClientRMService clientRMService = this.getRouterClientRMService();
RouterDelegationTokenSecretManager tokenSecretManager =
clientRMService.getRouterDTSecretManager();
long currentExpiration = tokenSecretManager.getRenewDate(tokenIdentifier);

// Step3. Generate Delegation token.
DelegationToken delegationToken = new DelegationToken(tk.encodeToUrlString(),
renewer, owner, tokenKind, currentExpiration, maxDate);

return delegationToken;
}

/**
* GetToken.
* We convert RMDelegationToken in GetDelegationTokenResponse to Token.
*
* @param resp GetDelegationTokenResponse.
* @return Token.
*/
private static Token<RMDelegationTokenIdentifier> getToken(GetDelegationTokenResponse resp) {
org.apache.hadoop.yarn.api.records.Token token = resp.getRMDelegationToken();
byte[] identifier = token.getIdentifier().array();
byte[] password = token.getPassword().array();
Text kind = new Text(token.getKind());
Text service = new Text(token.getService());
Token<RMDelegationTokenIdentifier> tk = new Token<>(identifier, password, kind, service);
return tk;
}

/**
* This method updates the expiration for a delegation token from the client.
*
* @param hsr the servlet request
* @return Response containing the status code.
* @throws AuthorizationException if Kerberos auth failed.
* @throws IOException if the delegation failed.
* @throws InterruptedException if interrupted.
* @throws Exception in case of bad request.
*/
@Override
public Response postDelegationTokenExpiration(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
throw new NotImplementedException("Code is not implemented");
throws AuthorizationException, IOException, InterruptedException, Exception {

if (hsr == null) {
throw new IllegalArgumentException("Parameter error, the hsr is null.");
}

try {
// get Caller UserGroupInformation
Configuration conf = federationFacade.getConf();
UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);
return renewDelegationToken(hsr, callerUGI);
} catch (YarnException e) {
LOG.error("Renew delegation token request failed.", e);
return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
}
}

/**
* Renew DelegationToken.
*
* @param hsr HttpServletRequest.
* @param callerUGI UserGroupInformation.
* @return Response
* @throws IOException if there are I/O errors.
* @throws InterruptedException if any thread has interrupted.
*/
private Response renewDelegationToken(HttpServletRequest hsr, UserGroupInformation callerUGI)
throws IOException, InterruptedException {

// renew Delegation Token
DelegationToken tokenData = new DelegationToken();
String encodeToken = extractToken(hsr).encodeToUrlString();
tokenData.setToken(encodeToken);

// Parse token data
Token<RMDelegationTokenIdentifier> token = extractToken(tokenData.getToken());
org.apache.hadoop.yarn.api.records.Token dToken =
BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());

// Renew token
RenewDelegationTokenResponse resp = callerUGI.doAs(
(PrivilegedExceptionAction<RenewDelegationTokenResponse>) () -> {
RenewDelegationTokenRequest req = RenewDelegationTokenRequest.newInstance(dToken);
return this.getRouterClientRMService().renewDelegationToken(req);
});

// return DelegationToken
long renewTime = resp.getNextExpirationTime();
DelegationToken respToken = new DelegationToken();
respToken.setNextExpirationTime(renewTime);
return Response.status(Status.OK).entity(respToken).build();
}

/**
* Cancel DelegationToken.
*
* @param hsr the servlet request
* @return Response containing the status code.
* @throws AuthorizationException if Kerberos auth failed.
* @throws IOException if the delegation failed.
* @throws InterruptedException if interrupted.
* @throws Exception in case of bad request.
*/
@Override
public Response cancelDelegationToken(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
throw new NotImplementedException("Code is not implemented");
throws AuthorizationException, IOException, InterruptedException, Exception {
try {
// get Caller UserGroupInformation
Configuration conf = federationFacade.getConf();
UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);

// parse Token Data
Token<RMDelegationTokenIdentifier> token = extractToken(hsr);
org.apache.hadoop.yarn.api.records.Token dToken = BuilderUtils
.newDelegationToken(token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());

// cancelDelegationToken
callerUGI.doAs((PrivilegedExceptionAction<CancelDelegationTokenResponse>) () -> {
CancelDelegationTokenRequest req = CancelDelegationTokenRequest.newInstance(dToken);
return this.getRouterClientRMService().cancelDelegationToken(req);
});

return Response.status(Status.OK).build();
} catch (YarnException e) {
LOG.error("Cancel delegation token request failed.", e);
return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
Expand Down Expand Up @@ -122,4 +123,18 @@ ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse res,
*/
ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId, String containerId);

/**
* Set RouterClientRMService.
*
* @param routerClientRMService routerClientRMService.
*/
void setRouterClientRMService(RouterClientRMService routerClientRMService);

/**
* Get RouterClientRMService.
*
* @return RouterClientRMService
*/
RouterClientRMService getRouterClientRMService();
}
Loading