Skip to content

Commit

Permalink
YARN-11390. TestResourceTrackerService.testNodeRemovalNormally: Shutd…
Browse files Browse the repository at this point in the history
…own nodes should be 0 now expected: <1> but was: <0>

- The hardcoded sleep what was used in the test was not the most stable solution
- It was replaced with a polling assert
  • Loading branch information
K0K0V0K committed Dec 6, 2022
1 parent 86ac1ad commit 5fc63f9
Showing 1 changed file with 39 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
Expand Down Expand Up @@ -2345,50 +2347,49 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
}

//Test decommed/ing node that transitions to untracked,timer should remove
testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3,
maxThreadSleeptime, doGraceful);
testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3, doGraceful);
rm.stop();
}

// A helper method used by testNodeRemovalUtil to avoid exceeding
// max allowed length.
private void testNodeRemovalUtilDecomToUntracked(
RMContext rmContext, Configuration conf,
MockNM nm1, MockNM nm2, MockNM nm3,
long maxThreadSleeptime, boolean doGraceful) throws Exception {
MockNM nm1, MockNM nm2, MockNM nm3, boolean doGraceful
) throws Exception {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
String ip = NetUtils.normalizeHostName("localhost");
CountDownLatch latch = new CountDownLatch(1);
writeToHostsFile("host1", ip, "host2");
writeToHostsFile(excludeHostFile, "host2");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
//nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
RMNode rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertNotEquals("Timer for this node was not canceled!",
rmNode, null);
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
Supplier<RMNode> nodeSupplier = doGraceful
? () -> rmContext.getRMNodes().get(nm2.getNodeId())
: () -> rmContext.getInactiveRMNodes().get(nm2.getNodeId());
pollingAssert(() -> nodeSupplier.get() != null,
"Timer for this node was not canceled!");
final List<NodeState> expectedStates = Arrays.asList(
NodeState.DECOMMISSIONED,
NodeState.DECOMMISSIONING
);
pollingAssert(() -> expectedStates.contains(nodeSupplier.get().getState()),
"Node should be in one of these states: " + expectedStates);


writeToHostsFile("host1", ip);
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
nm2.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should have been forgotten!",
rmNode, null);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumDecommisionedNMs(), 0);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
pollingAssert(() -> nodeSupplier.get() == null,
"Node should have been forgotten!");
pollingAssert(metrics::getNumDecommisionedNMs, 0,
"metrics#getNumDecommisionedNMs should be 0 now");
pollingAssert(metrics::getNumShutdownNMs, 0,
"metrics#getNumShutdownNMs should be 0 now");
pollingAssert(metrics::getNumActiveNMs, 2,
"metrics#getNumActiveNMs should be 2 now");
}

private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
Expand Down Expand Up @@ -2959,6 +2960,20 @@ protected ResourceTrackerService createResourceTrackerService() {
mockRM.stop();
}

private void pollingAssert(Supplier<Boolean> supplier, String message)
throws InterruptedException {
pollingAssert(supplier, true, message);
}

private <T> void pollingAssert(Supplier<T> supplier, T expected, String message)
throws InterruptedException {
long timeOut = System.currentTimeMillis() + 10_000;
while (System.currentTimeMillis() < timeOut && !Objects.equals(expected, supplier.get())) {
Thread.sleep(100);
}
Assert.assertEquals(message, expected, supplier.get());
}

/**
* A no-op implementation of NodeAttributeStore for testing
*/
Expand Down

0 comments on commit 5fc63f9

Please sign in to comment.