Skip to content

Commit

Permalink
YARN-11390. TestResourceTrackerService.testNodeRemovalNormally: Shutdown nodes should be 0 now expected: <1> but was: <0> (#5190)
Browse files Browse the repository at this point in the history

Reviewed-by: Peter Szucs
Signed-off-by: Chris Nauroth <[email protected]>
(cherry picked from commit ee7d178)
(cherry picked from commit 8b748c1)
  • Loading branch information
K0K0V0K authored and cnauroth committed Dec 8, 2022
1 parent 96930ab commit b9033f9
Showing 1 changed file with 39 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
Expand All @@ -44,11 +45,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
Expand Down Expand Up @@ -2257,50 +2261,49 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
}

//Test decommed/ing node that transitions to untracked,timer should remove
testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3,
maxThreadSleeptime, doGraceful);
testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3, doGraceful);
rm.stop();
}

// Helper used by testNodeRemovalUtil, split out so that method does not
// exceed the maximum allowed length.
// Flow: list host2 in the exclude file and check that nm2 is reported as
// DECOMMISSIONED or DECOMMISSIONING; then drop host2 from both host files
// and check that the node is no longer tracked and that the decommissioned,
// shutdown and active node counts in ClusterMetrics are 0, 0 and 2.
// NOTE(review): this span looks like a rendered diff with replaced and
// replacement lines shown together (two parameter lists, latch-based waits
// next to pollingAssert calls) - confirm against the real file before
// relying on it.
private void testNodeRemovalUtilDecomToUntracked(
RMContext rmContext, Configuration conf,
MockNM nm1, MockNM nm2, MockNM nm3,
long maxThreadSleeptime, boolean doGraceful) throws Exception {
MockNM nm1, MockNM nm2, MockNM nm3, boolean doGraceful
) throws Exception {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
String ip = NetUtils.normalizeHostName("localhost");
// Nothing in this method counts the latch down, so every await on it simply
// waits for the full timeout.
CountDownLatch latch = new CountDownLatch(1);
// Step 1: keep host2 in the host list (presumably the include file - TODO
// confirm) while also listing it in the exclude file, then refresh.
writeToHostsFile("host1", ip, "host2");
writeToHostsFile(excludeHostFile, "host2");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
//nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
RMNode rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertNotEquals("Timer for this node was not canceled!",
rmNode, null);
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
// nm2 is looked up in the RM node map for the graceful case and in the
// inactive node map otherwise; the supplier re-reads the map on every poll.
Supplier<RMNode> nodeSupplier = doGraceful
? () -> rmContext.getRMNodes().get(nm2.getNodeId())
: () -> rmContext.getInactiveRMNodes().get(nm2.getNodeId());
pollingAssert(() -> nodeSupplier.get() != null,
"Timer for this node was not canceled!");
final List<NodeState> expectedStates = Arrays.asList(
NodeState.DECOMMISSIONED,
NodeState.DECOMMISSIONING
);
// NOTE(review): nodeSupplier.get() is dereferenced without a null check
// here; if the map entry vanished between polls this would throw a
// NullPointerException rather than time out - confirm that cannot happen.
pollingAssert(() -> expectedStates.contains(nodeSupplier.get().getState()),
"Node should be in one of these states: " + expectedStates);


// Step 2: remove host2 from the host list, empty the exclude file, refresh,
// and let nm2 heartbeat.
writeToHostsFile("host1", ip);
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
nm2.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should have been forgotten!",
rmNode, null);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumDecommisionedNMs(), 0);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
// Polling form of the same checks: wait until nm2 is gone from the map and
// each metric reaches its expected value instead of asserting once.
pollingAssert(() -> nodeSupplier.get() == null,
"Node should have been forgotten!");
pollingAssert(metrics::getNumDecommisionedNMs, 0,
"metrics#getNumDecommisionedNMs should be 0 now");
pollingAssert(metrics::getNumShutdownNMs, 0,
"metrics#getNumShutdownNMs should be 0 now");
pollingAssert(metrics::getNumActiveNMs, 2,
"metrics#getNumActiveNMs should be 2 now");
}

private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
Expand Down Expand Up @@ -2825,6 +2828,18 @@ public void testResponseIdOverflow() throws Exception {
Assert.assertEquals(1, nodeHeartbeat.getResponseId());
}

/**
 * Waits, via {@link GenericTestUtils#waitFor}, for a condition to hold
 * instead of asserting it once.
 *
 * @param supplier condition handed to {@code waitFor} for evaluation
 * @param message text handed to {@code waitFor} as its message argument
 * @throws InterruptedException propagated from {@code waitFor}
 * @throws TimeoutException propagated from {@code waitFor}
 */
private void pollingAssert(Supplier<Boolean> supplier, String message)
    throws InterruptedException, TimeoutException {
  // Named per the waitFor parameters they feed (check interval, total wait).
  final int checkEveryMillis = 100;
  final int waitForMillis = 10_000;
  GenericTestUtils.waitFor(supplier, checkEveryMillis, waitForMillis, message);
}

/**
 * Waits, via {@link GenericTestUtils#waitFor}, until the supplied value
 * equals the expected one according to {@link Objects#equals(Object, Object)}.
 *
 * @param supplier source of the current value, re-read on each evaluation
 * @param expected value the supplier is expected to produce; may be null
 * @param message text handed to {@code waitFor} as its message argument
 * @param <T> type of the compared value
 * @throws InterruptedException propagated from {@code waitFor}
 * @throws TimeoutException propagated from {@code waitFor}
 */
private <T> void pollingAssert(Supplier<T> supplier, T expected, String message)
    throws InterruptedException, TimeoutException {
  // Named per the waitFor parameters they feed (check interval, total wait).
  final int checkEveryMillis = 100;
  final int waitForMillis = 10_000;
  final Supplier<Boolean> valueMatches =
      () -> Objects.equals(supplier.get(), expected);
  GenericTestUtils.waitFor(valueMatches, checkEveryMillis, waitForMillis, message);
}

/**
* A no-op implementation of NodeAttributeStore for testing.
*/
Expand Down

0 comments on commit b9033f9

Please sign in to comment.