Skip to content
This repository has been archived by the owner on Jul 22, 2022. It is now read-only.

Commit

Permalink
YARN-8992. Fair scheduler can delete a dynamic queue while an applica…
Browse files Browse the repository at this point in the history
…tion attempt is being added to the queue. (Contributed by Wilfred Spiegelenburg)
  • Loading branch information
haibchen authored and sjrand committed Mar 21, 2021
1 parent 870b79c commit 9eec406
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,20 @@ public int getNumAssignedApps() {
}
}

@Override
public boolean isEmpty() {
readLock.lock();
try {
if (runnableApps.size() > 0 || nonRunnableApps.size() > 0 ||
assignedApps.size() > 0) {
return false;
}
} finally {
readLock.unlock();
}
return true;
}

/**
* TODO: Based on how frequently this is called, we might want to club
* counting pending and active apps in the same method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,21 @@ public int getNumRunnableApps() {
}
}

@Override
public boolean isEmpty() {
readLock.lock();
try {
for (FSQueue queue: childQueues) {
if (!queue.isEmpty()) {
return false;
}
}
} finally {
readLock.unlock();
}
return true;
}

@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,4 +601,6 @@ public boolean isDynamic() {
public void setDynamic(boolean dynamic) {
this.isDynamic = dynamic;
}

public abstract boolean isEmpty();
}
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public void removeEmptyDynamicQueues() {
}
while (!parentQueuesToCheck.isEmpty()) {
FSParentQueue queue = parentQueuesToCheck.iterator().next();
if (queue.getChildQueues().isEmpty()) {
if (queue.isEmpty()) {
removeQueue(queue);
if (queue.getParent().isDynamic()) {
parentQueuesToCheck.add(queue.getParent());
Expand Down Expand Up @@ -489,7 +489,7 @@ public void removePendingIncompatibleQueues() {
* @return true if removed, false otherwise
*/
private boolean removeQueueIfEmpty(FSQueue queue) {
if (isEmpty(queue)) {
if (queue.isEmpty()) {
removeQueue(queue);
return true;
}
Expand All @@ -514,26 +514,6 @@ private void removeQueue(FSQueue queue) {
}
}

/**
* Returns true if there are no applications, running or not, in the given
* queue or any of its descendents.
*/
protected boolean isEmpty(FSQueue queue) {
if (queue instanceof FSLeafQueue) {
FSLeafQueue leafQueue = (FSLeafQueue)queue;
return queue.getNumRunnableApps() == 0 &&
leafQueue.getNumNonRunnableApps() == 0 &&
leafQueue.getNumAssignedApps() == 0;
} else {
for (FSQueue child : queue.getChildQueues()) {
if (!isEmpty(child)) {
return false;
}
}
return true;
}
}

/**
* Gets a queue by name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import org.junit.Before;
import org.junit.Test;

import java.util.HashSet;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -34,7 +31,6 @@ public class TestFSParentQueue {

private FairSchedulerConfiguration conf;
private QueueManager queueManager;
private Set<FSQueue> notEmptyQueues;

@Before
public void setUp() throws Exception {
Expand All @@ -47,13 +43,7 @@ public void setUp() throws Exception {
new DefaultResourceCalculator());
SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock);
notEmptyQueues = new HashSet<FSQueue>();
queueManager = new QueueManager(scheduler) {
@Override
public boolean isEmpty(FSQueue queue) {
return !notEmptyQueues.contains(queue);
}
};
queueManager = new QueueManager(scheduler);
FSQueueMetrics.forQueue("root", null, true, conf);
queueManager.initialize(conf);
}
Expand Down
Loading

0 comments on commit 9eec406

Please sign in to comment.