Skip to content

Commit

Permalink
YARN-11219. [Federation] Add getAppActivities, getAppStatistics REST …
Browse files Browse the repository at this point in the history
…APIs for Router. (#4757)
  • Loading branch information
slfan1989 authored Aug 26, 2022
1 parent 5736b34 commit f8b9dd9
Show file tree
Hide file tree
Showing 7 changed files with 383 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;

import java.util.ArrayList;
import java.util.Collection;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
Expand All @@ -33,6 +34,10 @@ public class ApplicationStatisticsInfo {
/**
 * No-argument constructor; performs no initialization of its own.
 */
public ApplicationStatisticsInfo() {
} // JAXB needs this

/**
 * Creates a statistics holder that starts out containing the given items.
 *
 * @param items statistics items to add to this holder.
 */
public ApplicationStatisticsInfo(Collection<StatisticsItemInfo> items) {
  this.statItem.addAll(items);
}

/**
 * Appends a single statistics item to this holder.
 *
 * @param statItem the item to append (the parameter shadows the field of the same name).
 */
public void add(StatisticsItemInfo statItem) {
this.statItem.add(statItem);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public StatisticsItemInfo(
this.count = count;
}

/**
 * Copy constructor: creates a new item carrying the same state, type and count
 * as {@code info}.
 *
 * @param info the item whose fields are copied.
 */
public StatisticsItemInfo(StatisticsItemInfo info) {
this.state = info.state;
this.type = info.type;
this.count = info.count;
}

/**
 * Returns the application state stored in this item.
 *
 * @return the value of the {@code state} field.
 */
public YarnApplicationState getState() {
return state;
}
Expand All @@ -53,4 +59,7 @@ public long getCount() {
return count;
}

/**
 * Replaces the count stored in this item.
 *
 * @param count the new count value.
 */
public void setCount(long count) {
this.count = count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1129,13 +1129,50 @@ public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds, String groupBy, String limit,
Set<String> actions, boolean summarize) {
throw new NotImplementedException("Code is not implemented");

// Only verify the app_id,
// because the specific subCluster needs to be found according to the app_id,
// and other verifications are directly handed over to the corresponding subCluster RM
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());

final HttpServletRequest hsrCopy = clone(hsr);
return interceptor.getAppActivities(hsrCopy, appId, time, requestPriorities,
allocationRequestIds, groupBy, limit, actions, summarize);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e, "Unable to get subCluster by appId: %s.",
appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppActivities Failed.", e);
}

return null;
}

/**
 * Returns application statistics aggregated over all active sub-clusters.
 *
 * <p>The request is forwarded concurrently to every active sub-cluster and the
 * individual answers are combined with
 * {@code RouterWebServiceUtil.mergeApplicationStatisticsInfo}.
 *
 * @param hsr the servlet request; a clone of it is passed to the sub-clusters.
 * @param stateQueries application states to filter on, forwarded unchanged.
 * @param typeQueries application types to filter on, forwarded unchanged.
 * @return the merged statistics; {@code null} is only reached if the error helper
 *     returns instead of throwing.
 */
@Override
public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
    Set<String> stateQueries, Set<String> typeQueries) {
  // The former unconditional "not implemented" throw is gone: it made the whole
  // implementation below unreachable.
  try {
    Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
    final HttpServletRequest hsrCopy = clone(hsr);
    Class[] argsClasses = new Class[]{HttpServletRequest.class, Set.class, Set.class};
    Object[] args = new Object[]{hsrCopy, stateQueries, typeQueries};
    ClientMethod remoteMethod = new ClientMethod("getAppStatistics", argsClasses, args);
    Map<SubClusterInfo, ApplicationStatisticsInfo> appStatisticsMap = invokeConcurrent(
        subClustersActive.values(), remoteMethod, ApplicationStatisticsInfo.class);
    return RouterWebServiceUtil.mergeApplicationStatisticsInfo(appStatisticsMap.values());
  } catch (IOException e) {
    RouterServerUtil.logAndThrowRunTimeException(e, "Get all active sub cluster(s) error.");
  } catch (YarnException e) {
    RouterServerUtil.logAndThrowRunTimeException(e, "getAppStatistics error.");
  }
  // Needed for the compiler; presumably the helper above always throws.
  return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
Expand Down Expand Up @@ -540,4 +542,38 @@ public static NodeToLabelsInfo mergeNodeToLabels(

return new NodeToLabelsInfo(nodeToLabels);
}

/**
 * Merges the application statistics reported by several sub-clusters into one result.
 *
 * <p>Items that share the same application type and state are collapsed into a single
 * item whose count is the sum of the individual counts. The input items are never
 * modified: the first item seen for a type/state pair is copied before any counts are
 * accumulated into it.
 *
 * @param appStatistics per-sub-cluster statistics; may be {@code null} and may contain
 *     {@code null} entries or entries without items, all of which are skipped.
 * @return a new {@code ApplicationStatisticsInfo} holding the merged items; it has no
 *     items when there was nothing to merge.
 */
public static ApplicationStatisticsInfo mergeApplicationStatisticsInfo(
    Collection<ApplicationStatisticsInfo> appStatistics) {
  ApplicationStatisticsInfo result = new ApplicationStatisticsInfo();
  if (appStatistics == null) {
    return result;
  }

  Map<String, StatisticsItemInfo> mergedByKey = new HashMap<>();
  for (ApplicationStatisticsInfo appStatistic : appStatistics) {
    if (appStatistic == null || appStatistic.getStatItems() == null) {
      continue;
    }
    for (StatisticsItemInfo item : appStatistic.getStatItems()) {
      if (item == null) {
        continue;
      }
      // Plain concatenation keeps the original "<type>_<state>" key for non-null
      // states while tolerating an item whose state is missing.
      String key = item.getType() + "_" + item.getState();
      StatisticsItemInfo merged = mergedByKey.get(key);
      if (merged == null) {
        // Copy so that later accumulation does not alter the caller's objects.
        mergedByKey.put(key, new StatisticsItemInfo(item));
      } else {
        merged.setCount(merged.getCount() + item.getCount());
      }
    }
  }

  result.getStatItems().addAll(mergedByKey.values());
  return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Collections;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
Expand Down Expand Up @@ -58,6 +60,20 @@
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
Expand All @@ -78,13 +94,22 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.mockito.Mockito.mock;

/**
* This class mocks the RESTRequestInterceptor.
*/
Expand Down Expand Up @@ -132,8 +157,9 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
// Initialize appReport
ApplicationReport appReport = ApplicationReport.newInstance(
appId, ApplicationAttemptId.newInstance(appId, 1), null, newApp.getQueue(), null, null, 0,
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, null, null, null,
false, Priority.newInstance(newApp.getPriority()), null, null);
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0,
newApp.getApplicationType(), null, null, false, Priority.newInstance(newApp.getPriority()),
null, null);

// Initialize appTimeoutsMap
HashMap<ApplicationTimeoutType, ApplicationTimeout> appTimeoutsMap = new HashMap<>();
Expand Down Expand Up @@ -661,4 +687,105 @@ public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, Str
AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue());
return Response.status(Status.OK).entity(targetAppQueue).build();
}

