From 73486cc66483aab86763b230a66c8612db03eb76 Mon Sep 17 00:00:00 2001 From: Syed Shameerur Rahman Date: Wed, 16 Mar 2022 21:44:49 +0530 Subject: [PATCH] TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler --- .../apache/tez/dag/api/TezConfiguration.java | 20 +- .../tez/common/DagContainerLauncher.java | 6 + .../org/apache/tez/dag/app/DAGAppMaster.java | 4 + .../dag/app/dag/event/VertexEventType.java | 1 + .../dag/event/VertexShuffleDataDeletion.java | 43 ++ .../apache/tez/dag/app/dag/impl/DAGImpl.java | 8 + .../tez/dag/app/dag/impl/VertexImpl.java | 72 ++- .../VertexShuffleDataDeletionContext.java | 96 ++++ .../launcher/ContainerLauncherManager.java | 8 + .../launcher/ContainerLauncherWrapper.java | 9 + .../tez/dag/app/launcher/DeletionTracker.java | 7 + .../dag/app/launcher/DeletionTrackerImpl.java | 23 + .../app/launcher/LocalContainerLauncher.java | 15 +- .../launcher/TezContainerLauncherImpl.java | 16 +- .../app/launcher/VertexDeleteRunnable.java | 82 +++ .../tez/dag/app/dag/impl/TestVertexImpl.java | 529 ++++++++++++++++-- .../TestContainerLauncherWrapper.java | 2 +- .../tez/auxservices/ShuffleHandler.java | 48 ++ .../tez/auxservices/TestShuffleHandler.java | 78 +++ .../library/common/TezRuntimeUtils.java | 19 + 20 files changed, 1020 insertions(+), 66 deletions(-) create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexShuffleDataDeletion.java create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexShuffleDataDeletionContext.java create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/app/launcher/VertexDeleteRunnable.java diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 16d1dfcb70..71ebfee10b 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -883,6 +883,22 @@ public TezConfiguration(boolean loadDefaults) { + "dag.cleanup.on.completion"; public static final boolean TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT = false; + /** + * Integer value. Instructs AM to delete vertex shuffle data if a vertex and all its + * child vertices at a certain depth are completed. Value less than or equal to 0 indicates the feature + * is disabled. + * Let's say we have a dag Map1 - Reduce2 - Reduce3 - Reduce4. + * case:1 height = 1 + * when Reduce 2 completes all the shuffle data of Map1 will be deleted and so on for other vertex. + * case: 2 height = 2 + * when Reduce 3 completes all the shuffle data of Map1 will be deleted and so on for other vertex. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="integer") + public static final String TEZ_AM_VERTEX_CLEANUP_HEIGHT = TEZ_AM_PREFIX + + "vertex.cleanup.height"; + public static final int TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT = 0; + /** * Boolean value. Instructs AM to delete intermediate attempt data for failed task attempts. */ @@ -893,8 +909,8 @@ public TezConfiguration(boolean loadDefaults) { public static final boolean TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE_DEFAULT = false; /** - * Int value. Upper limit on the number of threads used to delete DAG directories and failed task attempts - * directories on nodes. + * Int value. Upper limit on the number of threads used to delete DAG directories, + * Vertex directories and failed task attempts directories on nodes. */ @ConfigurationScope(Scope.AM) @ConfigurationProperty(type="integer") diff --git a/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java b/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java index 6bda0a8da3..c2337af77e 100644 --- a/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java +++ b/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java @@ -24,9 +24,12 @@ import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import java.util.Set; + /** * Plugin to allow custom container launchers to be written to launch containers that want to * support cleanup of DAG level directories upon DAG completion in session mode. The directories are created by @@ -43,6 +46,9 @@ public DagContainerLauncher(ContainerLauncherContext containerLauncherContext) { public abstract void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager); + public abstract void vertexComplete(TezVertexID vertex, JobTokenSecretManager jobTokenSecretManager, + Set nodeIdList); + public abstract void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager jobTokenSecretManager, NodeId nodeId); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 972fadf854..58288612bb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2739,6 +2739,10 @@ String buildPluginComponentLog(List namedEntityDescriptor return sb.toString(); } + public void vertexComplete(TezVertexID completedVertexID, Set nodesList) { + getContainerLauncherManager().vertexComplete(completedVertexID, jobTokenSecretManager, nodesList); + } + public void taskAttemptFailed(TezTaskAttemptID attemptID, NodeId nodeId) { getContainerLauncherManager().taskAttemptFailed(attemptID, jobTokenSecretManager, nodeId); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java index 15be94dfa1..ed325290cb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java @@ -34,6 +34,7 @@ public enum VertexEventType { V_START, V_SOURCE_TASK_ATTEMPT_COMPLETED, V_SOURCE_VERTEX_STARTED, + V_DELETE_SHUFFLE_DATA, //Producer:Task V_TASK_COMPLETED, diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexShuffleDataDeletion.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexShuffleDataDeletion.java new file mode 100644 index 0000000000..8ea3a154b3 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexShuffleDataDeletion.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.event; + +import org.apache.tez.dag.app.dag.Vertex; + + +public class VertexShuffleDataDeletion extends VertexEvent { + // child vertex + private Vertex sourceVertex; + // parent vertex + private Vertex targetVertex; + + public VertexShuffleDataDeletion(Vertex sourceVertex, Vertex targetVertex) { + super(targetVertex.getVertexId(), VertexEventType.V_DELETE_SHUFFLE_DATA); + this.sourceVertex = sourceVertex; + this.targetVertex = targetVertex; + } + + public Vertex getSourceVertex() { + return sourceVertex; + } + + public Vertex getTargetVertex() { + return targetVertex; + } +} \ No newline at end of file diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index c9337e473a..aa28e02441 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -51,6 +51,7 @@ import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; import org.apache.tez.dag.app.dag.event.DiagnosableEvent; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.state.OnStateChangedCallback; import org.apache.tez.state.StateMachineTez; import org.slf4j.Logger; @@ -1772,6 +1773,13 @@ private static void parseVertexEdges(DAGImpl dag, Map edgePlan vertex.setInputVertices(inVertices); vertex.setOutputVertices(outVertices); + boolean cleanupShuffleDataAtVertexLevel = dag.dagConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT, + TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT) > 0 && ShuffleUtils.isTezShuffleHandler(dag.dagConf); + if (cleanupShuffleDataAtVertexLevel) { + int deletionHeight = dag.dagConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT, + TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT); + ((VertexImpl) vertex).initShuffleDeletionContext(deletionHeight); + } } /** diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 934dd4e76e..e55b10a659 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -114,6 +114,8 @@ import org.apache.tez.dag.app.dag.RootInputInitializerManager; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; import org.apache.tez.dag.app.dag.TaskTerminationCause; import org.apache.tez.dag.app.dag.Vertex; @@ -130,6 +132,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventTermination; import org.apache.tez.dag.app.dag.event.TaskEventType; +import org.apache.tez.dag.app.dag.event.VertexShuffleDataDeletion; import org.apache.tez.dag.app.dag.event.VertexEvent; import org.apache.tez.dag.app.dag.event.VertexEventCommitCompleted; import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation; @@ -187,6 +190,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.state.OnStateChangedCallback; import org.apache.tez.state.StateMachineTez; import org.apache.tez.util.StringInterner; @@ -556,7 +560,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl VertexEventType.V_ROUTE_EVENT, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, - VertexEventType.V_TASK_RESCHEDULED)) + VertexEventType.V_TASK_RESCHEDULED, + VertexEventType.V_DELETE_SHUFFLE_DATA)) // Transitions from SUCCEEDED state .addTransition( @@ -592,6 +597,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, new TaskAttemptCompletedEventTransition()) + .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED, + VertexEventType.V_DELETE_SHUFFLE_DATA, + new VertexShuffleDeleteTransition()) // Transitions from FAILED state @@ -613,7 +621,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_NULL_EDGE_INITIALIZED, - VertexEventType.V_INPUT_DATA_INFORMATION)) + VertexEventType.V_INPUT_DATA_INFORMATION, + VertexEventType.V_DELETE_SHUFFLE_DATA)) // Transitions from KILLED state .addTransition( @@ -635,7 +644,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl VertexEventType.V_TASK_COMPLETED, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, - VertexEventType.V_INPUT_DATA_INFORMATION)) + VertexEventType.V_INPUT_DATA_INFORMATION, + VertexEventType.V_DELETE_SHUFFLE_DATA)) // No transitions from INTERNAL_ERROR state. Ignore all. .addTransition( @@ -655,7 +665,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl VertexEventType.V_INTERNAL_ERROR, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, - VertexEventType.V_INPUT_DATA_INFORMATION)) + VertexEventType.V_INPUT_DATA_INFORMATION, + VertexEventType.V_DELETE_SHUFFLE_DATA)) // create the topology tables .installTopology(); @@ -729,6 +740,9 @@ private void augmentStateMachine() { @VisibleForTesting Map sourceVertices; private Map targetVertices; + private boolean cleanupShuffleDataAtVertexLevel; + @VisibleForTesting + VertexShuffleDataDeletionContext vShuffleDeletionContext; Set uninitializedEdges = Sets.newHashSet(); // using a linked hash map to conveniently map edge names to a contiguous index LinkedHashMap ioIndices = Maps.newLinkedHashMap(); @@ -1151,7 +1165,9 @@ public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, .append(", ContainerLauncher=").append(containerLauncherIdentifier).append(":").append(containerLauncherName) .append(", TaskCommunicator=").append(taskCommunicatorIdentifier).append(":").append(taskCommName); LOG.info(sb.toString()); - + cleanupShuffleDataAtVertexLevel = vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT, + TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT) > 0 && + ShuffleUtils.isTezShuffleHandler(vertexConf); stateMachine = new StateMachineTez( stateMachineFactory.make(this), this); augmentStateMachine(); @@ -2306,6 +2322,12 @@ static VertexState checkTasksForCompletion(final VertexImpl vertex) { if((vertexSucceeded || vertexFailuresBelowThreshold) && vertex.terminationCause == null) { if(vertexSucceeded) { LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier); + if (vertex.cleanupShuffleDataAtVertexLevel) { + + for (Vertex v : vertex.vShuffleDeletionContext.getAncestors()) { + vertex.eventHandler.handle(new VertexShuffleDataDeletion(vertex, v)); + } + } } else { LOG.info("All tasks in the vertex " + vertex.logIdentifier + " have completed and the percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less that the threshold of " + vertex.maxFailuresPercent); vertex.addDiagnostic("Vertex succeeded as percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less that the threshold of " + vertex.maxFailuresPercent); @@ -3758,6 +3780,36 @@ public VertexState transition(VertexImpl vertex, VertexEvent event) { } } + private static class VertexShuffleDeleteTransition implements + SingleArcTransition { + + @Override + public void transition(VertexImpl vertex, VertexEvent event) { + int incompleteChildrenVertices = vertex.vShuffleDeletionContext.getIncompleteChildrenVertices(); + incompleteChildrenVertices = incompleteChildrenVertices - 1; + vertex.vShuffleDeletionContext.setIncompleteChildrenVertices(incompleteChildrenVertices); + // check if all the child vertices are completed + if (incompleteChildrenVertices == 0) { + LOG.info("Vertex shuffle data deletion for vertex name: " + + vertex.getName() + " with vertex id: " + vertex.getVertexId()); + // Get nodes of all the task attempts in vertex + Set nodes = Sets.newHashSet(); + Map tasksMap = vertex.getTasks(); + tasksMap.keySet().forEach(taskId -> { + Map taskAttemptMap = tasksMap.get(taskId).getAttempts(); + taskAttemptMap.keySet().forEach(attemptId -> { + nodes.add(taskAttemptMap.get(attemptId).getNodeId()); + }); + }); + vertex.appContext.getAppMaster().vertexComplete( + vertex.vertexId, nodes); + } else { + LOG.debug("The number of incomplete child vertex are {} for the vertex {}", + incompleteChildrenVertices, vertex.vertexId); + } + } + } + private static class TaskCompletedAfterVertexSuccessTransition implements MultipleArcTransition { @Override @@ -4930,4 +4982,14 @@ public boolean getTaskRescheduleRelaxedLocality() { public Map> getDownstreamBlamingHosts(){ return downstreamBlamingHosts; } + + /** + * Initialize context from vertex shuffle deletion. + * @param deletionHeight + */ + public void initShuffleDeletionContext(int deletionHeight) { + VertexShuffleDataDeletionContext vShuffleDeletionContext = new VertexShuffleDataDeletionContext(deletionHeight); + vShuffleDeletionContext.setSpannedVertices(this); + this.vShuffleDeletionContext = vShuffleDeletionContext; + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexShuffleDataDeletionContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexShuffleDataDeletionContext.java new file mode 100644 index 0000000000..4ffdf11231 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexShuffleDataDeletionContext.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.impl; + +import org.apache.tez.dag.app.dag.Vertex; + +import java.util.HashSet; +import java.util.Set; + +public class VertexShuffleDataDeletionContext { + private int deletionHeight; + private int incompleteChildrenVertices; + private Set ancestors; + private Set children; + + public VertexShuffleDataDeletionContext(int deletionHeight) { + this.deletionHeight = deletionHeight; + this.incompleteChildrenVertices = 0; + this.ancestors = new HashSet<>(); + this.children = new HashSet<>(); + } + + public void setSpannedVertices(Vertex vertex) { + getSpannedVerticesAncestors(vertex, ancestors, deletionHeight); + getSpannedVerticesChildren(vertex, children, deletionHeight); + setIncompleteChildrenVertices(children.size()); + } + + /** + * get all the ancestor vertices at a particular depth. + */ + private static void getSpannedVerticesAncestors(Vertex vertex, Set ancestorVertices, int level) { + if (level == 0) { + ancestorVertices.add(vertex); + return; + } + + if (level == 1) { + ancestorVertices.addAll(vertex.getInputVertices().keySet()); + return; + } + + vertex.getInputVertices().forEach((inVertex, edge) -> getSpannedVerticesAncestors(inVertex, ancestorVertices, + level - 1)); + } + + /** + * get all the child vertices at a particular depth. + */ + private static void getSpannedVerticesChildren(Vertex vertex, Set childVertices, int level) { + if (level == 0) { + childVertices.add(vertex); + return; + } + + if (level == 1) { + childVertices.addAll(vertex.getOutputVertices().keySet()); + return; + } + + vertex.getOutputVertices().forEach((outVertex, edge) -> getSpannedVerticesChildren(outVertex, childVertices, + level - 1)); + } + + public void setIncompleteChildrenVertices(int incompleteChildrenVertices) { + this.incompleteChildrenVertices = incompleteChildrenVertices; + } + + public int getIncompleteChildrenVertices() { + return this.incompleteChildrenVertices; + } + + public Set getAncestors() { + return this.ancestors; + } + + public Set getChildren() { + return this.children; + } +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java index b0e0f0cf0d..65360d6d01 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java @@ -16,6 +16,7 @@ import java.net.UnknownHostException; import java.util.List; +import java.util.Set; import com.google.common.annotations.VisibleForTesting; import org.apache.tez.common.Preconditions; @@ -37,6 +38,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -202,6 +204,12 @@ public void dagComplete(TezDAGID dag, JobTokenSecretManager secretManager) { } } + public void vertexComplete(TezVertexID vertex, JobTokenSecretManager secretManager, Set nodeIdList) { + for (int i = 0; i < containerLaunchers.length; i++) { + containerLaunchers[i].vertexComplete(vertex, secretManager, nodeIdList); + } + } + public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager secretManager, NodeId nodeId) { for (int i = 0; i < containerLaunchers.length; i++) { containerLaunchers[i].taskAttemptFailed(taskAttemptID, secretManager, nodeId); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java index 5d262bdab4..4703abe100 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java @@ -14,11 +14,14 @@ package org.apache.tez.dag.app.launcher; +import java.util.Set; + import org.apache.tez.common.DagContainerLauncher; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerStopRequest; @@ -49,6 +52,12 @@ public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManage } } + public void vertexComplete(TezVertexID vertex, JobTokenSecretManager jobTokenSecretManager, Set nodeIdList) { + if (real instanceof DagContainerLauncher) { + ((DagContainerLauncher) real).vertexComplete(vertex, jobTokenSecretManager, nodeIdList); + } + } + public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager jobTokenSecretManager, NodeId nodeId) { if (real instanceof DagContainerLauncher) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java index 87b7366bfc..56760c86be 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java @@ -18,11 +18,14 @@ package org.apache.tez.dag.app.launcher; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezVertexID; public abstract class DeletionTracker { @@ -36,6 +39,10 @@ public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManage //do nothing } + public void vertexComplete(TezVertexID vertex, JobTokenSecretManager jobTokenSecretManager, Set nodeIdList) { + //do nothing + } + public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager jobTokenSecretManager, NodeId nodeId) { //do nothing diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java index e4204bfc71..73eaa68e72 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,4 +115,25 @@ public void shutdown() { } nodeIdShufflePortMap = null; } + + @Override + public void vertexComplete(TezVertexID vertex, JobTokenSecretManager jobTokenSecretManager, Set nodeIdList) { + super.vertexComplete(vertex, jobTokenSecretManager, nodeIdList); + String vertexId = String.format("%02d", vertex.getId()); + for (NodeId nodeId : nodeIdList) { + Integer shufflePort = null; + if (nodeIdShufflePortMap != null) { + shufflePort = nodeIdShufflePortMap.get(nodeId); + } + if (shufflePort != null) { + VertexDeleteRunnable vertexDeleteRunnable = new VertexDeleteRunnable(vertex, jobTokenSecretManager, nodeId, + shufflePort, vertexId, TezRuntimeUtils.getHttpConnectionParams(conf)); + try { + dagCleanupService.submit(vertexDeleteRunnable); + } catch (RejectedExecutionException rejectedException) { + LOG.info("Ignoring deletion request for " + vertexDeleteRunnable); + } + } + } + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index ebc8f95566..47cc9f1325 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; @@ -50,6 +51,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; @@ -96,6 +98,7 @@ public class LocalContainerLauncher extends DagContainerLauncher { int shufflePort = TezRuntimeUtils.INVALID_PORT; private DeletionTracker deletionTracker; private boolean dagDelete; + private boolean vertexDelete; private boolean failedTaskAttemptDelete; private final ConcurrentHashMap> @@ -162,11 +165,14 @@ public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext, dagDelete = ShuffleUtils.isTezShuffleHandler(conf) && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT); + vertexDelete = ShuffleUtils.isTezShuffleHandler(conf) && + conf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT, + TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT) > 0; failedTaskAttemptDelete = ShuffleUtils.isTezShuffleHandler(conf) && conf.getBoolean(TezConfiguration.TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE, TezConfiguration.TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE_DEFAULT); - if (dagDelete || failedTaskAttemptDelete) { + if (dagDelete || vertexDelete || failedTaskAttemptDelete) { String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS, TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); deletionTracker = ReflectionUtils.createClazzInstance( @@ -454,6 +460,13 @@ public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManage } } + @Override + public void vertexComplete(TezVertexID dag, JobTokenSecretManager jobTokenSecretManager, Set nodeIdList) { + if (vertexDelete && deletionTracker != null) { + deletionTracker.vertexComplete(dag, jobTokenSecretManager, nodeIdList); + } + } + @Override public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager jobTokenSecretManager, NodeId nodeId) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java index 88ed4f7b89..654224adb5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java @@ -21,8 +21,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @@ -43,6 +43,7 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; @@ -93,6 +94,7 @@ public class TezContainerLauncherImpl extends DagContainerLauncher { private AtomicBoolean serviceStopped = new AtomicBoolean(false); private DeletionTracker deletionTracker = null; private boolean dagDelete; + private boolean vertexDelete; private boolean failedTaskAttemptDelete; private Container getContainer(ContainerOp event) { @@ -339,11 +341,14 @@ public void run() { dagDelete = ShuffleUtils.isTezShuffleHandler(conf) && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT); + vertexDelete = ShuffleUtils.isTezShuffleHandler(conf) && + conf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT, + TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT) > 0; failedTaskAttemptDelete = ShuffleUtils.isTezShuffleHandler(conf) && conf.getBoolean(TezConfiguration.TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE, TezConfiguration.TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE_DEFAULT); - if (dagDelete || failedTaskAttemptDelete) { + if (dagDelete || vertexDelete || failedTaskAttemptDelete) { String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS, TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); deletionTracker = ReflectionUtils.createClazzInstance( @@ -454,6 +459,13 @@ public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManage } } + @Override + public void vertexComplete(TezVertexID vertex, JobTokenSecretManager jobTokenSecretManager, Set nodeIdList) { + if (vertexDelete && deletionTracker != null) { + deletionTracker.vertexComplete(vertex, jobTokenSecretManager, nodeIdList); + } + } + @Override public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager jobTokenSecretManager, NodeId nodeId) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/VertexDeleteRunnable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/VertexDeleteRunnable.java new file mode 100644 index 0000000000..a8d25379a5 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/VertexDeleteRunnable.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.http.BaseHttpConnection; +import org.apache.tez.http.HttpConnectionParams; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; + +public class VertexDeleteRunnable implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(VertexDeleteRunnable.class); + final private TezVertexID vertex; + final private JobTokenSecretManager jobTokenSecretManager; + final private NodeId nodeId; + final private int shufflePort; + final private String vertexId; + final private HttpConnectionParams httpConnectionParams; + + VertexDeleteRunnable(TezVertexID vertex, JobTokenSecretManager jobTokenSecretManager, + NodeId nodeId, int shufflePort, String vertexId, + HttpConnectionParams httpConnectionParams) { + this.vertex = vertex; + this.jobTokenSecretManager = jobTokenSecretManager; + this.nodeId = nodeId; + this.shufflePort = shufflePort; + this.vertexId = vertexId; + this.httpConnectionParams = httpConnectionParams; + } + + @Override + public void run() { + BaseHttpConnection httpConnection = null; + try { + URL baseURL = TezRuntimeUtils.constructBaseURIForShuffleHandlerVertexComplete( + nodeId.getHost(), shufflePort, + vertex.getDAGID().getApplicationId().toString(), vertex.getDAGID().getId(), vertexId, false); + httpConnection = TezRuntimeUtils.getHttpConnection(true, baseURL, httpConnectionParams, + "VertexDelete", jobTokenSecretManager); + httpConnection.connect(); + httpConnection.getInputStream(); + } catch (Exception e) { + LOG.warn("Could not setup HTTP Connection to the node %s " + nodeId.getHost() + + " for vertex shuffle delete. ", e); + } finally { + try { + if (httpConnection != null) { + httpConnection.cleanup(true); + } + } catch (IOException e) { + LOG.warn("Encountered IOException for " + nodeId.getHost() + " during close. ", e); + } + } + } + + @Override + public String toString() { + return "VertexDeleteRunnable nodeId=" + nodeId + ", shufflePort=" + shufflePort + ", vertexId=" + vertexId; + } +} diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 5cdcf49206..c118110948 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -20,6 +20,15 @@ import java.nio.ByteBuffer; +import org.apache.tez.common.TezUtils; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.app.DAGAppMaster; +import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.dag.app.launcher.ContainerLauncherManager; +import org.apache.tez.dag.app.launcher.TezContainerLauncherImpl; +import org.apache.tez.dag.app.rm.container.AMContainer; +import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -2395,12 +2404,254 @@ private DAGPlan createDAGPlanForGraceParallelism() throws IOException { .build(); } - private void setupVertices() { + /** + * The dag is of the following structure. + * vertex1 vertex2 + * \ / + * vertex 3 + * / \ + * vertex4 vertex5 + * \ / + * vertex6 + * @return dagPlan + */ + + public DAGPlan createDAGPlanVertexShuffleDelete() { + LOG.info("Setting up dag plan"); + DAGPlan dag = DAGPlan.newBuilder() + .setName("testverteximpl") + .setDagConf(DAGProtos.ConfigurationProto.newBuilder() + .addConfKeyValues(DAGProtos.PlanKeyValuePair.newBuilder() + .setKey(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS) + .setValue(3 + ""))) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host1") + .addRack("rack1") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() + ) + .setVertexConf(DAGProtos.ConfigurationProto.newBuilder() + .addConfKeyValues(DAGProtos.PlanKeyValuePair.newBuilder() + .setKey(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS) + .setValue(2+""))) + .addOutEdgeId("e1") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex2") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host2") + .addRack("rack2") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x2.y2") + .build() + ) + .addOutEdgeId("e2") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex3") + .setType(PlanVertexType.NORMAL) + .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host3") + .addRack("rack3") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("foo") + .setTaskModule("x3.y3") + .build() + ) + .addInEdgeId("e1") + .addInEdgeId("e2") + .addOutEdgeId("e3") + .addOutEdgeId("e4") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex4") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host4") + .addRack("rack4") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x4.y4") + .build() + ) + .addInEdgeId("e3") + .addOutEdgeId("e5") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex5") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host5") + .addRack("rack5") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x5.y5") + .build() + ) + .addInEdgeId("e4") + .addOutEdgeId("e6") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex6") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host6") + .addRack("rack6") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x6.y6") + .build() + ) + .addInEdgeId("e5") + .addInEdgeId("e6") + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i3_v1")) + .setInputVertexName("vertex1") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1")) + .setOutputVertexName("vertex3") + .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER) + .setId("e1") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i3_v2")) + .setInputVertexName("vertex2") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2")) + .setOutputVertexName("vertex3") + .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER) + .setId("e2") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i4_v3")) + .setInputVertexName("vertex3") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o3_v4")) + .setOutputVertexName("vertex4") + .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER) + .setId("e3") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i5_v3")) + .setInputVertexName("vertex3") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o3_v5")) + .setOutputVertexName("vertex5") + .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER) + .setId("e4") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v4")) + .setInputVertexName("vertex4") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o4")) + .setOutputVertexName("vertex6") + .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER) + .setId("e5") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v5")) + .setInputVertexName("vertex5") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o5")) + .setOutputVertexName("vertex6") + .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER) + .setId("e6") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .build(); + + return dag; + } + + private void setupVertices(boolean cleanupShuffleDataAtVertexLevel) { int vCnt = dagPlan.getVertexCount(); LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt); vertices = new HashMap(); vertexIdMap = new HashMap(); Configuration dagConf = new Configuration(false); + dagConf.setBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, true); + conf.setInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT, cleanupShuffleDataAtVertexLevel ? 1 : 0); dagConf.set("abc", "foobar"); for (int i = 0; i < vCnt; ++i) { VertexPlan vPlan = dagPlan.getVertex(i); @@ -2447,7 +2698,6 @@ private void parseVertexEdges() { Map outVertices = new HashMap(); - for(String inEdgeId : vertexPlan.getInEdgeIdList()){ EdgePlan edgePlan = edgePlans.get(inEdgeId); Vertex inVertex = this.vertices.get(edgePlan.getInputVertexName()); @@ -2472,8 +2722,14 @@ private void parseVertexEdges() { + ", outputVerticesCnt=" + outVertices.size()); vertex.setOutputVertices(outVertices); } + + for (Map.Entry vertex : vertices.entrySet()) { + VertexImpl vertexImpl = vertex.getValue(); + vertexImpl.initShuffleDeletionContext(2); + } } + public void setupPreDagCreation() { LOG.info("____________ RESETTING CURRENT DAG ____________"); conf = new Configuration(); @@ -2488,8 +2744,9 @@ public void setupPreDagCreation() { } @SuppressWarnings({ "unchecked", "rawtypes" }) - public void setupPostDagCreation() throws TezException { + public void setupPostDagCreation(boolean cleanupShuffleDataAtVertexLevel) throws TezException { String dagName = "dag0"; + taskCommunicatorManagerInterface = mock(TaskCommunicatorManagerInterface.class); // dispatcher may be created multiple times (setupPostDagCreation may be called multiples) if (dispatcher != null) { dispatcher.stop(); @@ -2499,6 +2756,40 @@ public void setupPostDagCreation() throws TezException { when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); when(appContext.getContainerLauncherName(anyInt())).thenReturn( TezConstants.getTezYarnServicePluginName()); + DAGAppMaster mockDagAppMaster = mock(DAGAppMaster.class); + when(appContext.getAppMaster()).thenReturn(mockDagAppMaster); + doCallRealMethod().when(mockDagAppMaster).vertexComplete(any(TezVertexID.class), any(Set.class)); + List containerDescriptors = new ArrayList<>(); + ContainerLauncherDescriptor containerLaunchers = + ContainerLauncherDescriptor.create("ContainerLaunchers", + TezContainerLauncherImpl.class.getName()); + conf.setBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, true); + conf.set(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, "tez_shuffle"); + conf.setInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT, 0); + try { + containerLaunchers.setUserPayload(UserPayload.create( + TezUtils.createByteStringFromConf(conf).asReadOnlyByteBuffer())); + } catch (IOException e) { + e.printStackTrace(); + } + containerDescriptors.add(containerLaunchers); + ContainerLauncherManager mockContainerLauncherManager = spy(new ContainerLauncherManager(appContext, + taskCommunicatorManagerInterface, "test", containerDescriptors, false)); + doCallRealMethod().when(mockContainerLauncherManager).vertexComplete(any( + TezVertexID.class), any(JobTokenSecretManager.class + ), any(Set.class)); + when(appContext.getAppMaster().getContainerLauncherManager()).thenReturn( + mockContainerLauncherManager); + mockContainerLauncherManager.init(conf); + mockContainerLauncherManager.start(); + AMContainerMap amContainerMap = mock(AMContainerMap.class); + AMContainer amContainer = mock(AMContainer.class); + Container mockContainer = mock(Container.class); + when(amContainer.getContainer()).thenReturn(mockContainer); + when(mockContainer.getNodeId()).thenReturn(mock(NodeId.class)); + when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:12345"); + when(amContainerMap.get(any(ContainerId.class))).thenReturn(amContainer); + when(appContext.getAllContainers()).thenReturn(amContainerMap); thh = mock(TaskHeartbeatHandler.class); historyEventHandler = mock(HistoryEventHandler.class); @@ -2557,7 +2848,7 @@ public ListenableFuture answer(InvocationOnMock invocation) { updateTracker.stop(); } updateTracker = new StateChangeNotifierForTest(appContext.getCurrentDAG()); - setupVertices(); + setupVertices(cleanupShuffleDataAtVertexLevel); when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer() { @Override public Vertex answer(InvocationOnMock invocation) throws Throwable { @@ -2622,7 +2913,7 @@ public void setup() throws TezException { setupPreDagCreation(); dagPlan = createTestDAGPlan(); invalidDagPlan = createInvalidDAGPlan(); - setupPostDagCreation(); + setupPostDagCreation(false); } @After @@ -2750,7 +3041,7 @@ public void testVertexInit() throws AMUserCodeException { public void testNonExistVertexManager() throws TezException { setupPreDagCreation(); dagPlan = createDAGPlanWithNonExistVertexManager(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl v1 = vertices.get("vertex1"); v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT)); Assert.assertEquals(VertexState.FAILED, v1.getState()); @@ -2763,7 +3054,7 @@ public void testNonExistVertexManager() throws TezException { public void testNonExistInputInitializer() throws TezException { setupPreDagCreation(); dagPlan = createDAGPlanWithNonExistInputInitializer(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl v1 = vertices.get("vertex1"); v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT)); Assert.assertEquals(VertexState.FAILED, v1.getState()); @@ -2776,7 +3067,7 @@ public void testNonExistInputInitializer() throws TezException { public void testNonExistOutputCommitter() throws TezException { setupPreDagCreation(); dagPlan = createDAGPlanWithNonExistOutputCommitter(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl v1 = vertices.get("vertex1"); v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT)); Assert.assertEquals(VertexState.FAILED, v1.getState()); @@ -2815,7 +3106,7 @@ public void testVertexConfigureEventWithReconfigure() throws Exception { setupPreDagCreation(); // initialize() will make VM call planned() and started() will make VM call done() dagPlan = createDAGPlanWithVMException("TestVMStateUpdate", VMExceptionLocation.NoExceptionDoReconfigure); - setupPostDagCreation(); + setupPostDagCreation(false); TestUpdateListener listener = new TestUpdateListener(); updateTracker @@ -3824,7 +4115,7 @@ public void testFailuresMaxPercentSourceTaskAttemptCompletionEvents() throws Tez conf.setFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT, 50.0f); conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); dagPlan = createTestDAGPlan(); - setupPostDagCreation(); + setupPostDagCreation(false); initAllVertices(VertexState.INITED); VertexImpl v4 = vertices.get("vertex4"); @@ -3879,7 +4170,7 @@ public void testFailuresMaxPercentExceededSourceTaskAttemptCompletionEvents() th conf.setFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT, 50.0f); conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); dagPlan = createTestDAGPlan(); - setupPostDagCreation(); + setupPostDagCreation(false); initAllVertices(VertexState.INITED); VertexImpl v4 = vertices.get("vertex4"); @@ -3978,7 +4269,7 @@ public void testTaskReschedule() { public void testTerminatingVertexForTaskComplete() throws Exception { setupPreDagCreation(); dagPlan = createSamplerDAGPlan(false); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl vertex = spy(vertices.get("A")); initVertex(vertex); startVertex(vertex); @@ -3996,7 +4287,7 @@ public void testTerminatingVertexForTaskComplete() throws Exception { public void testTerminatingVertexForVComplete() throws Exception { setupPreDagCreation(); dagPlan = createSamplerDAGPlan(false); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl vertex = spy(vertices.get("A")); initVertex(vertex); startVertex(vertex); @@ -4251,7 +4542,7 @@ public void testBadCommitter2() throws Exception { public void testVertexInitWithCustomVertexManager() throws Exception { setupPreDagCreation(); dagPlan = createDAGWithCustomVertexManager(); - setupPostDagCreation(); + setupPostDagCreation(false); int numTasks = 3; VertexImpl v1 = vertices.get("v1"); @@ -4305,7 +4596,7 @@ public void testVertexInitWithCustomVertexManager() throws Exception { public void testVertexManagerHeuristic() throws TezException { setupPreDagCreation(); dagPlan = createDAGPlanWithMixedEdges(); - setupPostDagCreation(); + setupPostDagCreation(false); initAllVertices(VertexState.INITED); Assert.assertEquals(ImmediateStartVertexManager.class, vertices.get("vertex1").getVertexManager().getPlugin().getClass()); @@ -4330,7 +4621,7 @@ public void testVertexWithOneToOneSplit() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanForOneToOneSplit("TestInputInitializer", -1, true); - setupPostDagCreation(); + setupPostDagCreation(false); int numTasks = 5; VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices @@ -4397,7 +4688,7 @@ public void testVertexWithOneToOneSplitWhileRunning() throws Exception { // create a diamond shaped dag with 1-1 edges. setupPreDagCreation(); dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl v1 = vertices.get("vertex1"); v1.vertexReconfigurationPlanned(); initAllVertices(VertexState.INITED); @@ -4436,7 +4727,7 @@ public void testVertexWithOneToOneSplitWhileInited() throws Exception { // create a diamond shaped dag with 1-1 edges. setupPreDagCreation(); dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl v1 = vertices.get("vertex1"); v1.vertexReconfigurationPlanned(); initAllVertices(VertexState.INITED); @@ -4478,7 +4769,7 @@ public void testVertexVMErrorReport() throws Exception { // create a diamond shaped dag with 1-1 edges. setupPreDagCreation(); dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl v1 = vertices.get("vertex1"); initAllVertices(VertexState.INITED); @@ -4522,7 +4813,7 @@ public void testVertexWithInitializerFailure() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer"); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -4567,7 +4858,7 @@ public void testVertexWithInitializerParallelismSetTo0() throws InterruptedExcep setupPreDagCreation(); dagPlan = createDAGPlanWithInitializer0Tasks(RootInitializerSettingParallelismTo0.class.getName()); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl v1 = vertices.get("vertex1"); VertexImpl v2 = vertices.get("vertex2"); @@ -4615,7 +4906,7 @@ public void testInputInitializerVertexStateUpdates() throws Exception { initializer.setNumVertexStateUpdateEvents(3); setupPreDagCreation(); dagPlan = createDAGPlanWithRunningInitializer(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); @@ -4650,7 +4941,7 @@ public void testInputInitializerEventMultipleAttempts() throws Exception { (EventHandlingRootInputInitializer) customInitializer; setupPreDagCreation(); dagPlan = createDAGPlanWithRunningInitializer4(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); @@ -4738,7 +5029,7 @@ public void testInputInitializerEventsMultipleSources() throws Exception { initializer.setNumExpectedEvents(4); setupPreDagCreation(); dagPlan = createDAGPlanWithRunningInitializer4(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); @@ -4861,7 +5152,7 @@ public void testInputInitializerEventNoDirectConnection() throws Exception { (EventHandlingRootInputInitializer) customInitializer; setupPreDagCreation(); dagPlan = createDAGPlanWithRunningInitializer4(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); @@ -4941,7 +5232,7 @@ public void testInputInitializerEventsAtNew() throws Exception { (EventHandlingRootInputInitializer) customInitializer; setupPreDagCreation(); dagPlan = createDAGPlanWithRunningInitializer3(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); @@ -5027,7 +5318,7 @@ public void testInputInitializerEvents() throws Exception { (EventHandlingRootInputInitializer) customInitializer; setupPreDagCreation(); dagPlan = createDAGPlanWithRunningInitializer(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); @@ -5104,7 +5395,7 @@ public void testInputInitializerEvents() throws Exception { public void testTaskSchedulingWithCustomEdges() throws TezException { setupPreDagCreation(); dagPlan = createCustomDAGWithCustomEdges(); - setupPostDagCreation(); + setupPostDagCreation(false); /** * @@ -5402,7 +5693,7 @@ public void testVertexWithMultipleInitializers1() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer"); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -5432,7 +5723,7 @@ public void testVertexWithMultipleInitializers2() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer"); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -5462,7 +5753,7 @@ public void testVertexWithInitializerSuccess() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer"); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -5563,7 +5854,7 @@ public void testVertexWithInitializerSuccessLegacyRouting() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer"); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -5638,7 +5929,7 @@ public void testVertexWithInputDistributor() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputDistributor("TestInputInitializer"); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -5673,7 +5964,7 @@ public void testVertexRootInputSpecUpdateAll() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer"); - setupPostDagCreation(); + setupPostDagCreation(false); int expectedNumTasks = RootInputSpecUpdaterVertexManager.NUM_TASKS; VertexImplWithControlledInitializerManager v3 = (VertexImplWithControlledInitializerManager) vertices @@ -5703,7 +5994,7 @@ public void testVertexRootInputSpecUpdatePerTask() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer"); - setupPostDagCreation(); + setupPostDagCreation(false); int expectedNumTasks = RootInputSpecUpdaterVertexManager.NUM_TASKS; VertexImplWithControlledInitializerManager v4 = (VertexImplWithControlledInitializerManager) vertices @@ -6015,7 +6306,7 @@ public void completeInputInitialization(int initializerIndex, int targetTasks, public void testVertexGroupInput() throws TezException { setupPreDagCreation(); dagPlan = createVertexGroupDAGPlan(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl vA = vertices.get("A"); VertexImpl vB = vertices.get("B"); @@ -6044,7 +6335,7 @@ public void testStartWithUninitializedCustomEdge() throws Exception { // been initialized setupPreDagCreation(); dagPlan = createSamplerDAGPlan(true); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl vA = vertices.get("A"); VertexImpl vB = vertices.get("B"); @@ -6093,7 +6384,7 @@ public void testVertexConfiguredDoneByVMBeforeEdgeDefined() throws Exception { // been initialized setupPreDagCreation(); dagPlan = createSamplerDAGPlan(true); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl vA = vertices.get("A"); VertexImpl vB = vertices.get("B"); @@ -6167,7 +6458,7 @@ public void testInitStartRace() throws TezException { // been initialized setupPreDagCreation(); dagPlan = createSamplerDAGPlan(false); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl vA = vertices.get("A"); VertexImpl vB = vertices.get("B"); @@ -6190,7 +6481,7 @@ public void testInitStartRace2() throws TezException { // been initialized setupPreDagCreation(); dagPlan = createSamplerDAGPlan2(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl vA = vertices.get("A"); VertexImpl vB = vertices.get("B"); @@ -6215,7 +6506,7 @@ public void testInitStartRace2() throws TezException { public void testTez2684() throws IOException, TezException { setupPreDagCreation(); dagPlan = createSamplerDAGPlan2(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl vA = vertices.get("A"); VertexImpl vB = vertices.get("B"); @@ -6255,7 +6546,7 @@ public void testTez2684() throws IOException, TezException { public void testVertexGraceParallelism() throws IOException, TezException { setupPreDagCreation(); dagPlan = createDAGPlanForGraceParallelism(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl vA = vertices.get("A"); VertexImpl vB = vertices.get("B"); @@ -6323,7 +6614,7 @@ public void testVMEventBeforeVertexInitialized() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithCountingVM(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImpl v1 = vertices.get("vertex1"); VertexImpl v2 = vertices.get("vertex2"); @@ -6380,7 +6671,7 @@ public void testExceptionFromVM_Initialize() throws TezException { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.Initialize); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -6399,7 +6690,7 @@ public void testExceptionFromVM_OnRootVertexInitialized() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnRootVertexInitialized); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -6423,7 +6714,7 @@ public void testExceptionFromVM_OnVertexStarted() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnVertexStarted); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -6450,7 +6741,7 @@ public void testExceptionFromVM_OnSourceTaskCompleted() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnSourceTaskCompleted); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -6486,7 +6777,7 @@ public void testExceptionFromVM_OnVertexManagerEventReceived() throws Exception useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnVertexManagerEventReceived); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -6514,7 +6805,7 @@ public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws Excep useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestVMStateUpdate", VMExceptionLocation.OnVertexManagerVertexStateUpdated); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices .get("vertex1"); @@ -6543,7 +6834,7 @@ public void testExceptionFromII_Initialize() throws InterruptedException, TezExc (EventHandlingRootInputInitializer) customInitializer; setupPreDagCreation(); dagPlan = createDAGPlanWithIIException(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); @@ -6564,7 +6855,7 @@ public void testExceptionFromII_InitFailedAfterInitialized() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithIIException(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)vertices.get("vertex1"); @@ -6588,7 +6879,7 @@ public void testExceptionFromII_InitFailedAfterRunning() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithIIException(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)vertices.get("vertex1"); @@ -6616,7 +6907,7 @@ public void testExceptionFromII_HandleInputInitializerEvent() throws Exception { (EventHandlingRootInputInitializer) customInitializer; setupPreDagCreation(); dagPlan = createDAGPlanWithRunningInitializer(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); @@ -6666,7 +6957,7 @@ public void testExceptionFromII_OnVertexStateUpdated() throws InterruptedExcepti (EventHandlingRootInputInitializer) customInitializer; setupPreDagCreation(); dagPlan = createDAGPlanWithRunningInitializer(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); @@ -6695,7 +6986,7 @@ public void testExceptionFromII_InitSucceededAfterInitFailure() throws Interrupt (EventHandlingRootInputInitializer) customInitializer; setupPreDagCreation(); dagPlan = createDAGPlanWithRunningInitializer(); - setupPostDagCreation(); + setupPostDagCreation(false); VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); @@ -7212,4 +7503,132 @@ public void testPickupDagLocalResourceOnScheduleTask() { Assert.assertTrue(localResourceMap.containsKey("dag lr")); Assert.assertTrue(localResourceMap.containsKey("vertex lr")); } + + @Test + public void testVertexShuffleDelete() throws Exception { + setupPreDagCreation(); + dagPlan = createDAGPlanVertexShuffleDelete(); + setupPostDagCreation(true); + checkSpannedVertices(); + runVertices(); + Mockito.verify(appContext.getAppMaster().getContainerLauncherManager(), + times(3)).vertexComplete(any(TezVertexID.class), + any(JobTokenSecretManager.class), any(Set.class)); + } + + private void checkSpannedVertices() { + // vertex1 should have 0 ancestor and 2 children at height = 2 + VertexImpl v1 = vertices.get("vertex1"); + checkResults(v1.vShuffleDeletionContext.getAncestors(), new ArrayList<>()); + checkResults(v1.vShuffleDeletionContext.getChildren(), Arrays.asList("vertex5", "vertex4")); + + // vertex2 should have 0 ancestor and 2 children at height = 2 + VertexImpl v2 = vertices.get("vertex2"); + checkResults(v2.vShuffleDeletionContext.getAncestors(), new ArrayList<>()); + checkResults(v2.vShuffleDeletionContext.getChildren(), Arrays.asList("vertex5", "vertex4")); + + // vertex3 should have 0 ancestor and 1 children at height = 2 + VertexImpl v3 = vertices.get("vertex3"); + checkResults(v3.vShuffleDeletionContext.getAncestors(), new ArrayList<>()); + checkResults(v3.vShuffleDeletionContext.getChildren(), Arrays.asList("vertex6")); + + // vertex4 should have 2 ancestor and 0 children at height = 2 + VertexImpl v4 = vertices.get("vertex4"); + checkResults(v4.vShuffleDeletionContext.getAncestors(), Arrays.asList("vertex1", "vertex2")); + checkResults(v4.vShuffleDeletionContext.getChildren(), new ArrayList<>()); + + // vertex5 should have 2 ancestor and 0 children at height = 2 + VertexImpl v5 = vertices.get("vertex5"); + checkResults(v5.vShuffleDeletionContext.getAncestors(), Arrays.asList("vertex1", "vertex2")); + checkResults(v5.vShuffleDeletionContext.getChildren(), new ArrayList<>()); + + // vertex6 should have 1 ancestor and 0 children at height = 2 + VertexImpl v6 = vertices.get("vertex6"); + checkResults(v6.vShuffleDeletionContext.getAncestors(), Arrays.asList("vertex3")); + checkResults(v6.vShuffleDeletionContext.getChildren(), new ArrayList<>()); + } + + private void checkResults(Set actual, List expected) { + assertEquals(actual.size(), expected.size()); + for (Vertex vertex : actual) { + assertTrue(expected.contains(vertex.getName())); + } + } + + private void runVertices() { + VertexImpl v1 = vertices.get("vertex1"); + VertexImpl v2 = vertices.get("vertex2"); + VertexImpl v3 = vertices.get("vertex3"); + VertexImpl v4 = vertices.get("vertex4"); + VertexImpl v5 = vertices.get("vertex5"); + VertexImpl v6 = vertices.get("vertex6"); + dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT)); + dispatcher.getEventHandler().handle(new VertexEvent(v2.getVertexId(), VertexEventType.V_INIT)); + dispatcher.await(); + dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_START)); + dispatcher.getEventHandler().handle(new VertexEvent(v2.getVertexId(), VertexEventType.V_START)); + dispatcher.await(); + + TezTaskID v1t1 = TezTaskID.getInstance(v1.getVertexId(), 0); + Map attempts = v1.getTask(v1t1).getAttempts(); + startAttempts(attempts); + v1.handle(new VertexEventTaskCompleted(v1t1, TaskState.SUCCEEDED)); + TezTaskID v2t1 = TezTaskID.getInstance(v2.getVertexId(), 0); + attempts = v2.getTask(v2t1).getAttempts(); + startAttempts(attempts); + v2.handle(new VertexEventTaskCompleted(v2t1, TaskState.SUCCEEDED)); + TezTaskID v2t2 = TezTaskID.getInstance(v2.getVertexId(), 1); + attempts = v2.getTask(v2t2).getAttempts(); + startAttempts(attempts); + v2.handle(new VertexEventTaskCompleted(v2t2, TaskState.SUCCEEDED)); + TezTaskID v3t1 = TezTaskID.getInstance(v3.getVertexId(), 0); + v3.scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null))); + dispatcher.await(); + attempts = v3.getTask(v3t1).getAttempts(); + startAttempts(attempts); + v3.handle(new VertexEventTaskCompleted(v3t1, TaskState.SUCCEEDED)); + TezTaskID v3t2 = TezTaskID.getInstance(v3.getVertexId(), 1); + attempts = v3.getTask(v3t2).getAttempts(); + startAttempts(attempts); + v3.handle(new VertexEventTaskCompleted(v3t2, TaskState.SUCCEEDED)); + dispatcher.await(); + TezTaskID v4t1 = TezTaskID.getInstance(v4.getVertexId(), 0); + attempts = v4.getTask(v4t1).getAttempts(); + startAttempts(attempts); + v4.handle(new VertexEventTaskCompleted(v4t1, TaskState.SUCCEEDED)); + TezTaskID v4t2 = TezTaskID.getInstance(v4.getVertexId(), 1); + attempts = v4.getTask(v4t2).getAttempts(); + startAttempts(attempts); + v4.handle(new VertexEventTaskCompleted(v4t2, TaskState.SUCCEEDED)); + TezTaskID v5t1 = TezTaskID.getInstance(v5.getVertexId(), 0); + attempts = v5.getTask(v5t1).getAttempts(); + startAttempts(attempts); + v5.handle(new VertexEventTaskCompleted(v5t1, TaskState.SUCCEEDED)); + TezTaskID v5t2 = TezTaskID.getInstance(v5.getVertexId(), 1); + attempts = v5.getTask(v5t2).getAttempts(); + startAttempts(attempts); + v5.handle(new VertexEventTaskCompleted(v5t2, TaskState.SUCCEEDED)); + TezTaskID v6t1 = TezTaskID.getInstance(v6.getVertexId(), 0); + attempts = v6.getTask(v6t1).getAttempts(); + startAttempts(attempts); + v6.handle(new VertexEventTaskCompleted(v6t1, TaskState.SUCCEEDED)); + TezTaskID v6t2 = TezTaskID.getInstance(v6.getVertexId(), 1); + attempts = v6.getTask(v6t2).getAttempts(); + startAttempts(attempts); + v6.handle(new VertexEventTaskCompleted(v6t2, TaskState.SUCCEEDED)); + dispatcher.await(); + } + + private void startAttempts(Map attempts) { + for (Map.Entry entry : attempts.entrySet()) { + TezTaskAttemptID id = entry.getKey(); + TaskAttemptImpl taskAttempt = (TaskAttemptImpl)entry.getValue(); + taskAttempt.handle(new TaskAttemptEventSchedule(id, 10, 10)); + dispatcher.await(); + ContainerId mockContainer = mock(ContainerId.class, RETURNS_DEEP_STUBS); + taskAttempt.handle(new TaskAttemptEventSubmitted(id, mockContainer)); + taskAttempt.handle(new TaskAttemptEventStartedRemotely(id)); + dispatcher.await(); + } + } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java index c4f4eff0cf..cb7d62dca0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java @@ -24,7 +24,7 @@ public class TestContainerLauncherWrapper { @Test(timeout = 5000) public void testDelegation() throws Exception { PluginWrapperTestHelpers.testDelegation(ContainerLauncherWrapper.class, ContainerLauncher.class, - Sets.newHashSet("getContainerLauncher", "dagComplete", "taskAttemptFailed")); + Sets.newHashSet("getContainerLauncher", "dagComplete", "vertexComplete", "taskAttemptFailed")); } } diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index 7e6fd750f8..0fa1c03a96 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -18,6 +18,7 @@ package org.apache.tez.auxservices; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.DiskChecker; import static org.fusesource.leveldbjni.JniDBFactory.asString; import static org.fusesource.leveldbjni.JniDBFactory.bytes; @@ -1009,6 +1010,7 @@ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) final Map> q = new QueryStringDecoder(request.getUri()).parameters(); final List keepAliveList = q.get("keepAlive"); final List dagCompletedQ = q.get("dagAction"); + final List vertexCompletedQ = q.get("vertexAction"); final List taskAttemptFailedQ = q.get("taskAttemptAction"); boolean keepAliveParam = false; if (keepAliveList != null && keepAliveList.size() == 1) { @@ -1019,6 +1021,7 @@ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) final Range reduceRange = splitReduces(q.get("reduce")); final List jobQ = q.get("job"); final List dagIdQ = q.get("dag"); + final List vertexIdQ = q.get("vertex"); if (LOG.isDebugEnabled()) { LOG.debug("RECV: " + request.getUri() + "\n mapId: " + mapIds + @@ -1031,6 +1034,9 @@ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ)) { return; } + if (deleteVertexDirectories(ctx.channel(), vertexCompletedQ, jobQ, dagIdQ, vertexIdQ)) { + return; + } if (deleteTaskAttemptDirectories(ctx.channel(), taskAttemptFailedQ, jobQ, dagIdQ, mapIds)) { return; } @@ -1155,6 +1161,25 @@ private boolean deleteDagDirectories(Channel channel, return false; } + private boolean deleteVertexDirectories(Channel channel, List vertexCompletedQ, + List jobQ, List dagIdQ, + List vertexIdQ) { + if (jobQ == null || jobQ.isEmpty()) { + return false; + } + if (notEmptyAndContains(vertexCompletedQ, "delete") && !isNullOrEmpty(vertexIdQ)) { + try { + deleteTaskDirsOfVertex(jobQ.get(0), dagIdQ.get(0), vertexIdQ.get(0), userRsrc.get(jobQ.get(0))); + } catch (IOException e) { + LOG.warn("Encountered exception during vertex delete " + e); + } + channel.writeAndFlush(new DefaultHttpResponse(HTTP_1_1, OK)) + .addListener(ChannelFutureListener.CLOSE); + return true; + } + return false; + } + private boolean deleteTaskAttemptDirectories(Channel channel, List taskAttemptFailedQ, List jobQ, List dagIdQ, List taskAttemptIdQ) { if (jobQ == null || jobQ.isEmpty()) { @@ -1256,6 +1281,29 @@ private String getBaseLocation(String jobId, String dagId, String user) { return baseStr; } + /** + * Delete shuffle data in task directories belonging to a vertex. + */ + private void deleteTaskDirsOfVertex(String jobId, String dagId, String vertexId, String user) throws IOException { + String baseStr = getBaseLocation(jobId, dagId, user); + FileContext lfc = FileContext.getLocalFSFileContext(); + for(Path dagPath : lDirAlloc.getAllLocalPathsToRead(baseStr, conf)) { + RemoteIterator status = lfc.listStatus(dagPath); + final JobID jobID = JobID.forName(jobId); + String taskDirPrefix = String.format("attempt%s_%s_%s_", + jobID.toString().replace("job", ""), dagId, vertexId); + while (status.hasNext()) { + FileStatus fileStatus = status.next(); + Path attemptPath = fileStatus.getPath(); + if (attemptPath.getName().startsWith(taskDirPrefix)) { + if(lfc.delete(attemptPath, true)) { + LOG.debug("deleted shuffle data in task directory: {}", attemptPath); + } + } + } + } + } + private String getDagLocation(String jobId, String dagId, String user) { final JobID jobID = JobID.forName(jobId); final ApplicationId appID = diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index 45dd0adb06..b91e0eb203 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static io.netty.buffer.Unpooled.wrappedBuffer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -1312,6 +1313,83 @@ protected void sendError(ChannelHandlerContext ctx, String message, } } + @Test + public void testVertexShuffleDelete() throws Exception { + final ArrayList failures = new ArrayList(1); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "simple"); + UserGroupInformation.setConfiguration(conf); + File absLogDir = new File("target", TestShuffleHandler.class. + getSimpleName() + "LocDir").getAbsoluteFile(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + ApplicationId appId = ApplicationId.newInstance(12345L, 1); + String appAttemptId = "attempt_12345_0001_1_00_000000_0_10003_0"; + String user = "randomUser"; + List fileMap = new ArrayList(); + String vertexDirStr = StringUtils.join(Path.SEPARATOR, new String[] { absLogDir.getAbsolutePath(), + ShuffleHandler.USERCACHE, user, ShuffleHandler.APPCACHE, appId.toString(), "dag_1/output/" + appAttemptId}); + File vertexDir = new File(vertexDirStr); + Assert.assertFalse("vertex directory should not be present", vertexDir.exists()); + createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + conf, fileMap); + ShuffleHandler shuffleHandler = new ShuffleHandler() { + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error(message)); + ctx.channel().close(); + } + } + }; + } + }; + shuffleHandler.init(conf); + try { + shuffleHandler.start(); + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token jt = + new Token("identifier".getBytes(), + "password".getBytes(), new Text(user), new Text("shuffleService")); + jt.write(outputBuffer); + shuffleHandler + .initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + URL url = + new URL( + "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?vertexAction=delete&job=job_12345_0001&dag=1&vertex=00"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + Assert.assertTrue("Attempt Directory does not exist!", vertexDir.exists()); + conn.connect(); + try { + DataInputStream is = new DataInputStream(conn.getInputStream()); + is.close(); + Assert.assertFalse("Vertex Directory was not deleted", vertexDir.exists()); + } catch (EOFException e) { + fail("Encountered Exception!" + e.getMessage()); + } + } finally { + shuffleHandler.stop(); + FileUtil.fullyDelete(absLogDir); + } + } + @Test(timeout = 5000) public void testFailedTaskAttemptDelete() throws Exception { final ArrayList failures = new ArrayList(1); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index 48b23bc694..a75925c5fe 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -187,6 +187,25 @@ public static URL constructBaseURIForShuffleHandlerDagComplete( return new URL(sb.toString()); } + public static URL constructBaseURIForShuffleHandlerVertexComplete( + String host, int port, String appId, int dagIdentifier, String vertexIndentifier, boolean sslShuffle) + throws MalformedURLException { + String httpProtocol = (sslShuffle) ? "https://" : "http://"; + StringBuilder sb = new StringBuilder(httpProtocol); + sb.append(host); + sb.append(":"); + sb.append(port); + sb.append("/"); + sb.append("mapOutput?vertexAction=delete"); + sb.append("&job="); + sb.append(appId.replace("application", "job")); + sb.append("&dag="); + sb.append(String.valueOf(dagIdentifier)); + sb.append("&vertex="); + sb.append(String.valueOf(vertexIndentifier)); + return new URL(sb.toString()); + } + public static URL constructBaseURIForShuffleHandlerTaskAttemptFailed( String host, int port, String appId, int dagIdentifier, String taskAttemptIdentifier, boolean sslShuffle) throws MalformedURLException {