Skip to content

Commit

Permalink
YARN-11225. [Federation] Add postDelegationToken, postDelegationToken…
Browse files Browse the repository at this point in the history
…Expiration, cancelDelegationToken REST APIs for Router.
  • Loading branch information
slfan1989 committed Dec 5, 2022
1 parent 8a9bdb1 commit 0e593d4
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@
import javax.servlet.http.HttpServletRequest;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.authentication.server.ProxyUserAuthenticationFilterInitializer;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -375,4 +380,56 @@ public static UserGroupInformation getCallerUserGroupInformation(

return callerUGI;
}

/**
* initForWritableEndpoints does the init and acls verification for all
* writable REST end points.
*
* @param conf Configuration.
* @param callerUGI remote caller who initiated the request.
* @throws AuthorizationException in case of no access to perfom this op.
*/
public static void initForWritableEndpoints(Configuration conf, UserGroupInformation callerUGI)
throws AuthorizationException {
if (callerUGI == null) {
String msg = "Unable to obtain user name, user not authenticated";
throw new AuthorizationException(msg);
}

if (UserGroupInformation.isSecurityEnabled() && isStaticUser(conf, callerUGI)) {
String msg = "The default static user cannot carry out this operation.";
throw new ForbiddenException(msg);
}
}

/**
* Determine whether the user is a static user.
*
* @param conf Configuration.
* @param callerUGI remote caller who initiated the request.
* @return true, static user; false, not static user;
*/
private static boolean isStaticUser(Configuration conf, UserGroupInformation callerUGI) {
String staticUser = conf.get(CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER,
CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER);
return staticUser.equals(callerUGI.getUserName());
}

public static void createKerberosUserGroupInformation(HttpServletRequest hsr)
throws AuthorizationException, YarnException {
String authType = hsr.getAuthType();
if (!KerberosAuthenticationHandler.TYPE.equalsIgnoreCase(authType)) {
String msg = "Delegation token operations can only be carried out on a "
+ "Kerberos authenticated channel. Expected auth type is "
+ KerberosAuthenticationHandler.TYPE + ", got type " + authType;
throw new YarnException(msg);
}

if (hsr.getAttribute(
DelegationTokenAuthenticationHandler.DELEGATION_TOKEN_UGI_ATTRIBUTE) != null) {
String msg = "Delegation token operations cannot be carried out using "
+ "delegation token authentication.";
throw new YarnException(msg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,23 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,14 @@ public synchronized Map<RMDelegationTokenIdentifier, Long> getAllTokens() {
}
return allTokens;
}

public long getRenewDate(RMDelegationTokenIdentifier ident)
throws InvalidToken {
DelegationTokenInformation info = currentTokens.get(ident);
if (info == null) {
throw new InvalidToken("token (" + ident.toString()
+ ") can't be found in cache");
}
return info.getRenewDate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;

import java.io.IOException;

Expand All @@ -35,6 +37,8 @@ public abstract class AbstractRESTRequestInterceptor
private RESTRequestInterceptor nextInterceptor;
private UserGroupInformation user = null;

private RouterClientRMService routerClientRMService = null;

/**
* Sets the {@link RESTRequestInterceptor} in the chain.
*/
Expand Down Expand Up @@ -123,4 +127,14 @@ private void setupUser(final String userName) {
public UserGroupInformation getUser() {
return user;
}

@Override
public RouterClientRMService getRouterClientRMService() {
return routerClientRMService;
}

@Override
public void setRouterClientRMService(RouterClientRMService routerClientRMService) {
this.routerClientRMService = routerClientRMService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -46,11 +47,16 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
Expand All @@ -59,6 +65,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
Expand Down Expand Up @@ -121,6 +128,10 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil.getCallerUserGroupInformation;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil.initForWritableEndpoints;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil.createKerberosUserGroupInformation;

/**
* Extends the {@code AbstractRESTRequestInterceptor} class and provides an
* implementation for federation of YARN RM and scaling an application across
Expand Down Expand Up @@ -1556,11 +1567,63 @@ public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
throw new RuntimeException("updateAppQueue Failed.");
}

/**
* This method posts a delegation token from the client.
*
* @param tokenData the token to delegate. It is a content param.
* @param hsr the servlet request.
* @return Response containing the status code.
* @throws AuthorizationException if Kerberos auth failed.
* @throws IOException if the delegation failed.
* @throws InterruptedException if interrupted.
* @throws Exception in case of bad request.
*/
@Override
public Response postDelegationToken(DelegationToken tokenData,
HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException, Exception {
throw new NotImplementedException("Code is not implemented");
Configuration conf = federationFacade.getConf();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(conf, callerUGI);

try {
createKerberosUserGroupInformation(hsr);
callerUGI.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS);
} catch (YarnException ye) {
return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
}

return createDelegationToken(tokenData, hsr, callerUGI);
}

private Response createDelegationToken(DelegationToken tokenData, HttpServletRequest hsr,
UserGroupInformation callerUGI) throws AuthorizationException, IOException,
InterruptedException, Exception {
final String renewer = tokenData.getRenewer();
GetDelegationTokenResponse resp;
try {
resp = callerUGI.doAs((PrivilegedExceptionAction<GetDelegationTokenResponse>) () -> {
GetDelegationTokenRequest createReq = GetDelegationTokenRequest.newInstance(renewer);
return this.getRouterClientRMService().getDelegationToken(createReq);
});
} catch (Exception e) {
LOG.info("Create delegation token request failed", e);
throw e;
}

Token<RMDelegationTokenIdentifier> tk = new Token<>(
resp.getRMDelegationToken().getIdentifier().array(),
resp.getRMDelegationToken().getPassword().array(),
new Text(resp.getRMDelegationToken().getKind()),
new Text(resp.getRMDelegationToken().getService()));

RMDelegationTokenIdentifier identifier = tk.decodeIdentifier();
long currentExpiration = this.getRouterClientRMService().
getRouterDTSecretManager().getRenewDate(identifier);
DelegationToken respToken = new DelegationToken(tk.encodeToUrlString(),
renewer, identifier.getOwner().toString(), tk.getKind().toString(),
currentExpiration, identifier.getMaxDate());
return Response.status(Status.OK).entity(respToken).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
Expand Down Expand Up @@ -122,4 +124,18 @@ ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse res,
*/
ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId, String containerId);

/**
* Set RouterClientRMService.
*
* @param routerClientRMService routerClientRMService.
*/
void setRouterClientRMService(RouterClientRMService routerClientRMService);

/**
* Get RouterClientRMService.
*
* @return RouterClientRMService
*/
RouterClientRMService getRouterClientRMService();
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
Expand Down Expand Up @@ -208,6 +209,8 @@ private RequestInterceptorChainWrapper initializePipeline(String user) {
RESTRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);
RouterClientRMService routerClientRMService = router.getClientRMProxyService();
interceptorChain.setRouterClientRMService(routerClientRMService);
chainWrapper.init(interceptorChain);
} catch (Exception e) {
LOG.error("Init RESTRequestInterceptor error for user: {}", user, e);
Expand Down Expand Up @@ -954,4 +957,8 @@ public NodeLabelsInfo getRMNodeLabels(@Context HttpServletRequest hsr)
RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getRMNodeLabels(hsr);
}

public Router getRouter() {
return router;
}
}

0 comments on commit 0e593d4

Please sign in to comment.