/**
 * Overwrites the state recorded for an application known to this mock.
 *
 * @param appState the state to store on the application's report.
 * @param appId textual application id, parsed with {@code ApplicationId.fromString}.
 * @throws NotFoundException if no application with that id is registered.
 */
public void updateApplicationState(YarnApplicationState appState, String appId)
    throws AuthorizationException, YarnException, InterruptedException, IOException {
  validateRunning();
  final ApplicationId id = ApplicationId.fromString(appId);
  if (applicationMap.containsKey(id)) {
    applicationMap.get(id).setYarnApplicationState(appState);
    return;
  }
  throw new NotFoundException("app with id: " + appId + " not found");
}

/**
 * Mock implementation: counts the registered applications per state/type pair.
 *
 * <p>Only applications whose state name is contained in {@code stateQueries} and whose
 * type is contained in {@code typeQueries} are counted.
 *
 * @param hsr the servlet request (not used by this mock).
 * @param stateQueries state names to include.
 * @param typeQueries application types to include.
 * @return one statistics item per matching state/type pair.
 */
@Override
public ApplicationStatisticsInfo getAppStatistics(
    HttpServletRequest hsr, Set<String> stateQueries, Set<String> typeQueries) {
  if (!isRunning) {
    throw new RuntimeException("RM is stopped");
  }

  Map<String, StatisticsItemInfo> countsByKey = new HashMap<>();
  for (ApplicationReport report : applicationMap.values()) {
    YarnApplicationState state = report.getYarnApplicationState();
    String type = report.getApplicationType();

    boolean selected = stateQueries.contains(state.name()) && typeQueries.contains(type);
    if (!selected) {
      continue;
    }

    String key = state.toString() + "_" + type;
    StatisticsItemInfo counted = countsByKey.get(key);
    if (counted != null) {
      counted.setCount(counted.getCount() + 1);
    } else {
      countsByKey.put(key, new StatisticsItemInfo(state, type, 1));
    }
  }

  return new ApplicationStatisticsInfo(countsByKey.values());
}

