Skip to content

Commit

Permalink
YARN-10996. Fix race condition of User object acquisitions. Contribut…
Browse files Browse the repository at this point in the history
…ed by Andras Gyori
  • Loading branch information
szilard-nemeth committed Nov 12, 2021
1 parent db89a94 commit e220e88
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
Expand Down Expand Up @@ -419,17 +420,18 @@ private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
String userName = app.getUser();
TempUserPerPartition tmpUser = usersPerPartition.get(userName);
if (tmpUser == null) {
ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
.getResourceUsage();
// User might have already been removed, but preemption still accounts for this app,
// therefore reinserting the user will not cause a memory leak
User user = tq.leafQueue.getOrCreateUser(userName);
ResourceUsage userResourceUsage = user.getResourceUsage();

// perUserAMUsed was populated with running apps, now we are looping
// through both running and pending apps.
Resource userSpecificAmUsed = perUserAMUsed.get(userName);
amUsed = (userSpecificAmUsed == null)
? Resources.none() : userSpecificAmUsed;

tmpUser = new TempUserPerPartition(
tq.leafQueue.getUser(userName), tq.queueName,
tmpUser = new TempUserPerPartition(user, tq.queueName,
Resources.clone(userResourceUsage.getUsed(partition)),
Resources.clone(amUsed),
Resources.clone(userResourceUsage.getReserved(partition)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,11 @@ public User getUser(String userName) {
return usersManager.getUser(userName);
}

@VisibleForTesting
public User getOrCreateUser(String userName) {
return usersManager.getUserAndAddIfAbsent(userName);
}

@Private
public List<AppPriorityACLGroup> getPriorityACLs() {
readLock.lock();
Expand Down Expand Up @@ -2007,15 +2012,25 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec,

public void incAMUsedResource(String nodeLabel, Resource resourceToInc,
SchedulerApplicationAttempt application) {
getUser(application.getUser()).getResourceUsage().incAMUsed(nodeLabel,
User user = getUser(application.getUser());
if (user == null) {
return;
}

user.getResourceUsage().incAMUsed(nodeLabel,
resourceToInc);
// ResourceUsage has its own lock, no addition lock needs here.
usageTracker.getQueueUsage().incAMUsed(nodeLabel, resourceToInc);
}

public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
SchedulerApplicationAttempt application) {
getUser(application.getUser()).getResourceUsage().decAMUsed(nodeLabel,
User user = getUser(application.getUser());
if (user == null) {
return;
}

user.getResourceUsage().decAMUsed(nodeLabel,
resourceToDec);
// ResourceUsage has its own lock, no addition lock needs here.
usageTracker.getQueueUsage().decAMUsed(nodeLabel, resourceToDec);
Expand Down Expand Up @@ -2103,7 +2118,7 @@ public Resource getTotalPendingResourcesConsideringUserLimit(
for (FiCaSchedulerApp app : getApplications()) {
String userName = app.getUser();
if (!userNameToHeadroom.containsKey(userName)) {
User user = getUser(userName);
User user = getUsersManager().getUserAndAddIfAbsent(userName);
Resource headroom = Resources.subtract(
getResourceLimitForActiveUsers(app.getUser(), clusterResources,
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,7 @@ partitionResource, getUsageRatio(nodePartition),
lQueue.getMinimumAllocation());

if (LOG.isDebugEnabled()) {
float weight = lQueue.getUserWeights().getByUser(userName);
LOG.debug("User limit computation for " + userName
+ ", in queue: " + lQueue.getQueuePath()
+ ", userLimitPercent=" + lQueue.getUserLimit()
Expand All @@ -834,7 +835,7 @@ partitionResource, getUsageRatio(nodePartition),
+ ", Partition=" + nodePartition
+ ", resourceUsed=" + resourceUsed
+ ", maxUserLimit=" + maxUserLimit
+ ", userWeight=" + getUser(userName).getWeight()
+ ", userWeight=" + weight
);
}
return userLimitResource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ private void setupUserToQueueSettings(String label, String queueName,
user.setResourceUsage(userResourceUsage.get(userName));
}
when(queue.getUser(eq(userName))).thenReturn(user);
when(queue.getOrCreateUser(eq(userName))).thenReturn(user);
when(queue.getResourceLimitForAllUsers(eq(userName),
any(Resource.class), anyString(), any(SchedulingMode.class)))
.thenReturn(userLimit);
Expand Down

0 comments on commit e220e88

Please sign in to comment.