-
Notifications
You must be signed in to change notification settings - Fork 9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
YARN-11219. [Federation] Add getAppActivities, getAppStatistics REST APIs for Router. #4757
Changes from 3 commits
2da6161
ac69043
5787b7b
b61e580
1fd2862
9984ec9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
|
||
import javax.xml.bind.annotation.XmlAccessType; | ||
import javax.xml.bind.annotation.XmlAccessorType; | ||
|
@@ -33,12 +34,15 @@ public class ApplicationStatisticsInfo { | |
public ApplicationStatisticsInfo() { | ||
} // JAXB needs this | ||
|
||
public ApplicationStatisticsInfo(Collection<StatisticsItemInfo> items) { | ||
statItem.addAll(items); | ||
} | ||
|
||
public void add(StatisticsItemInfo statItem) { | ||
this.statItem.add(statItem); | ||
} | ||
|
||
public ArrayList<StatisticsItemInfo> getStatItems() { | ||
return statItem; | ||
} | ||
|
||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will fix it. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,6 +57,8 @@ | |
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; | ||
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; | ||
import org.apache.hadoop.yarn.webapp.BadRequestException; | ||
import org.apache.hadoop.yarn.webapp.ForbiddenException; | ||
|
@@ -540,4 +542,38 @@ public static NodeToLabelsInfo mergeNodeToLabels( | |
|
||
return new NodeToLabelsInfo(nodeToLabels); | ||
} | ||
|
||
public static ApplicationStatisticsInfo mergeApplicationStatisticsInfo( | ||
Collection<ApplicationStatisticsInfo> appStatistics) { | ||
ApplicationStatisticsInfo result = new ApplicationStatisticsInfo(); | ||
Map<String, StatisticsItemInfo> statisticsItemMap = new HashMap<>(); | ||
|
||
appStatistics.stream().forEach(appStatistic -> { | ||
List<StatisticsItemInfo> statisticsItemInfos = appStatistic.getStatItems(); | ||
for (StatisticsItemInfo statisticsItemInfo : statisticsItemInfos) { | ||
|
||
String statisticsItemKey = | ||
statisticsItemInfo.getType() + "_" + statisticsItemInfo.getState().toString(); | ||
|
||
StatisticsItemInfo statisticsItemValue; | ||
if(statisticsItemMap.containsKey(statisticsItemKey)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. space after if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your help reviewing the code, I will fix it. |
||
statisticsItemValue = statisticsItemMap.get(statisticsItemKey); | ||
long statisticsItemValueCount = statisticsItemValue.getCount(); | ||
long statisticsItemInfoCount = statisticsItemInfo.getCount(); | ||
long newCount = statisticsItemValueCount + statisticsItemInfoCount; | ||
statisticsItemValue.setCount(newCount); | ||
} else { | ||
statisticsItemValue = new StatisticsItemInfo(statisticsItemInfo); | ||
} | ||
|
||
statisticsItemMap.put(statisticsItemKey, statisticsItemValue); | ||
} | ||
}); | ||
|
||
if (!statisticsItemMap.isEmpty()) { | ||
result.getStatItems().addAll(statisticsItemMap.values()); | ||
} | ||
|
||
return result; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,8 @@ | |
import java.util.Collections; | ||
import java.util.Arrays; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.stream.Collectors; | ||
|
||
import javax.servlet.http.HttpServletRequest; | ||
import javax.servlet.http.HttpServletResponse; | ||
|
@@ -58,6 +60,16 @@ | |
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; | ||
import org.apache.hadoop.yarn.exceptions.YarnException; | ||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; | ||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; | ||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.*; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Expand There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will fix it. |
||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo; | ||
|
@@ -78,13 +90,22 @@ | |
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; | ||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; | ||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; | ||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; | ||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; | ||
import org.apache.hadoop.yarn.util.SystemClock; | ||
import org.apache.hadoop.yarn.util.resource.Resources; | ||
import org.apache.hadoop.yarn.webapp.NotFoundException; | ||
import org.mockito.Mockito; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import static org.mockito.Mockito.mock; | ||
|
||
/** | ||
* This class mocks the RESTRequestInterceptor. | ||
*/ | ||
|
@@ -132,8 +153,9 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, | |
// Initialize appReport | ||
ApplicationReport appReport = ApplicationReport.newInstance( | ||
appId, ApplicationAttemptId.newInstance(appId, 1), null, newApp.getQueue(), null, null, 0, | ||
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, null, null, null, | ||
false, Priority.newInstance(newApp.getPriority()), null, null); | ||
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, | ||
newApp.getApplicationType(), null, null, false, Priority.newInstance(newApp.getPriority()), | ||
null, null); | ||
|
||
// Initialize appTimeoutsMap | ||
HashMap<ApplicationTimeoutType, ApplicationTimeout> appTimeoutsMap = new HashMap<>(); | ||
|
@@ -661,4 +683,105 @@ public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, Str | |
AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue()); | ||
return Response.status(Status.OK).entity(targetAppQueue).build(); | ||
} | ||
|
||
public void updateApplicationState(YarnApplicationState appState, String appId) | ||
throws AuthorizationException, YarnException, InterruptedException, IOException { | ||
validateRunning(); | ||
ApplicationId applicationId = ApplicationId.fromString(appId); | ||
if (!applicationMap.containsKey(applicationId)) { | ||
throw new NotFoundException("app with id: " + appId + " not found"); | ||
} | ||
ApplicationReport appReport = applicationMap.get(applicationId); | ||
appReport.setYarnApplicationState(appState); | ||
} | ||
|
||
@Override | ||
public ApplicationStatisticsInfo getAppStatistics( | ||
HttpServletRequest hsr, Set<String> stateQueries, Set<String> typeQueries) { | ||
if (!isRunning) { | ||
throw new RuntimeException("RM is stopped"); | ||
} | ||
|
||
Map<String, StatisticsItemInfo> itemInfoMap = new HashMap<>(); | ||
|
||
for (ApplicationReport appReport : applicationMap.values()) { | ||
|
||
YarnApplicationState appState = appReport.getYarnApplicationState(); | ||
String appType = appReport.getApplicationType(); | ||
|
||
if (stateQueries.contains(appState.name()) && typeQueries.contains(appType)) { | ||
String itemInfoMapKey = appState.toString() + "_" + appType; | ||
StatisticsItemInfo itemInfo = itemInfoMap.getOrDefault(itemInfoMapKey, null); | ||
if (itemInfo == null) { | ||
itemInfo = new StatisticsItemInfo(appState, appType, 1); | ||
} else { | ||
long newCount = itemInfo.getCount() + 1; | ||
itemInfo.setCount(newCount); | ||
} | ||
itemInfoMap.put(itemInfoMapKey, itemInfo); | ||
} | ||
} | ||
|
||
return new ApplicationStatisticsInfo(itemInfoMap.values()); | ||
} | ||
|
||
@Override | ||
public AppActivitiesInfo getAppActivities( | ||
HttpServletRequest hsr, String appId, String time, Set<String> requestPriorities, | ||
Set<String> allocationRequestIds, String groupBy, String limit, Set<String> actions, | ||
boolean summarize) { | ||
if (!isRunning) { | ||
throw new RuntimeException("RM is stopped"); | ||
} | ||
|
||
ApplicationId applicationId = ApplicationId.fromString(appId); | ||
if (!applicationMap.containsKey(applicationId)) { | ||
throw new NotFoundException("app with id: " + appId + " not found"); | ||
} | ||
|
||
SchedulerNode schedulerNode = TestUtils.getMockNode("host0", "rack", 1, 10240); | ||
|
||
RMContext rmContext = Mockito.mock(RMContext.class); | ||
Mockito.when(rmContext.getYarnConfiguration()).thenReturn(this.getConf()); | ||
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class); | ||
Mockito.when(scheduler.getMinimumResourceCapability()).thenReturn(Resources.none()); | ||
Mockito.when(rmContext.getScheduler()).thenReturn(scheduler); | ||
LeafQueue mockQueue = Mockito.mock(LeafQueue.class); | ||
Map<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>(); | ||
Mockito.doReturn(rmApps).when(rmContext).getRMApps(); | ||
|
||
FiCaSchedulerNode node = (FiCaSchedulerNode) schedulerNode; | ||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 0); | ||
RMApp mockApp = Mockito.mock(RMApp.class); | ||
Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp).getApplicationId(); | ||
Mockito.doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp).getFinalApplicationStatus(); | ||
rmApps.put(appAttemptId.getApplicationId(), mockApp); | ||
FiCaSchedulerApp app = new FiCaSchedulerApp(appAttemptId, "user", mockQueue, | ||
mock(ActiveUsersManager.class), rmContext); | ||
|
||
ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext); | ||
newActivitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 3); | ||
|
||
int numActivities = 10; | ||
for (int i = 0; i < numActivities; i++) { | ||
ActivitiesLogger.APP.startAppAllocationRecording(newActivitiesManager, node, | ||
SystemClock.getInstance().getTime(), app); | ||
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(newActivitiesManager, node, app, | ||
new SchedulerRequestKey(Priority.newInstance(0), 0, null), | ||
ActivityDiagnosticConstant.NODE_IS_BLACKLISTED, ActivityState.REJECTED, | ||
ActivityLevel.NODE); | ||
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(newActivitiesManager, | ||
app.getApplicationId(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); | ||
} | ||
|
||
Set<Integer> prioritiesInt = | ||
requestPriorities.stream().map(pri -> Integer.parseInt(pri)).collect(Collectors.toSet()); | ||
Set<Long> allocationReqIds = | ||
allocationRequestIds.stream().map(id -> Long.parseLong(id)).collect(Collectors.toSet()); | ||
AppActivitiesInfo appActivitiesInfo = newActivitiesManager. | ||
getAppActivitiesInfo(app.getApplicationId(), prioritiesInt, allocationReqIds, null, | ||
Integer.parseInt(limit), summarize, 3); | ||
|
||
return appActivitiesInfo; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your help reviewing the code, I will modify the code.