Skip to content

Commit

Permalink
TEZ-3363: Delete intermediate data at the vertex level for Shuffle Ha…
Browse files Browse the repository at this point in the history
…ndler
  • Loading branch information
shameersss1 committed Jan 29, 2022
1 parent 4e3eb95 commit a9baac9
Show file tree
Hide file tree
Showing 19 changed files with 942 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,26 @@ public TezConfiguration(boolean loadDefaults) {
public static final boolean TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE_DEFAULT = false;

/**
* Boolean value. Instructs AM to delete vertex shuffle data if a vertex and all its
* child vertices at a certain depth are completed. The depth is configured through
* {@link #TEZ_AM_VERTEX_CLEANUP_HEIGHT}. Disabled by default.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="boolean")
public static final String TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION = TEZ_AM_PREFIX
+ "vertex.cleanup.on.completion";
public static final boolean TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION_DEFAULT = false;

/**
* Int value. Distance, counted in edges, between a vertex and the descendant vertices whose
* completion allows the vertex's shuffle data to be deleted. With the default of 1 the direct
* child vertices are the ones considered. Only consulted when
* {@link #TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION} is in effect.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
public static final String TEZ_AM_VERTEX_CLEANUP_HEIGHT = TEZ_AM_PREFIX
+ "vertex.cleanup.height";
public static final int TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT = 1;

/**
* Int value. Upper limit on the number of threads used to delete DAG directories, Vertex directories and failed task attempts
* directories on nodes.
*/
@ConfigurationScope(Scope.AM)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;

import java.util.Set;

/**
* Plugin to allow custom container launchers to be written to launch containers that want to
* support cleanup of DAG level directories upon DAG completion in session mode. The directories are created by
Expand All @@ -43,6 +46,9 @@ public DagContainerLauncher(ContainerLauncherContext containerLauncherContext) {

/**
* Callback for DAG completion. Per this class's description, launchers use it to clean up
* DAG level directories.
*
* @param dag id of the DAG that completed
* @param jobTokenSecretManager secret manager supplied by the caller; presumably used to
*        authorize the cleanup request (TODO confirm against implementations)
*/
public abstract void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager);

/**
* Callback for vertex completion, the vertex level counterpart of
* {@link #dagComplete(TezDAGID, JobTokenSecretManager)}.
*
* @param vertex id of the vertex the callback is about
* @param jobTokenSecretManager secret manager supplied by the caller; presumably used to
*        authorize the cleanup request (TODO confirm against implementations)
* @param nodeIdList nodes associated with the vertex; presumably the nodes on which its
*        data has to be removed (TODO confirm against implementations)
*/
public abstract void vertexComplete(TezVertexID vertex, JobTokenSecretManager jobTokenSecretManager,
Set<NodeId> nodeIdList);