/**
 * Mock implementation that fabricates scheduler activities for a registered application.
 *
 * <p>It builds a mocked RMContext and scheduler, records ten allocation attempts for the
 * application into a fresh ActivitiesManager, and returns what that manager reports for
 * the requested priorities and allocation request ids.
 *
 * <p>The {@code hsr}, {@code time}, {@code groupBy} and {@code actions} arguments are not
 * read by this mock.
 *
 * @throws RuntimeException if this interceptor is not running.
 * @throws NotFoundException if {@code appId} is not present in {@code applicationMap}.
 */
@Override
public AppActivitiesInfo getAppActivities(
HttpServletRequest hsr, String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds, String groupBy, String limit, Set<String> actions,
boolean summarize) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

SchedulerNode schedulerNode = TestUtils.getMockNode("host0", "rack", 1, 10240);

// Mocked RM context: returns this interceptor's configuration, a mocked scheduler
// and the rmApps map populated below.
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getYarnConfiguration()).thenReturn(this.getConf());
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
Mockito.when(scheduler.getMinimumResourceCapability()).thenReturn(Resources.none());
Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
LeafQueue mockQueue = Mockito.mock(LeafQueue.class);
Map<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
Mockito.doReturn(rmApps).when(rmContext).getRMApps();

// Register a mocked RMApp for the requested application and wrap it in a scheduler app.
FiCaSchedulerNode node = (FiCaSchedulerNode) schedulerNode;
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 0);
RMApp mockApp = Mockito.mock(RMApp.class);
Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp).getApplicationId();
Mockito.doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp).getFinalApplicationStatus();
rmApps.put(appAttemptId.getApplicationId(), mockApp);
FiCaSchedulerApp app = new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
mock(ActiveUsersManager.class), rmContext);

// NOTE(review): the meaning of the literal 3 is not visible here - confirm against
// ActivitiesManager#turnOnAppActivitiesRecording.
ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext);
newActivitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 3);

// Record numActivities attempts, each one a REJECTED node-level activity
// (NODE_IS_BLACKLISTED) that is then finished as SKIPPED.
int numActivities = 10;
for (int i = 0; i < numActivities; i++) {
ActivitiesLogger.APP.startAppAllocationRecording(newActivitiesManager, node,
SystemClock.getInstance().getTime(), app);
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(newActivitiesManager, node, app,
new SchedulerRequestKey(Priority.newInstance(0), 0, null),
ActivityDiagnosticConstant.NODE_IS_BLACKLISTED, ActivityState.REJECTED,
ActivityLevel.NODE);
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(newActivitiesManager,
app.getApplicationId(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
}

// The string filters are parsed to numbers; null sets or a non-numeric value
// (including a null limit) will throw here.
Set<Integer> prioritiesInt =
requestPriorities.stream().map(pri -> Integer.parseInt(pri)).collect(Collectors.toSet());
Set<Long> allocationReqIds =
allocationRequestIds.stream().map(id -> Long.parseLong(id)).collect(Collectors.toSet());
AppActivitiesInfo appActivitiesInfo = newActivitiesManager.
getAppActivitiesInfo(app.getApplicationId(), prioritiesInt, allocationReqIds, null,
Integer.parseInt(limit), summarize, 3);

return appActivitiesInfo;
}
}
Loading

0 comments on commit f8b9dd9

Please sign in to comment.