Skip to content

Commit

Permalink
TEZ-4441: TezAppMaster may stuck because of reportError skip send err…
Browse files Browse the repository at this point in the history
…or event (#236) (zhengchenyu reviewed by Laszlo Bodor)
  • Loading branch information
zhengchenyu authored and abstractdog committed Aug 29, 2022
1 parent 6231277 commit 3fe502d
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2663,7 +2663,7 @@ private boolean enableWebUIService() {
}

@VisibleForTesting
static void parseAllPlugins(
public static void parseAllPlugins(
List<NamedEntityDescriptor> taskSchedulerDescriptors, BiMap<String, Integer> taskSchedulerPluginMap,
List<NamedEntityDescriptor> containerLauncherDescriptors, BiMap<String, Integer> containerLauncherPluginMap,
List<NamedEntityDescriptor> taskCommDescriptors, BiMap<String, Integer> taskCommPluginMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,8 @@ public void reportError(int taskSchedulerIndex, ServicePluginError servicePlugin
LOG.info("Error reported by scheduler {} - {}",
Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext) + ": " +
diagnostics);
if (taskSchedulerDescriptors[taskSchedulerIndex].getClassName().equals(yarnSchedulerClassName)) {
if (taskSchedulerDescriptors[taskSchedulerIndex].getEntityName()
.equals(TezConstants.getTezYarnServicePluginName())) {
LOG.warn(
"Reporting a SchedulerServiceError to the DAGAppMaster since the error" +
" was reported by the YARN task scheduler");
Expand Down Expand Up @@ -1076,4 +1077,9 @@ public String getTaskSchedulerClassName(int taskSchedulerIndex) {
return taskSchedulers[taskSchedulerIndex].getTaskScheduler().getClass().getName();
}

@VisibleForTesting
public TaskScheduler getTaskScheduler(int taskSchedulerIndex) {
return taskSchedulers[taskSchedulerIndex].getTaskScheduler();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ public TaskSchedulerManagerForTest(AppContext appContext,
this.defaultPayload = defaultPayload;
}

TaskSchedulerManagerForTest(AppContext appContext,
EventHandler eventHandler,
TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
ContainerSignatureMatcher containerSignatureMatcher,
UserPayload defaultPayload,
List<NamedEntityDescriptor> descriptors) {
super(appContext, null, eventHandler, containerSignatureMatcher, null, descriptors,
false, new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim());
this.amrmClientAsync = amrmClientAsync;
this.containerSignatureMatcher = containerSignatureMatcher;
this.defaultPayload = defaultPayload;
}

@SuppressWarnings("unchecked")
@Override
public void instantiateSchedulers(String host, int port, String trackingUrl,
Expand Down Expand Up @@ -224,6 +237,10 @@ public Event verifyInvocation(Class<? extends Event> eventClass) {
fail("Expected Event: " + eventClass.getName() + " not sent");
return null;
}

public int getEventSize() {
return this.events.size();
}
}

static class TaskSchedulerWithDrainableContext extends YarnTaskSchedulerService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
Expand All @@ -59,6 +65,8 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerSignatureMatcher;
Expand All @@ -72,10 +80,16 @@
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
Expand All @@ -89,6 +103,8 @@
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeTracker;
import org.apache.tez.dag.app.web.WebUIService;
import org.apache.tez.dag.helpers.DagInfoImplForTest;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
Expand Down Expand Up @@ -839,6 +855,60 @@ protected void instantiateSchedulers(String host, int port, String trackingUrl,
}
}

@Test(timeout = 10000)
public void testHandleException() throws Exception {
Configuration tezConf = new Configuration(new YarnConfiguration());
UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(tezConf);

// Parse plugins
List<NamedEntityDescriptor> tsDescriptors = Lists.newLinkedList();
BiMap<String, Integer> tsMap = HashBiMap.create();
DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, Lists.newLinkedList(), HashBiMap.create(), Lists.newLinkedList(),
HashBiMap.create(), null, false, defaultPayload);

// Only TezYarn found.
Assert.assertEquals(1, tsDescriptors.size());
Assert.assertEquals(TezConstants.getTezYarnServicePluginName(), tsDescriptors.get(0).getEntityName());

// Construct eventHandler
TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);

// Construct AMRMClient
AMRMClient<YarnTaskSchedulerService.CookieContainerRequest> rmClientCore =
new TestTaskSchedulerHelpers.AMRMClientForTest();
TezAMRMClientAsync<YarnTaskSchedulerService.CookieContainerRequest> rmClient =
spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(rmClientCore, 100));

// Construct appContext
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
doReturn(amNodeTracker).when(appContext).getNodeTracker();
doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
doReturn(dagID).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();

// Construct TaskSchedulerManager
TaskSchedulerManager taskSchedulerManagerReal =
new TestTaskSchedulerHelpers.TaskSchedulerManagerForTest(appContext, eventHandler, rmClient,
new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher(), defaultPayload, tsDescriptors);
TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
taskSchedulerManager.init(tezConf);
taskSchedulerManager.start();

// Send error to schedule, then expect DAGAppMasterEventSchedulingServiceError event.
YarnTaskSchedulerService scheduler = ((YarnTaskSchedulerService) taskSchedulerManager.getTaskScheduler(0));
scheduler.onError(new Exception("Trigger by unit test"));
waitFor(() -> {
return eventHandler.getEventSize() > 0;
}, 1000, 5000);
eventHandler.verifyInvocation(DAGAppMasterEventSchedulingServiceError.class);
}

private static class ExceptionAnswer implements Answer {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Expand Down Expand Up @@ -1107,4 +1177,23 @@ public boolean hasUnregistered() throws ServicePluginException {
public void dagComplete() throws ServicePluginException {
}
}

public static void waitFor(Supplier<Boolean> check, int checkEveryMillis,
int waitForMillis) throws TimeoutException, InterruptedException {
Preconditions.checkNotNull(check, "Input supplier interface should be initailized");
Preconditions.checkArgument(waitForMillis >= checkEveryMillis,
"Total wait time should be greater than check interval time");

long st = Time.monotonicNow();
boolean result = check.get();

while (!result && (Time.monotonicNow() - st < waitForMillis)) {
Thread.sleep(checkEveryMillis);
result = check.get();
}

if (!result) {
throw new TimeoutException("Timed out waiting for condition.");
}
}
}

0 comments on commit 3fe502d

Please sign in to comment.