/**
* Callback for a failed task attempt.
*
* @param taskAttemptID id of the failed task attempt
* @param jobTokenSecretManager secret manager supplied by the caller; presumably used to
*        authorize the cleanup request (TODO confirm against implementations)
* @param nodeId node associated with the attempt; presumably where it ran (TODO confirm)
*/
public abstract void taskAttemptFailed(TezTaskAttemptID taskAttemptID,
JobTokenSecretManager jobTokenSecretManager, NodeId nodeId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2740,6 +2740,10 @@ String buildPluginComponentLog(List<NamedEntityDescriptor> namedEntityDescriptor
return sb.toString();
}

/**
 * Hands a vertex-completion notification to the container launcher manager, adding this
 * application master's job token secret manager to the call.
 *
 * @param completedVertexID id of the vertex being reported
 * @param nodesList nodes passed along unchanged to the container launcher manager
 */
public void vertexComplete(TezVertexID completedVertexID, Set<NodeId> nodesList) {
  getContainerLauncherManager()
      .vertexComplete(completedVertexID, jobTokenSecretManager, nodesList);
}

/**
 * Hands a task-attempt-failure notification to the container launcher manager, adding this
 * application master's job token secret manager to the call.
 *
 * @param attemptID id of the failed task attempt
 * @param nodeId node passed along unchanged to the container launcher manager
 */
public void taskAttemptFailed(TezTaskAttemptID attemptID, NodeId nodeId) {
  getContainerLauncherManager()
      .taskAttemptFailed(attemptID, jobTokenSecretManager, nodeId);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.app.dag.event;

import org.apache.tez.dag.app.dag.Vertex;


/**
 * Vertex event of type {@link VertexEventType#V_DELETE_SHUFFLE_DATA}.
 *
 * <p>The event is addressed to the target (parent) vertex: its vertex id is the one passed to
 * the {@link VertexEvent} constructor. The source (child) vertex is carried along so the
 * receiver can tell which vertex raised the event. Instances are immutable.
 */
public class VertexDeletionEvent extends VertexEvent {
  /** Child vertex on whose behalf the event is raised. */
  private final Vertex sourceVertex;
  /** Parent vertex that receives the event. */
  private final Vertex targetVertex;

  /**
   * Creates an event addressed to {@code targetVertex}.
   *
   * @param sourceVertex child vertex raising the event
   * @param targetVertex parent vertex the event is delivered to; must not be null because its
   *        vertex id is read here
   */
  public VertexDeletionEvent(Vertex sourceVertex, Vertex targetVertex) {
    super(targetVertex.getVertexId(), VertexEventType.V_DELETE_SHUFFLE_DATA);
    this.sourceVertex = sourceVertex;
    this.targetVertex = targetVertex;
  }

  /**
   * @return the child vertex that raised the event
   */
  public Vertex getSourceVertex() {
    return sourceVertex;
  }

  /**
   * @return the parent vertex the event is addressed to
   */
  public Vertex getTargetVertex() {
    return targetVertex;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum VertexEventType {
V_START,
V_SOURCE_TASK_ATTEMPT_COMPLETED,
V_SOURCE_VERTEX_STARTED,
V_DELETE_SHUFFLE_DATA,

//Producer:Task
V_TASK_COMPLETED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
import org.slf4j.Logger;
Expand Down Expand Up @@ -228,6 +229,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,

private TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;

private static boolean cleanupShuffleDataAtVertexLevel;

private static final DagStateChangedCallback STATE_CHANGED_CALLBACK = new DagStateChangedCallback();
private String[] logDirs;

Expand Down Expand Up @@ -573,6 +576,8 @@ public DAGImpl(TezDAGID dagId,
stateMachineFactory.make(this), this);
augmentStateMachine();
this.entityUpdateTracker = new StateChangeNotifier(this);
this.cleanupShuffleDataAtVertexLevel = dagConf.getBoolean(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION,
TezConfiguration.TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION_DEFAULT) && ShuffleUtils.isTezShuffleHandler(dagConf);
}

private void augmentStateMachine() {
Expand Down Expand Up @@ -1751,7 +1756,8 @@ private static void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlan

Map<Vertex, Edge> outVertices =
new HashMap<Vertex, Edge>();

List<Vertex> ancestors = new ArrayList<>();
List<Vertex> children = new ArrayList<>();
for(String inEdgeId : vertexPlan.getInEdgeIdList()){
EdgePlan edgePlan = edgePlans.get(inEdgeId);
Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
Expand All @@ -1772,6 +1778,50 @@ private static void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlan

vertex.setInputVertices(inVertices);
vertex.setOutputVertices(outVertices);
if (cleanupShuffleDataAtVertexLevel) {
int deletionHeight = dag.dagConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT,
TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT);
getSpannedVerticesAncestors(vertex, ancestors, deletionHeight);
getSpannedVerticesChildren(vertex, children, deletionHeight);
((VertexImpl) vertex).setAncestors(ancestors);
((VertexImpl) vertex).setChildren(children);
}
}

/**
 * Collects the vertices reached by walking exactly {@code level} input edges upwards from
 * {@code vertex} and appends them to {@code ancestorVertices}.
 *
 * <p>A level of 0 appends the vertex itself and a level of 1 appends its direct input vertices.
 * No de-duplication is done, so a vertex reachable over several paths is appended once per
 * path. A negative level appends nothing: the walk simply runs out of input vertices.
 *
 * @param vertex vertex to start from
 * @param ancestorVertices output list that receives the collected vertices
 * @param level number of input edges to walk
 */
private static void getSpannedVerticesAncestors(Vertex vertex, List<Vertex> ancestorVertices, int level) {
  switch (level) {
    case 0:
      ancestorVertices.add(vertex);
      break;
    case 1:
      ancestorVertices.addAll(vertex.getInputVertices().keySet());
      break;
    default:
      // Still more than one edge away (or a negative level): go one edge up and recurse.
      for (Vertex upstream : vertex.getInputVertices().keySet()) {
        getSpannedVerticesAncestors(upstream, ancestorVertices, level - 1);
      }
      break;
  }
}

/**
 * Collects the vertices reached by walking exactly {@code level} output edges downwards from
 * {@code vertex} and appends them to {@code childVertices}.
 *
 * <p>A level of 0 appends the vertex itself and a level of 1 appends its direct output
 * vertices. No de-duplication is done, so a vertex reachable over several paths is appended
 * once per path. A negative level appends nothing: the walk simply runs out of output vertices.
 *
 * @param vertex vertex to start from
 * @param childVertices output list that receives the collected vertices
 * @param level number of output edges to walk
 */
private static void getSpannedVerticesChildren(Vertex vertex, List<Vertex> childVertices, int level) {
  switch (level) {
    case 0:
      childVertices.add(vertex);
      break;
    case 1:
      childVertices.addAll(vertex.getOutputVertices().keySet());
      break;
    default:
      // Still more than one edge away (or a negative level): go one edge down and recurse.
      for (Vertex downstream : vertex.getOutputVertices().keySet()) {
        getSpannedVerticesChildren(downstream, childVertices, level - 1);
      }
      break;
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
Expand All @@ -130,6 +132,7 @@
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexDeletionEvent;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventCommitCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
Expand Down Expand Up @@ -187,6 +190,7 @@
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
import org.apache.tez.util.StringInterner;
Expand Down Expand Up @@ -556,7 +560,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_RESCHEDULED))
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_DELETE_SHUFFLE_DATA))

// Transitions from SUCCEEDED state
.addTransition(
Expand Down Expand Up @@ -592,6 +597,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
.addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
new TaskAttemptCompletedEventTransition())
.addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
VertexEventType.V_DELETE_SHUFFLE_DATA,
new VertexDeleteTransition())


// Transitions from FAILED state
Expand All @@ -613,7 +621,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
VertexEventType.V_INPUT_DATA_INFORMATION))
VertexEventType.V_INPUT_DATA_INFORMATION,
VertexEventType.V_DELETE_SHUFFLE_DATA))

