-
Notifications
You must be signed in to change notification settings - Fork 9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
YARN-11219. [Federation] Add getAppActivities, getAppStatistics REST APIs for Router. #4757
Changes from 1 commit
2da6161
ac69043
5787b7b
b61e580
1fd2862
9984ec9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1129,13 +1129,49 @@ public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, | |
String appId, String time, Set<String> requestPriorities, | ||
Set<String> allocationRequestIds, String groupBy, String limit, | ||
Set<String> actions, boolean summarize) { | ||
throw new NotImplementedException("Code is not implemented"); | ||
|
||
// Only verify the app_id, because the specific subCluster needs to be found according to the app_id, | ||
// and other verifications are directly handed over to the corresponding subCluster RM | ||
if (appId == null || appId.isEmpty()) { | ||
throw new IllegalArgumentException("Parameter error, the appId is empty or null."); | ||
} | ||
|
||
try { | ||
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); | ||
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( | ||
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); | ||
|
||
final HttpServletRequest hsrCopy = clone(hsr); | ||
return interceptor.getAppActivities(hsrCopy, appId, time, requestPriorities, | ||
allocationRequestIds, groupBy, limit, actions, summarize); | ||
} catch (IllegalArgumentException e) { | ||
RouterServerUtil.logAndThrowRunTimeException(e, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Single line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will fix it. |
||
"Unable to get subCluster by appId: %s.", appId); | ||
} catch (YarnException e) { | ||
RouterServerUtil.logAndThrowRunTimeException("getAppActivities Failed.", e); | ||
} | ||
|
||
return null; | ||
} | ||
|
||
@Override | ||
public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr, | ||
Set<String> stateQueries, Set<String> typeQueries) { | ||
throw new NotImplementedException("Code is not implemented"); | ||
try { | ||
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); | ||
final HttpServletRequest hsrCopy = clone(hsr); | ||
Class[] argsClasses = new Class[]{HttpServletRequest.class, Set.class, Set.class}; | ||
Object[] args = new Object[]{hsrCopy, stateQueries, typeQueries}; | ||
ClientMethod remoteMethod = new ClientMethod("getAppStatistics", argsClasses, args); | ||
Map<SubClusterInfo, ApplicationStatisticsInfo> appStatisticsMap = invokeConcurrent( | ||
subClustersActive.values(), remoteMethod, ApplicationStatisticsInfo.class); | ||
return RouterWebServiceUtil.mergeApplicationStatisticsInfo(appStatisticsMap.values()); | ||
} catch (IOException e) { | ||
RouterServerUtil.logAndThrowRunTimeException(e, "Get all active sub cluster(s) error."); | ||
} catch (YarnException e) { | ||
RouterServerUtil.logAndThrowRunTimeException(e, "getAppStatistics error."); | ||
} | ||
return null; | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,6 +57,8 @@ | |
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; | ||
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; | ||
import org.apache.hadoop.yarn.webapp.BadRequestException; | ||
import org.apache.hadoop.yarn.webapp.ForbiddenException; | ||
|
@@ -540,4 +542,34 @@ public static NodeToLabelsInfo mergeNodeToLabels( | |
|
||
return new NodeToLabelsInfo(nodeToLabels); | ||
} | ||
|
||
public static ApplicationStatisticsInfo mergeApplicationStatisticsInfo( | ||
Collection<ApplicationStatisticsInfo> appStatistics) { | ||
ApplicationStatisticsInfo result = new ApplicationStatisticsInfo(); | ||
HashMap<String, StatisticsItemInfo> statisticsItemMap = new HashMap(); | ||
|
||
appStatistics.stream().forEach(appStatistic -> { | ||
List<StatisticsItemInfo> statisticsItemInfos = appStatistic.getStatItems(); | ||
for (StatisticsItemInfo statisticsItemInfo : statisticsItemInfos) { | ||
String statisticsItemKey = statisticsItemInfo.getType() + "_" + statisticsItemInfo.getState().toString(); | ||
StatisticsItemInfo statisticsItemValue = | ||
statisticsItemMap.getOrDefault(statisticsItemKey, null); | ||
if (statisticsItemValue != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cleaner to do contains or similar. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will fix it. |
||
long statisticsItemValueCount = statisticsItemValue.getCount(); | ||
long statisticsItemInfoCount = statisticsItemInfo.getCount(); | ||
long newCount = statisticsItemValueCount + statisticsItemInfoCount; | ||
statisticsItemValue.setCount(newCount); | ||
} else { | ||
statisticsItemValue = new StatisticsItemInfo(statisticsItemInfo); | ||
} | ||
statisticsItemMap.put(statisticsItemKey, statisticsItemValue); | ||
} | ||
}); | ||
|
||
if (!statisticsItemMap.isEmpty()) { | ||
result.getStatItems().addAll(statisticsItemMap.values()); | ||
} | ||
|
||
return result; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,8 @@ | |
import java.util.Collections; | ||
import java.util.Arrays; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.stream.Collectors; | ||
|
||
import javax.servlet.http.HttpServletRequest; | ||
import javax.servlet.http.HttpServletResponse; | ||
|
@@ -58,6 +60,16 @@ | |
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; | ||
import org.apache.hadoop.yarn.exceptions.YarnException; | ||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; | ||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; | ||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.*; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Expand this wildcard import into explicit imports. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I will fix it. |
||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; | ||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo; | ||
|
@@ -78,13 +90,22 @@ | |
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; | ||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; | ||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; | ||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; | ||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; | ||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; | ||
import org.apache.hadoop.yarn.util.SystemClock; | ||
import org.apache.hadoop.yarn.util.resource.Resources; | ||
import org.apache.hadoop.yarn.webapp.NotFoundException; | ||
import org.mockito.Mockito; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import static org.mockito.Mockito.mock; | ||
|
||
/** | ||
* This class mocks the RESTRequestInterceptor. | ||
*/ | ||
|
@@ -132,8 +153,9 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, | |
// Initialize appReport | ||
ApplicationReport appReport = ApplicationReport.newInstance( | ||
appId, ApplicationAttemptId.newInstance(appId, 1), null, newApp.getQueue(), null, null, 0, | ||
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, null, null, null, | ||
false, Priority.newInstance(newApp.getPriority()), null, null); | ||
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, | ||
newApp.getApplicationType(), null, null, false, Priority.newInstance(newApp.getPriority()), | ||
null, null); | ||
|
||
// Initialize appTimeoutsMap | ||
HashMap<ApplicationTimeoutType, ApplicationTimeout> appTimeoutsMap = new HashMap<>(); | ||
|
@@ -661,4 +683,108 @@ public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, Str | |
AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue()); | ||
return Response.status(Status.OK).entity(targetAppQueue).build(); | ||
} | ||
|
||
public void updateApplicationState(YarnApplicationState appState, String appId) | ||
throws AuthorizationException, YarnException, InterruptedException, IOException { | ||
validateRunning(); | ||
ApplicationId applicationId = ApplicationId.fromString(appId); | ||
if (!applicationMap.containsKey(applicationId)) { | ||
throw new NotFoundException("app with id: " + appId + " not found"); | ||
} | ||
ApplicationReport appReport = applicationMap.get(applicationId); | ||
appReport.setYarnApplicationState(appState); | ||
} | ||
|
||
@Override | ||
public ApplicationStatisticsInfo getAppStatistics( | ||
HttpServletRequest hsr, Set<String> stateQueries, Set<String> typeQueries) { | ||
if (!isRunning) { | ||
throw new RuntimeException("RM is stopped"); | ||
} | ||
|
||
Map<String, StatisticsItemInfo> itemInfoMap = new HashMap<>(); | ||
|
||
for (HashMap.Entry<ApplicationId, ApplicationReport> item : applicationMap.entrySet()) { | ||
|
||
ApplicationReport applicationReport = item.getValue(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't do getKey, we can iterate .values() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will fix it. |
||
YarnApplicationState appState = applicationReport.getYarnApplicationState(); | ||
String appType = applicationReport.getApplicationType(); | ||
|
||
if (stateQueries.contains(appState.name()) && typeQueries.contains(appType)) { | ||
String itemInfoMapKey = appState.toString() + "_" + appType; | ||
StatisticsItemInfo itemInfo = itemInfoMap.getOrDefault(itemInfoMapKey, null); | ||
if (itemInfo == null) { | ||
itemInfo = new StatisticsItemInfo(appState, appType, 1); | ||
} else { | ||
long newCount = itemInfo.getCount() + 1; | ||
itemInfo.setCount(newCount); | ||
} | ||
itemInfoMap.put(itemInfoMapKey, itemInfo); | ||
} | ||
} | ||
|
||
ArrayList<StatisticsItemInfo> itemInfos = new ArrayList<>(itemInfoMap.values()); | ||
|
||
return new ApplicationStatisticsInfo(itemInfos); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could do:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for your suggestion, I will modify the code. |
||
} | ||
|
||
@Override | ||
public AppActivitiesInfo getAppActivities( | ||
HttpServletRequest hsr, String appId, String time, Set<String> requestPriorities, | ||
Set<String> allocationRequestIds, String groupBy, String limit, Set<String> actions, | ||
boolean summarize) { | ||
if (!isRunning) { | ||
throw new RuntimeException("RM is stopped"); | ||
} | ||
|
||
ApplicationId applicationId = ApplicationId.fromString(appId); | ||
if (!applicationMap.containsKey(applicationId)) { | ||
throw new NotFoundException("app with id: " + appId + " not found"); | ||
} | ||
|
||
SchedulerNode schedulerNode = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Single line There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will fix it. |
||
TestUtils.getMockNode("host0", "rack", 1, 10240); | ||
|
||
RMContext rmContext = Mockito.mock(RMContext.class); | ||
Mockito.when(rmContext.getYarnConfiguration()).thenReturn(this.getConf()); | ||
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class); | ||
Mockito.when(scheduler.getMinimumResourceCapability()).thenReturn(Resources.none()); | ||
Mockito.when(rmContext.getScheduler()).thenReturn(scheduler); | ||
LeafQueue mockQueue = Mockito.mock(LeafQueue.class); | ||
Map<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>(); | ||
Mockito.doReturn(rmApps).when(rmContext).getRMApps(); | ||
|
||
FiCaSchedulerNode node = (FiCaSchedulerNode) schedulerNode; | ||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId,0); | ||
RMApp mockApp = Mockito.mock(RMApp.class); | ||
Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp).getApplicationId(); | ||
Mockito.doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp).getFinalApplicationStatus(); | ||
rmApps.put(appAttemptId.getApplicationId(), mockApp); | ||
FiCaSchedulerApp app = new FiCaSchedulerApp(appAttemptId, "user", mockQueue, | ||
mock(ActiveUsersManager.class), rmContext); | ||
|
||
ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext); | ||
newActivitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 3); | ||
|
||
int numActivities = 10; | ||
for (int i = 0; i < numActivities; i++) { | ||
ActivitiesLogger.APP.startAppAllocationRecording(newActivitiesManager, node, | ||
SystemClock.getInstance().getTime(), app); | ||
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(newActivitiesManager, node, app, | ||
new SchedulerRequestKey(Priority.newInstance(0), 0, null), | ||
ActivityDiagnosticConstant.NODE_IS_BLACKLISTED, ActivityState.REJECTED, ActivityLevel.NODE); | ||
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(newActivitiesManager, | ||
app.getApplicationId(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); | ||
} | ||
|
||
Set<Integer> prioritiesInt = | ||
requestPriorities.stream().map(pri -> Integer.parseInt(pri)).collect(Collectors.toSet()); | ||
Set<Long> allocationReqIds = | ||
allocationRequestIds.stream().map(id -> Long.parseLong(id)).collect(Collectors.toSet()); | ||
AppActivitiesInfo appActivitiesInfo = newActivitiesManager. | ||
getAppActivitiesInfo(app.getApplicationId(), prioritiesInt, allocationReqIds, null, | ||
Integer.parseInt(limit), summarize, 3); | ||
|
||
return appActivitiesInfo; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your help reviewing the code, I will modify the code.