Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11219. [Federation] Add getAppActivities, getAppStatistics REST APIs for Router. #4757

Merged
merged 6 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;

import java.util.ArrayList;
import java.util.Collection;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
Expand All @@ -33,12 +34,15 @@ public class ApplicationStatisticsInfo {
public ApplicationStatisticsInfo() {
} // JAXB needs this

public ApplicationStatisticsInfo(Collection<StatisticsItemInfo> items) {
statItem.addAll(items);
}

public void add(StatisticsItemInfo statItem) {
this.statItem.add(statItem);
}

public ArrayList<StatisticsItemInfo> getStatItems() {
return statItem;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your help reviewing the code, I will modify the code.

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public StatisticsItemInfo(
this.count = count;
}

public StatisticsItemInfo(StatisticsItemInfo info) {
this.state = info.state;
this.type = info.type;
this.count = info.count;
}

public YarnApplicationState getState() {
return state;
}
Expand All @@ -53,4 +59,7 @@ public long getCount() {
return count;
}

public void setCount(long count) {
this.count = count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1129,13 +1129,49 @@ public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds, String groupBy, String limit,
Set<String> actions, boolean summarize) {
throw new NotImplementedException("Code is not implemented");

// Only verify the app_id, because the specific subCluster needs to be found according to the app_id,
// and other verifications are directly handed over to the corresponding subCluster RM
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());

final HttpServletRequest hsrCopy = clone(hsr);
return interceptor.getAppActivities(hsrCopy, appId, time, requestPriorities,
allocationRequestIds, groupBy, limit, actions, summarize);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

"Unable to get subCluster by appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppActivities Failed.", e);
}

return null;
}

@Override
public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
Set<String> stateQueries, Set<String> typeQueries) {
throw new NotImplementedException("Code is not implemented");
try {
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
final HttpServletRequest hsrCopy = clone(hsr);
Class[] argsClasses = new Class[]{HttpServletRequest.class, Set.class, Set.class};
Object[] args = new Object[]{hsrCopy, stateQueries, typeQueries};
ClientMethod remoteMethod = new ClientMethod("getAppStatistics", argsClasses, args);
Map<SubClusterInfo, ApplicationStatisticsInfo> appStatisticsMap = invokeConcurrent(
subClustersActive.values(), remoteMethod, ApplicationStatisticsInfo.class);
return RouterWebServiceUtil.mergeApplicationStatisticsInfo(appStatisticsMap.values());
} catch (IOException e) {
RouterServerUtil.logAndThrowRunTimeException(e, "Get all active sub cluster(s) error.");
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException(e, "getAppStatistics error.");
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
Expand Down Expand Up @@ -540,4 +542,34 @@ public static NodeToLabelsInfo mergeNodeToLabels(

return new NodeToLabelsInfo(nodeToLabels);
}

public static ApplicationStatisticsInfo mergeApplicationStatisticsInfo(
Collection<ApplicationStatisticsInfo> appStatistics) {
ApplicationStatisticsInfo result = new ApplicationStatisticsInfo();
HashMap<String, StatisticsItemInfo> statisticsItemMap = new HashMap();

appStatistics.stream().forEach(appStatistic -> {
List<StatisticsItemInfo> statisticsItemInfos = appStatistic.getStatItems();
for (StatisticsItemInfo statisticsItemInfo : statisticsItemInfos) {
String statisticsItemKey = statisticsItemInfo.getType() + "_" + statisticsItemInfo.getState().toString();
StatisticsItemInfo statisticsItemValue =
statisticsItemMap.getOrDefault(statisticsItemKey, null);
if (statisticsItemValue != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaner to do contains or similar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

long statisticsItemValueCount = statisticsItemValue.getCount();
long statisticsItemInfoCount = statisticsItemInfo.getCount();
long newCount = statisticsItemValueCount + statisticsItemInfoCount;
statisticsItemValue.setCount(newCount);
} else {
statisticsItemValue = new StatisticsItemInfo(statisticsItemInfo);
}
statisticsItemMap.put(statisticsItemKey, statisticsItemValue);
}
});

if (!statisticsItemMap.isEmpty()) {
result.getStatItems().addAll(statisticsItemMap.values());
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Collections;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
Expand Down Expand Up @@ -58,6 +60,16 @@
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expand

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
Expand All @@ -78,13 +90,22 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.mockito.Mockito.mock;

/**
* This class mocks the RESTRequestInterceptor.
*/
Expand Down Expand Up @@ -132,8 +153,9 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
// Initialize appReport
ApplicationReport appReport = ApplicationReport.newInstance(
appId, ApplicationAttemptId.newInstance(appId, 1), null, newApp.getQueue(), null, null, 0,
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, null, null, null,
false, Priority.newInstance(newApp.getPriority()), null, null);
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0,
newApp.getApplicationType(), null, null, false, Priority.newInstance(newApp.getPriority()),
null, null);

// Initialize appTimeoutsMap
HashMap<ApplicationTimeoutType, ApplicationTimeout> appTimeoutsMap = new HashMap<>();
Expand Down Expand Up @@ -661,4 +683,108 @@ public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, Str
AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue());
return Response.status(Status.OK).entity(targetAppQueue).build();
}

public void updateApplicationState(YarnApplicationState appState, String appId)
throws AuthorizationException, YarnException, InterruptedException, IOException {
validateRunning();
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
appReport.setYarnApplicationState(appState);
}

@Override
public ApplicationStatisticsInfo getAppStatistics(
HttpServletRequest hsr, Set<String> stateQueries, Set<String> typeQueries) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

Map<String, StatisticsItemInfo> itemInfoMap = new HashMap<>();

for (HashMap.Entry<ApplicationId, ApplicationReport> item : applicationMap.entrySet()) {

ApplicationReport applicationReport = item.getValue();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't do getKey, we can iterate .values()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

YarnApplicationState appState = applicationReport.getYarnApplicationState();
String appType = applicationReport.getApplicationType();

if (stateQueries.contains(appState.name()) && typeQueries.contains(appType)) {
String itemInfoMapKey = appState.toString() + "_" + appType;
StatisticsItemInfo itemInfo = itemInfoMap.getOrDefault(itemInfoMapKey, null);
if (itemInfo == null) {
itemInfo = new StatisticsItemInfo(appState, appType, 1);
} else {
long newCount = itemInfo.getCount() + 1;
itemInfo.setCount(newCount);
}
itemInfoMap.put(itemInfoMapKey, itemInfo);
}
}

ArrayList<StatisticsItemInfo> itemInfos = new ArrayList<>(itemInfoMap.values());

return new ApplicationStatisticsInfo(itemInfos);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could do:

return new ApplicationStatisticsInfo(itemInfoMap.values());

Copy link
Contributor Author

@slfan1989 slfan1989 Aug 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your suggestion, I will modify the code.

}