// Transitions from KILLED state
.addTransition(
Expand All @@ -635,7 +644,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
VertexEventType.V_INPUT_DATA_INFORMATION))
VertexEventType.V_INPUT_DATA_INFORMATION,
VertexEventType.V_DELETE_SHUFFLE_DATA))

// No transitions from INTERNAL_ERROR state. Ignore all.
.addTransition(
Expand All @@ -655,7 +665,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexEventType.V_INTERNAL_ERROR,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
VertexEventType.V_INPUT_DATA_INFORMATION))
VertexEventType.V_INPUT_DATA_INFORMATION,
VertexEventType.V_DELETE_SHUFFLE_DATA))
// create the topology tables
.installTopology();

Expand Down Expand Up @@ -729,6 +740,10 @@ private void augmentStateMachine() {
@VisibleForTesting
Map<Vertex, Edge> sourceVertices;
private Map<Vertex, Edge> targetVertices;
private List<Vertex> ancestors;
private int incompleteChildrenVertices = 0;
private Set<NodeId> nodes = Sets.newHashSet();
private boolean cleanupShuffleDataAtVertexLevel;
Set<Edge> uninitializedEdges = Sets.newHashSet();
// using a linked hash map to conveniently map edge names to a contiguous index
LinkedHashMap<String, Integer> ioIndices = Maps.newLinkedHashMap();
Expand Down Expand Up @@ -1151,7 +1166,9 @@ public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
.append(", ContainerLauncher=").append(containerLauncherIdentifier).append(":").append(containerLauncherName)
.append(", TaskCommunicator=").append(taskCommunicatorIdentifier).append(":").append(taskCommName);
LOG.info(sb.toString());

cleanupShuffleDataAtVertexLevel = vertexConf.getBoolean(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION,
TezConfiguration.TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION_DEFAULT) &&
ShuffleUtils.isTezShuffleHandler(vertexConf);
stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>(
stateMachineFactory.make(this), this);
augmentStateMachine();
Expand Down Expand Up @@ -2306,6 +2323,11 @@ static VertexState checkTasksForCompletion(final VertexImpl vertex) {
if((vertexSucceeded || vertexFailuresBelowThreshold) && vertex.terminationCause == null) {
if(vertexSucceeded) {
LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier);
if (vertex.cleanupShuffleDataAtVertexLevel) {
for (Vertex v : vertex.ancestors) {
vertex.eventHandler.handle(new VertexDeletionEvent(vertex, v));
}
}
} else {
LOG.info("All tasks in the vertex " + vertex.logIdentifier + " have completed and the percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less that the threshold of " + vertex.maxFailuresPercent);
vertex.addDiagnostic("Vertex succeeded as percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less that the threshold of " + vertex.maxFailuresPercent);
Expand Down Expand Up @@ -3758,6 +3780,31 @@ public VertexState transition(VertexImpl vertex, VertexEvent event) {
}
}

/**
 * Transition run for each {@code V_DELETE_SHUFFLE_DATA} event a vertex receives.
 *
 * <p>Every event lowers the vertex's count of incomplete child vertices by one. When the count
 * reaches exactly zero, the node ids of all task attempts of the vertex are gathered and
 * passed to the application master's {@code vertexComplete}.
 */
private static class VertexDeleteTransition implements
    SingleArcTransition<VertexImpl, VertexEvent> {

  @Override
  public void transition(VertexImpl vertex, VertexEvent event) {
    vertex.incompleteChildrenVertices--;
    if (vertex.incompleteChildrenVertices != 0) {
      // Some child vertices are still outstanding (or the counter went past zero).
      return;
    }

    LOG.info("Vertex shuffle data deletion for vertex name: " +
        vertex.getName() + " with vertex id: " + vertex.getVertexId());

    // Collect the node of every attempt of every task in this vertex.
    // NOTE(review): getNodeId() may return null for attempts that never got a node;
    // a null would end up in the set - confirm the launchers tolerate that.
    Set<NodeId> attemptNodes = Sets.newHashSet();
    Map<TezTaskID, Task> vertexTasks = vertex.getTasks();
    for (Task task : vertexTasks.values()) {
      Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
      for (TaskAttempt attempt : attempts.values()) {
        attemptNodes.add(attempt.getNodeId());
      }
    }

    vertex.appContext.getAppMaster().vertexComplete(vertex.vertexId, attemptNodes);
  }
}

private static class TaskCompletedAfterVertexSuccessTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@Override
Expand Down Expand Up @@ -4350,6 +4397,14 @@ public void setOutputVertices(Map<Vertex, Edge> outVertices) {
}
}

/**
* Stores the ancestor vertices of this vertex. The list is kept by reference (no copy); it is
* the list iterated when deletion events are sent after all tasks of this vertex succeed.
*
* @param ancestors ancestor vertices to notify
*/
public void setAncestors(List<Vertex> ancestors) {
this.ancestors = ancestors;
}

/**
 * Initializes the count of child vertices that have not completed yet. Only the size of the
 * list is recorded; the list itself is not retained.
 *
 * @param children child vertices of this vertex
 */
public void setChildren(List<Vertex> children) {
  final int childCount = children.size();
  this.incompleteChildrenVertices = childCount;
}

@Override
public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
LOG.info("Setting " + inputs.size() + " additional inputs for vertex" + this.logIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.net.UnknownHostException;
import java.util.List;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.common.Preconditions;
Expand All @@ -37,6 +38,8 @@
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
Expand Down Expand Up @@ -202,6 +205,12 @@ public void dagComplete(TezDAGID dag, JobTokenSecretManager secretManager) {
}
}

/**
 * Passes a vertex-completion notification to every configured container launcher, in array
 * order.
 *
 * @param vertex id of the vertex being reported
 * @param secretManager secret manager forwarded unchanged to each launcher
 * @param nodeIdList nodes forwarded unchanged to each launcher
 */
public void vertexComplete(TezVertexID vertex, JobTokenSecretManager secretManager, Set<NodeId> nodeIdList) {
  int launcherIdx = 0;
  while (launcherIdx < containerLaunchers.length) {
    containerLaunchers[launcherIdx].vertexComplete(vertex, secretManager, nodeIdList);
    launcherIdx++;
  }
}

public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager secretManager, NodeId nodeId) {
for (int i = 0; i < containerLaunchers.length; i++) {
containerLaunchers[i].taskAttemptFailed(taskAttemptID, secretManager, nodeId);
Expand Down
Loading

0 comments on commit a9baac9

Please sign in to comment.