Skip to content

Commit

Permalink
TEZ-4227 Introduce convenient methods in TezID subclasses
Browse files Browse the repository at this point in the history
Change-Id: I6cabfa75e9b6b62e41ba8c2cc5e3d2d1a8a49102
  • Loading branch information
ghanko committed Nov 23, 2021
1 parent 211b59b commit afbc774
Show file tree
Hide file tree
Showing 83 changed files with 741 additions and 684 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.tez.dag.records;

import org.apache.hadoop.yarn.api.records.ApplicationId;

public interface DAGIDHolder {
TezDAGID getDAGId();

default ApplicationId getApplicationId() {
return getDAGId().getApplicationId();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.tez.dag.records;

public interface TaskAttemptIDHolder extends TaskIDHolder {
TezTaskAttemptID getTaskAttemptID();

@Override
default TezTaskID getTaskID() {
return getTaskAttemptID().getTaskID();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.tez.dag.records;

public interface TaskIDHolder extends VertexIDHolder {
TezTaskID getTaskID();

@Override
default TezVertexID getVertexID() {
return getTaskID().getVertexID();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezTaskAttemptID extends TezID {
public class TezTaskAttemptID extends TezID implements TaskIDHolder {
public static final String ATTEMPT = "attempt";
private TezTaskID taskId;

Expand Down Expand Up @@ -73,6 +73,7 @@ private TezTaskAttemptID(TezTaskID taskId, int id) {
}

/** Returns the {@link TezTaskID} object that this task attempt belongs to */
@Override
public TezTaskID getTaskID() {
return taskId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezTaskID extends TezID {
public class TezTaskID extends TezID implements VertexIDHolder {
public static final String TASK = "task";
private final int serializingHash;

Expand Down Expand Up @@ -79,6 +79,7 @@ public int getSerializingHash() {
}

/** Returns the {@link TezVertexID} object that this task belongs to */
@Override
public TezVertexID getVertexID() {
return vertexId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezVertexID extends TezID {
public class TezVertexID extends TezID implements DAGIDHolder {
public static final String VERTEX = "vertex";
static final ThreadLocal<FastNumberFormat> tezVertexIdFormat = new ThreadLocal<FastNumberFormat>() {

Expand Down Expand Up @@ -79,6 +79,7 @@ private TezVertexID(TezDAGID dagId, int id) {
}

/** Returns the {@link TezDAGID} object that this tip belongs to */
@Override
public TezDAGID getDAGId() {
return dagId;
}
Expand Down Expand Up @@ -158,5 +159,4 @@ public static TezVertexID fromString(String vertexIdStr) {
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.tez.dag.records;

public interface VertexIDHolder extends DAGIDHolder {
TezVertexID getVertexID();

@Override
default TezDAGID getDAGId() {
return getVertexID().getDAGId();
}
}
14 changes: 7 additions & 7 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -2187,12 +2187,12 @@ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
public void handle(TaskEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getTaskID().getVertexID().getDAGId().getId();
event.getDAGId().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskID().getVertexID()).
dag.getVertex(event.getVertexID()).
getTask(event.getTaskID());
((EventHandler<TaskEvent>)task).handle(event);
}
Expand All @@ -2217,13 +2217,13 @@ private class TaskAttemptEventDispatcher
public void handle(TaskAttemptEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
event.getDAGId().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()).
getTask(event.getTaskAttemptID().getTaskID());
dag.getVertex(event.getVertexID()).
getTask(event.getTaskID());
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
}
Expand All @@ -2236,13 +2236,13 @@ private class VertexEventDispatcher
public void handle(VertexEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getVertexId().getDAGId().getId();
event.getDAGId().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}

Vertex vertex =
dag.getVertex(event.getVertexId());
dag.getVertex(event.getVertexID());
((EventHandler<VertexEvent>) vertex).handle(event);
}
}
Expand Down
12 changes: 6 additions & 6 deletions tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -846,19 +846,19 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
case TASK_STARTED:
{
TaskStartedEvent taskStartedEvent = (TaskStartedEvent) event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getTaskID().getVertexID());
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getTaskID().getVertexID());
"Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getVertexID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskStartedEvent.getTaskID());
taskRecoveryData.taskStartedEvent = taskStartedEvent;
break;
}
case TASK_FINISHED:
{
TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent) event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getTaskID().getVertexID());
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getTaskID().getVertexID());
"Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getVertexID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskFinishedEvent.getTaskID());
taskRecoveryData.taskFinishedEvent = taskFinishedEvent;
break;
Expand All @@ -867,7 +867,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
{
TaskAttemptStartedEvent taStartedEvent = (TaskAttemptStartedEvent)event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
taStartedEvent.getTaskAttemptID().getTaskID().getVertexID());
taStartedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskAttemptStartedEvent, its vertexId does not exist, taId=" + taStartedEvent.getTaskAttemptID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
Expand All @@ -882,7 +882,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
{
TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent)event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID());
taFinishedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskAttemtFinishedEvent, its vertexId does not exist, taId=" + taFinishedEvent.getTaskAttemptID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,14 @@ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
}
}
if (!eventsForVertex.isEmpty()) {
TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
TezVertexID vertexId = taskAttemptID.getVertexID();
sendEvent(
new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
}
taskHeartbeatHandler.pinged(taskAttemptID);
eventInfo = context
.getCurrentDAG()
.getVertex(taskAttemptID.getTaskID().getVertexID())
.getVertex(taskAttemptID.getVertexID())
.getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
request.getMaxEvents());
}
Expand Down Expand Up @@ -442,7 +442,7 @@ public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {

DAG job = context.getCurrentDAG();
Task task =
job.getVertex(taskAttemptId.getTaskID().getVertexID()).
job.getVertex(taskAttemptId.getVertexID()).
getTask(taskAttemptId.getTaskID());
return task.canCommit(taskAttemptId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) {
public void scheduleTask(DAGEventSchedulerUpdate event) {
VertexInfo vInfo = null;
if (vertexInfo != null) {
vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
vInfo = vertexInfo.get(event.getVertexID());
}
scheduleTaskWithLimit(event, vInfo);
}

private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) {
if (vInfo != null) {
if (vInfo.concurrency >= vInfo.concurrencyLimit) {
vInfo.pendingAttempts.put(event.getAttempt().getID(), event);
vInfo.pendingAttempts.put(event.getTaskAttemptID(), event);
return; // already at max concurrency
}
vInfo.concurrency++;
Expand All @@ -71,9 +71,9 @@ private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vIn
public void taskCompleted(DAGEventSchedulerUpdate event) {
taskCompletedEx(event);
if (vertexInfo != null) {
VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
VertexInfo vInfo = vertexInfo.get(event.getVertexID());
if (vInfo != null) {
if(vInfo.pendingAttempts.remove(event.getAttempt().getID()) == null) {
if(vInfo.pendingAttempts.remove(event.getTaskAttemptID()) == null) {
vInfo.concurrency--;
if(!vInfo.pendingAttempts.isEmpty()) {
Iterator<DAGEventSchedulerUpdate> i = vInfo.pendingAttempts.values().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId)
Iterator<TezEvent> eventIterator = events.iterator();
while (eventIterator.hasNext()) {
TezEvent tezEvent = eventIterator.next();
int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
int taskIndex = tezEvent.getSourceInfo().getTaskID().getId();
int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();
if (taskIndex == taskId.getId()) {
// Process only if there's a pending event for the specific succeeded task
Expand All @@ -476,7 +476,7 @@ public void handleInputInitializerEvents(Collection<TezEvent> tezEvents) {
List<InputInitializerEvent> toForwardEvents = new LinkedList<InputInitializerEvent>();
for (TezEvent tezEvent : tezEvents) {
String srcVertexName = tezEvent.getSourceInfo().getTaskVertexName();
int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
int taskIndex = tezEvent.getSourceInfo().getTaskID().getId();
int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();

Map<Integer, Integer> vertexSuccessfulAttemptMap =
Expand All @@ -496,7 +496,7 @@ public void handleInputInitializerEvents(Collection<TezEvent> tezEvents) {
Vertex srcVertex = appContext.getCurrentDAG().getVertex(srcVertexName);
Task task = srcVertex.getTask(taskIndex);
if (task.getState() == TaskState.SUCCEEDED) {
successfulAttemptInteger = task.getSuccessfulAttempt().getID().getId();
successfulAttemptInteger = task.getSuccessfulAttempt().getTaskAttemptID().getId();
vertexSuccessfulAttemptMap.put(taskIndex, successfulAttemptInteger);
}
}
Expand Down
5 changes: 2 additions & 3 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskReport;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.records.TaskIDHolder;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;

/**
* Read only view of Task.
*/
public interface Task {
TezTaskID getTaskId();
public interface Task extends TaskIDHolder {
TaskReport getReport();
TaskState getState();
TezCounters getCounters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.records.TaskAttemptIDHolder;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.TezEvent;

/**
* Read only view of TaskAttempt.
*/
public interface TaskAttempt {
public interface TaskAttempt extends TaskAttemptIDHolder {

public static class TaskAttemptStatus {
public TezTaskAttemptID id;
Expand Down Expand Up @@ -66,11 +64,6 @@ public void setLocalityCounter(DAGCounter localityCounter) {
}
}
}

TezTaskAttemptID getID();
TezTaskID getTaskID();
TezVertexID getVertexID();
TezDAGID getDAGID();

Task getTask();
TaskAttemptReport getReport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
package org.apache.tez.dag.app.dag.event;

import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.records.DAGIDHolder;
import org.apache.tez.dag.records.TezDAGID;

/**
* This class encapsulates job related events.
*
*/
public class DAGEvent extends TezAbstractEvent<DAGEventType> {
public class DAGEvent extends TezAbstractEvent<DAGEventType> implements DAGIDHolder {

private TezDAGID dagId;

Expand All @@ -34,6 +35,7 @@ public DAGEvent(TezDAGID dagId, DAGEventType type) {
this.dagId = dagId;
}

@Override
public TezDAGID getDAGId() {
return dagId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.tez.dag.app.dag.event;

import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.records.TaskAttemptIDHolder;
import org.apache.tez.dag.records.TezTaskAttemptID;

public class DAGEventSchedulerUpdate extends DAGEvent implements TaskAttemptIDHolder {

public class DAGEventSchedulerUpdate extends DAGEvent {

public enum UpdateType {
TA_SCHEDULE,
TA_COMPLETED
Expand All @@ -31,7 +33,7 @@ public enum UpdateType {
private final UpdateType updateType;

public DAGEventSchedulerUpdate(UpdateType updateType, TaskAttempt attempt) {
super(attempt.getDAGID(),
super(attempt.getDAGId(),
DAGEventType.DAG_SCHEDULER_UPDATE);
this.attempt = attempt;
this.updateType = updateType;
Expand All @@ -44,4 +46,9 @@ public UpdateType getUpdateType() {
public TaskAttempt getAttempt() {
return attempt;
}

@Override
public TezTaskAttemptID getTaskAttemptID() {
return attempt.getTaskAttemptID();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public SpeculatorEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttempt

public SpeculatorEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
long timestamp, boolean justStarted) {
super(SpeculatorEventType.S_TASK_ATTEMPT_STATUS_UPDATE, taId.getTaskID().getVertexID());
super(SpeculatorEventType.S_TASK_ATTEMPT_STATUS_UPDATE, taId.getVertexID());
this.id = taId;
this.state = state;
this.timestamp = timestamp;
Expand Down
Loading

0 comments on commit afbc774

Please sign in to comment.