@Override
public AppActivitiesInfo getAppActivities(
HttpServletRequest hsr, String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds, String groupBy, String limit, Set<String> actions,
boolean summarize) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

SchedulerNode schedulerNode =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

TestUtils.getMockNode("host0", "rack", 1, 10240);

RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getYarnConfiguration()).thenReturn(this.getConf());
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
Mockito.when(scheduler.getMinimumResourceCapability()).thenReturn(Resources.none());
Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
LeafQueue mockQueue = Mockito.mock(LeafQueue.class);
Map<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
Mockito.doReturn(rmApps).when(rmContext).getRMApps();

FiCaSchedulerNode node = (FiCaSchedulerNode) schedulerNode;
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId,0);
RMApp mockApp = Mockito.mock(RMApp.class);
Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp).getApplicationId();
Mockito.doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp).getFinalApplicationStatus();
rmApps.put(appAttemptId.getApplicationId(), mockApp);
FiCaSchedulerApp app = new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
mock(ActiveUsersManager.class), rmContext);

ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext);
newActivitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 3);

int numActivities = 10;
for (int i = 0; i < numActivities; i++) {
ActivitiesLogger.APP.startAppAllocationRecording(newActivitiesManager, node,
SystemClock.getInstance().getTime(), app);
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(newActivitiesManager, node, app,
new SchedulerRequestKey(Priority.newInstance(0), 0, null),
ActivityDiagnosticConstant.NODE_IS_BLACKLISTED, ActivityState.REJECTED, ActivityLevel.NODE);
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(newActivitiesManager,
app.getApplicationId(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
}

Set<Integer> prioritiesInt =
requestPriorities.stream().map(pri -> Integer.parseInt(pri)).collect(Collectors.toSet());
Set<Long> allocationReqIds =
allocationRequestIds.stream().map(id -> Long.parseLong(id)).collect(Collectors.toSet());
AppActivitiesInfo appActivitiesInfo = newActivitiesManager.
getAppActivitiesInfo(app.getApplicationId(), prioritiesInt, allocationReqIds, null,
Integer.parseInt(limit), summarize, 3);

return appActivitiesInfo;
}
}
Loading