TEZ-3363: Delete intermediate shuffle data at the vertex level
shameersss1 committed Mar 16, 2022
1 parent 132ea4c commit c64875a
Showing 20 changed files with 1,020 additions and 66 deletions.
20 changes: 18 additions & 2 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -883,6 +883,22 @@ public TezConfiguration(boolean loadDefaults) {
+ "dag.cleanup.on.completion";
public static final boolean TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT = false;

/**
* Integer value. Instructs the AM to delete a vertex's intermediate shuffle data once the
* vertex and all of its child vertices at the configured depth have completed. A value less
* than or equal to 0 disables the feature.
* For example, take the DAG Map1 - Reduce2 - Reduce3 - Reduce4.
* Case 1: height = 1
* When Reduce2 completes, all of Map1's shuffle data is deleted, and likewise for the other vertices.
* Case 2: height = 2
* When Reduce3 completes, all of Map1's shuffle data is deleted, and likewise for the other vertices.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
public static final String TEZ_AM_VERTEX_CLEANUP_HEIGHT = TEZ_AM_PREFIX
+ "vertex.cleanup.height";
public static final int TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT = 0;

/**
* Boolean value. Instructs AM to delete intermediate attempt data for failed task attempts.
*/
@@ -893,8 +909,8 @@ public TezConfiguration(boolean loadDefaults) {
public static final boolean TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE_DEFAULT = false;

/**
* Int value. Upper limit on the number of threads used to delete DAG directories and failed task attempts
* directories on nodes.
* Int value. Upper limit on the number of threads used to delete DAG directories,
* Vertex directories and failed task attempts directories on nodes.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
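A quick illustration of the new property (not part of this commit): a client could enable the cleanup as in the sketch below. The class and method names are invented for the example; only TEZ_AM_VERTEX_CLEANUP_HEIGHT comes from the change above, and, as the DAG/Vertex changes later in this commit show, the feature only takes effect when the Tez shuffle handler serves the shuffle data (ShuffleUtils.isTezShuffleHandler).

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;

public class VertexCleanupConfigSketch {
  // Height 1: a vertex's shuffle data becomes deletable as soon as its direct
  // child vertices have completed; 0 or a negative value leaves the feature off.
  public static TezConfiguration withVertexCleanup(Configuration base) {
    TezConfiguration conf = new TezConfiguration(base);
    conf.setInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT, 1);
    return conf;
  }
}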
@@ -24,9 +24,12 @@
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;

import java.util.Set;

/**
* Plugin to allow custom container launchers to be written to launch containers that want to
* support cleanup of DAG level directories upon DAG completion in session mode. The directories are created by
@@ -43,6 +46,9 @@ public DagContainerLauncher(ContainerLauncherContext containerLauncherContext) {

public abstract void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager);

public abstract void vertexComplete(TezVertexID vertex, JobTokenSecretManager jobTokenSecretManager,
Set<NodeId> nodeIdList);

public abstract void taskAttemptFailed(TezTaskAttemptID taskAttemptID,
JobTokenSecretManager jobTokenSecretManager, NodeId nodeId);
}
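For orientation, a hypothetical no-op subclass showing where the new vertexComplete hook sits. The launchContainer/stopContainer signatures are assumed from the service-plugin ContainerLauncher base class (they are not part of this diff), and a real launcher would presumably issue authenticated delete requests to the shuffle handler on each node in nodeIdList instead of just logging.

import java.util.Set;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical subclass for illustration only; the import for DagContainerLauncher
// (the class shown above) is omitted here.
public class LoggingDagContainerLauncher extends DagContainerLauncher {

  private static final Logger LOG = LoggerFactory.getLogger(LoggingDagContainerLauncher.class);

  public LoggingDagContainerLauncher(ContainerLauncherContext context) {
    super(context);
  }

  @Override
  public void launchContainer(ContainerLaunchRequest launchRequest) {
    // No-op in this sketch; a real launcher starts the container here.
  }

  @Override
  public void stopContainer(ContainerStopRequest stopRequest) {
    // No-op in this sketch.
  }

  @Override
  public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
    LOG.info("DAG {} complete; DAG-level directories may be cleaned", dag);
  }

  @Override
  public void vertexComplete(TezVertexID vertex, JobTokenSecretManager jobTokenSecretManager,
      Set<NodeId> nodeIdList) {
    // Called once the vertex's descendants at the configured cleanup height have completed;
    // nodeIdList holds the nodes that ran this vertex's task attempts.
    LOG.info("Vertex {} eligible for shuffle cleanup on {} node(s)", vertex, nodeIdList.size());
  }

  @Override
  public void taskAttemptFailed(TezTaskAttemptID taskAttemptID,
      JobTokenSecretManager jobTokenSecretManager, NodeId nodeId) {
    LOG.info("Attempt {} failed on {}", taskAttemptID, nodeId);
  }
}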
@@ -2739,6 +2739,10 @@ String buildPluginComponentLog(List<NamedEntityDescriptor> namedEntityDescriptor
return sb.toString();
}

public void vertexComplete(TezVertexID completedVertexID, Set<NodeId> nodesList) {
getContainerLauncherManager().vertexComplete(completedVertexID, jobTokenSecretManager, nodesList);
}

public void taskAttemptFailed(TezTaskAttemptID attemptID, NodeId nodeId) {
getContainerLauncherManager().taskAttemptFailed(attemptID, jobTokenSecretManager, nodeId);
}
@@ -34,6 +34,7 @@ public enum VertexEventType {
V_START,
V_SOURCE_TASK_ATTEMPT_COMPLETED,
V_SOURCE_VERTEX_STARTED,
V_DELETE_SHUFFLE_DATA,

//Producer:Task
V_TASK_COMPLETED,
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.app.dag.event;

import org.apache.tez.dag.app.dag.Vertex;


public class VertexShuffleDataDeletion extends VertexEvent {
// child vertex
private Vertex sourceVertex;
// parent vertex
private Vertex targetVertex;

public VertexShuffleDataDeletion(Vertex sourceVertex, Vertex targetVertex) {
super(targetVertex.getVertexId(), VertexEventType.V_DELETE_SHUFFLE_DATA);
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
}

public Vertex getSourceVertex() {
return sourceVertex;
}

public Vertex getTargetVertex() {
return targetVertex;
}
}
@@ -51,6 +51,7 @@
import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
import org.slf4j.Logger;
@@ -1772,6 +1773,13 @@ private static void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlan

vertex.setInputVertices(inVertices);
vertex.setOutputVertices(outVertices);
boolean cleanupShuffleDataAtVertexLevel = dag.dagConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT,
TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT) > 0 && ShuffleUtils.isTezShuffleHandler(dag.dagConf);
if (cleanupShuffleDataAtVertexLevel) {
int deletionHeight = dag.dagConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT,
TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT);
((VertexImpl) vertex).initShuffleDeletionContext(deletionHeight);
}
}

/**
@@ -114,6 +114,8 @@
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
@@ -130,6 +132,7 @@
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexShuffleDataDeletion;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventCommitCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
@@ -187,6 +190,7 @@
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
import org.apache.tez.util.StringInterner;
@@ -556,7 +560,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_RESCHEDULED))
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_DELETE_SHUFFLE_DATA))

// Transitions from SUCCEEDED state
.addTransition(
@@ -592,6 +597,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
.addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
new TaskAttemptCompletedEventTransition())
.addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
VertexEventType.V_DELETE_SHUFFLE_DATA,
new VertexShuffleDeleteTransition())


// Transitions from FAILED state
@@ -613,7 +621,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
VertexEventType.V_INPUT_DATA_INFORMATION))
VertexEventType.V_INPUT_DATA_INFORMATION,
VertexEventType.V_DELETE_SHUFFLE_DATA))

// Transitions from KILLED state
.addTransition(
@@ -635,7 +644,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
VertexEventType.V_INPUT_DATA_INFORMATION))
VertexEventType.V_INPUT_DATA_INFORMATION,
VertexEventType.V_DELETE_SHUFFLE_DATA))

// No transitions from INTERNAL_ERROR state. Ignore all.
.addTransition(
@@ -655,7 +665,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexEventType.V_INTERNAL_ERROR,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
VertexEventType.V_INPUT_DATA_INFORMATION))
VertexEventType.V_INPUT_DATA_INFORMATION,
VertexEventType.V_DELETE_SHUFFLE_DATA))
// create the topology tables
.installTopology();

@@ -729,6 +740,9 @@ private void augmentStateMachine() {
@VisibleForTesting
Map<Vertex, Edge> sourceVertices;
private Map<Vertex, Edge> targetVertices;
private boolean cleanupShuffleDataAtVertexLevel;
@VisibleForTesting
VertexShuffleDataDeletionContext vShuffleDeletionContext;
Set<Edge> uninitializedEdges = Sets.newHashSet();
// using a linked hash map to conveniently map edge names to a contiguous index
LinkedHashMap<String, Integer> ioIndices = Maps.newLinkedHashMap();
@@ -1151,7 +1165,9 @@ public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
.append(", ContainerLauncher=").append(containerLauncherIdentifier).append(":").append(containerLauncherName)
.append(", TaskCommunicator=").append(taskCommunicatorIdentifier).append(":").append(taskCommName);
LOG.info(sb.toString());

cleanupShuffleDataAtVertexLevel = vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT,
TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT) > 0 &&
ShuffleUtils.isTezShuffleHandler(vertexConf);
stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>(
stateMachineFactory.make(this), this);
augmentStateMachine();
@@ -2306,6 +2322,12 @@ static VertexState checkTasksForCompletion(final VertexImpl vertex) {
if((vertexSucceeded || vertexFailuresBelowThreshold) && vertex.terminationCause == null) {
if(vertexSucceeded) {
LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier);
if (vertex.cleanupShuffleDataAtVertexLevel) {
for (Vertex v : vertex.vShuffleDeletionContext.getAncestors()) {
vertex.eventHandler.handle(new VertexShuffleDataDeletion(vertex, v));
}
}
} else {
LOG.info("All tasks in the vertex " + vertex.logIdentifier + " have completed and the percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less that the threshold of " + vertex.maxFailuresPercent);
vertex.addDiagnostic("Vertex succeeded as percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less that the threshold of " + vertex.maxFailuresPercent);
@@ -3758,6 +3780,36 @@ public VertexState transition(VertexImpl vertex, VertexEvent event) {
}
}

private static class VertexShuffleDeleteTransition implements
SingleArcTransition<VertexImpl, VertexEvent> {

@Override
public void transition(VertexImpl vertex, VertexEvent event) {
int incompleteChildrenVertices = vertex.vShuffleDeletionContext.getIncompleteChildrenVertices();
incompleteChildrenVertices = incompleteChildrenVertices - 1;
vertex.vShuffleDeletionContext.setIncompleteChildrenVertices(incompleteChildrenVertices);
// check if all the child vertices are completed
if (incompleteChildrenVertices == 0) {
LOG.info("Vertex shuffle data deletion for vertex name: " +
vertex.getName() + " with vertex id: " + vertex.getVertexId());
// Get nodes of all the task attempts in vertex
Set<NodeId> nodes = Sets.newHashSet();
Map<TezTaskID, Task> tasksMap = vertex.getTasks();
tasksMap.keySet().forEach(taskId -> {
Map<TezTaskAttemptID, TaskAttempt> taskAttemptMap = tasksMap.get(taskId).getAttempts();
taskAttemptMap.keySet().forEach(attemptId -> {
nodes.add(taskAttemptMap.get(attemptId).getNodeId());
});
});
vertex.appContext.getAppMaster().vertexComplete(
vertex.vertexId, nodes);
} else {
LOG.debug("The number of incomplete child vertex are {} for the vertex {}",
incompleteChildrenVertices, vertex.vertexId);
}
}
}

private static class TaskCompletedAfterVertexSuccessTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@Override
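Taken together, the two hunks above implement a per-vertex countdown: a succeeded vertex notifies the ancestors held in its VertexShuffleDataDeletionContext, and each ancestor, on V_DELETE_SHUFFLE_DATA, decrements its count of incomplete descendant vertices and, at zero, asks the AM to clean its shuffle data on every node that ran one of its task attempts. A self-contained toy model of that bookkeeping (illustrative names, not Tez code):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class ShuffleCleanupCountdown {
  private int incompleteDescendants;        // descendants at the cleanup height still running
  private final Set<String> attemptNodes;   // nodes that ran this vertex's task attempts

  ShuffleCleanupCountdown(int descendantCount, Set<String> attemptNodes) {
    this.incompleteDescendants = descendantCount;
    this.attemptNodes = new HashSet<>(attemptNodes);
  }

  // Invoked once per completed descendant; a non-empty result means "clean these nodes now".
  Set<String> onDescendantComplete() {
    incompleteDescendants--;
    return incompleteDescendants == 0 ? attemptNodes : Collections.emptySet();
  }
}

With the Map1 - Reduce2 - Reduce3 - Reduce4 example and height 1, Map1's countdown starts at 1, so Reduce2's completion immediately yields Map1's node set for cleanup.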
@@ -4930,4 +4982,14 @@ public boolean getTaskRescheduleRelaxedLocality() {
public Map<String, Set<String>> getDownstreamBlamingHosts(){
return downstreamBlamingHosts;
}

/**
* Initializes the context for vertex shuffle data deletion.
* @param deletionHeight depth of completed descendant vertices that triggers the cleanup
*/
public void initShuffleDeletionContext(int deletionHeight) {
VertexShuffleDataDeletionContext vShuffleDeletionContext = new VertexShuffleDataDeletionContext(deletionHeight);
vShuffleDeletionContext.setSpannedVertices(this);
this.vShuffleDeletionContext = vShuffleDeletionContext;
}
}
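VertexShuffleDataDeletionContext is added elsewhere in this commit and is not shown in this hunk. Assuming, per the TezConfiguration javadoc example, that the cleanup height measures the exact distance between a completed vertex and the ancestors it notifies, the selection can be modeled by a standalone walk over a parent map such as the sketch below (string vertex names purely for illustration):

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class AncestorsAtHeight {
  // Returns the vertices exactly 'height' edges above the completed vertex.
  static Set<String> find(String completedVertex, int height, Map<String, List<String>> parentsOf) {
    Set<String> frontier = new HashSet<>();
    frontier.add(completedVertex);
    for (int level = 0; level < height; level++) {
      Set<String> next = new HashSet<>();
      for (String v : frontier) {
        next.addAll(parentsOf.getOrDefault(v, Collections.emptyList()));
      }
      frontier = next;
    }
    // With height = 1, Reduce2 -> {Map1}; with height = 2, Reduce3 -> {Map1}.
    return frontier;
  }
}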