From 3fe502d687d1f3f1e8fd698a7ea129cb49005bce Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Mon, 29 Aug 2022 19:52:29 +0800 Subject: [PATCH] TEZ-4441: TezAppMaster may stuck because of reportError skip send error event (#236) (zhengchenyu reviewed by Laszlo Bodor) --- .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../tez/dag/app/rm/TaskSchedulerManager.java | 8 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 17 ++++ .../dag/app/rm/TestTaskSchedulerManager.java | 89 +++++++++++++++++++ 4 files changed, 114 insertions(+), 2 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index f1486e8ba0..ffbf0976c4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2663,7 +2663,7 @@ private boolean enableWebUIService() { } @VisibleForTesting - static void parseAllPlugins( + public static void parseAllPlugins( List taskSchedulerDescriptors, BiMap taskSchedulerPluginMap, List containerLauncherDescriptors, BiMap containerLauncherPluginMap, List taskCommDescriptors, BiMap taskCommPluginMap, diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index cc2e163720..4e919db8bb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -908,7 +908,8 @@ public void reportError(int taskSchedulerIndex, ServicePluginError servicePlugin LOG.info("Error reported by scheduler {} - {}", Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext) + ": " + diagnostics); - if (taskSchedulerDescriptors[taskSchedulerIndex].getClassName().equals(yarnSchedulerClassName)) { + if (taskSchedulerDescriptors[taskSchedulerIndex].getEntityName() + .equals(TezConstants.getTezYarnServicePluginName())) { LOG.warn( "Reporting a SchedulerServiceError to the DAGAppMaster since the error" + " was reported by the YARN task scheduler"); @@ -1076,4 +1077,9 @@ public String getTaskSchedulerClassName(int taskSchedulerIndex) { return taskSchedulers[taskSchedulerIndex].getTaskScheduler().getClass().getName(); } + @VisibleForTesting + public TaskScheduler getTaskScheduler(int taskSchedulerIndex) { + return taskSchedulers[taskSchedulerIndex].getTaskScheduler(); + } + } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index b7acc6876c..490067a54c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -161,6 +161,19 @@ public TaskSchedulerManagerForTest(AppContext appContext, this.defaultPayload = defaultPayload; } + TaskSchedulerManagerForTest(AppContext appContext, + EventHandler eventHandler, + TezAMRMClientAsync amrmClientAsync, + ContainerSignatureMatcher containerSignatureMatcher, + UserPayload defaultPayload, + List descriptors) { + super(appContext, null, eventHandler, containerSignatureMatcher, null, descriptors, + false, new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim()); + this.amrmClientAsync = amrmClientAsync; + this.containerSignatureMatcher = containerSignatureMatcher; + this.defaultPayload = defaultPayload; + } + @SuppressWarnings("unchecked") @Override public void instantiateSchedulers(String host, int port, String trackingUrl, @@ -224,6 +237,10 @@ public Event verifyInvocation(Class eventClass) { fail("Expected Event: " + eventClass.getName() + " not sent"); return null; } + + public int getEventSize() { + return this.events.size(); + } } static class TaskSchedulerWithDrainableContext extends YarnTaskSchedulerService { diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index dcf9a5dd69..e416c65702 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -45,11 +45,17 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; @@ -59,6 +65,8 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerSignatureMatcher; @@ -72,10 +80,16 @@ import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ClusterInfo; import org.apache.tez.dag.app.ContainerContext; +import org.apache.tez.dag.app.ContainerHeartbeatHandler; +import org.apache.tez.dag.app.DAGAppMaster; +import org.apache.tez.dag.app.DAGAppMasterState; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; @@ -89,6 +103,8 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.AMContainerState; +import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; +import org.apache.tez.dag.app.rm.node.AMNodeTracker; import org.apache.tez.dag.app.web.WebUIService; import org.apache.tez.dag.helpers.DagInfoImplForTest; import org.apache.tez.dag.records.TaskAttemptTerminationCause; @@ -839,6 +855,60 @@ protected void instantiateSchedulers(String host, int port, String trackingUrl, } } + @Test(timeout = 10000) + public void testHandleException() throws Exception { + Configuration tezConf = new Configuration(new YarnConfiguration()); + UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(tezConf); + + // Parse plugins + List tsDescriptors = Lists.newLinkedList(); + BiMap tsMap = HashBiMap.create(); + DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, Lists.newLinkedList(), HashBiMap.create(), Lists.newLinkedList(), + HashBiMap.create(), null, false, defaultPayload); + + // Only TezYarn found. + Assert.assertEquals(1, tsDescriptors.size()); + Assert.assertEquals(TezConstants.getTezYarnServicePluginName(), tsDescriptors.get(0).getEntityName()); + + // Construct eventHandler + TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler(); + TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); + + // Construct AMRMClient + AMRMClient rmClientCore = + new TestTaskSchedulerHelpers.AMRMClientForTest(); + TezAMRMClientAsync rmClient = + spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(rmClientCore, 100)); + + // Construct appContext + AppContext appContext = mock(AppContext.class); + doReturn(new Configuration(false)).when(appContext).getAMConf(); + AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); + AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); + doReturn(amContainerMap).when(appContext).getAllContainers(); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); + doReturn(dagID).when(appContext).getCurrentDAGID(); + doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); + + // Construct TaskSchedulerManager + TaskSchedulerManager taskSchedulerManagerReal = + new TestTaskSchedulerHelpers.TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, + new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher(), defaultPayload, tsDescriptors); + TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); + taskSchedulerManager.init(tezConf); + taskSchedulerManager.start(); + + // Send error to schedule, then expect DAGAppMasterEventSchedulingServiceError event. + YarnTaskSchedulerService scheduler = ((YarnTaskSchedulerService) taskSchedulerManager.getTaskScheduler(0)); + scheduler.onError(new Exception("Trigger by unit test")); + waitFor(() -> { + return eventHandler.getEventSize() > 0; + }, 1000, 5000); + eventHandler.verifyInvocation(DAGAppMasterEventSchedulingServiceError.class); + } + private static class ExceptionAnswer implements Answer { @Override public Object answer(InvocationOnMock invocation) throws Throwable { @@ -1107,4 +1177,23 @@ public boolean hasUnregistered() throws ServicePluginException { public void dagComplete() throws ServicePluginException { } } + + public static void waitFor(Supplier check, int checkEveryMillis, + int waitForMillis) throws TimeoutException, InterruptedException { + Preconditions.checkNotNull(check, "Input supplier interface should be initailized"); + Preconditions.checkArgument(waitForMillis >= checkEveryMillis, + "Total wait time should be greater than check interval time"); + + long st = Time.monotonicNow(); + boolean result = check.get(); + + while (!result && (Time.monotonicNow() - st < waitForMillis)) { + Thread.sleep(checkEveryMillis); + result = check.get(); + } + + if (!result) { + throw new TimeoutException("Timed out waiting for condition."); + } + } }