Skip to content

Commit

Permalink
TEZ-4227 Introduce convenient methods in TezID subclasses (#166) (Ger…
Browse files Browse the repository at this point in the history
…gely Hanko reviewed by Laszlo Bodor)
ghanko authored Feb 10, 2022

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 6d7ef20 commit 7b66d3d
Showing 91 changed files with 834 additions and 711 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

import org.apache.hadoop.yarn.api.records.ApplicationId;

public interface DAGIDAware {
TezDAGID getDAGID();

default ApplicationId getApplicationId() {
return getDAGID().getApplicationId();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

public interface TaskAttemptIDAware extends TaskIDAware {
TezTaskAttemptID getTaskAttemptID();

@Override
default TezTaskID getTaskID() {
return getTaskAttemptID().getTaskID();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

public interface TaskIDAware extends VertexIDAware {
TezTaskID getTaskID();

@Override
default TezVertexID getVertexID() {
return getTaskID().getVertexID();
}
}
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezTaskAttemptID extends TezID {
public class TezTaskAttemptID extends TezID implements TaskIDAware {
public static final String ATTEMPT = "attempt";
private TezTaskID taskId;

@@ -73,6 +73,7 @@ private TezTaskAttemptID(TezTaskID taskId, int id) {
}

/** Returns the {@link TezTaskID} object that this task attempt belongs to */
@Override
public TezTaskID getTaskID() {
return taskId;
}
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezTaskID extends TezID {
public class TezTaskID extends TezID implements VertexIDAware {
public static final String TASK = "task";
private final int serializingHash;

@@ -79,6 +79,7 @@ public int getSerializingHash() {
}

/** Returns the {@link TezVertexID} object that this task belongs to */
@Override
public TezVertexID getVertexID() {
return vertexId;
}
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezVertexID extends TezID {
public class TezVertexID extends TezID implements DAGIDAware {
public static final String VERTEX = "vertex";
static final ThreadLocal<FastNumberFormat> tezVertexIdFormat = new ThreadLocal<FastNumberFormat>() {

@@ -80,7 +80,8 @@ private TezVertexID(TezDAGID dagId, int id) {
}

/** Returns the {@link TezDAGID} object that this tip belongs to */
public TezDAGID getDAGId() {
@Override
public TezDAGID getDAGID() {
return dagId;
}

@@ -159,5 +160,4 @@ public static TezVertexID fromString(String vertexIdStr) {
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

public interface VertexIDAware extends DAGIDAware {
TezVertexID getVertexID();

@Override
default TezDAGID getDAGID() {
return getVertexID().getDAGID();
}
}
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ public class VertexIdentifierImpl implements VertexIdentifier {
public VertexIdentifierImpl(String dagName, String vertexName, TezVertexID vertexId) {
this.vertexId = vertexId;
this.vertexName = vertexName;
this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGId());
this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGID());
}

@Override
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ private void verifyDagInfo(String[] splits, TezDAGID dagId) {
}

private void verifyVertexInfo(String[] splits, TezVertexID vId) {
verifyDagInfo(splits, vId.getDAGId());
verifyDagInfo(splits, vId.getDAGID());
Assert.assertEquals(vId.getId(),
Integer.valueOf(splits[4]).intValue());
}
17 changes: 8 additions & 9 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
@@ -50,7 +50,6 @@
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -2173,7 +2172,7 @@ private class DagEventDispatcher implements EventHandler<DAGEvent> {
@Override
public void handle(DAGEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex = event.getDAGId().getId();
int eventDagIndex = event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
@@ -2187,12 +2186,12 @@ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
public void handle(TaskEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getTaskID().getVertexID().getDAGId().getId();
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskID().getVertexID()).
dag.getVertex(event.getVertexID()).
getTask(event.getTaskID());
((EventHandler<TaskEvent>)task).handle(event);
}
@@ -2217,13 +2216,13 @@ private class TaskAttemptEventDispatcher
public void handle(TaskAttemptEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()).
getTask(event.getTaskAttemptID().getTaskID());
dag.getVertex(event.getVertexID()).
getTask(event.getTaskID());
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
}
@@ -2236,13 +2235,13 @@ private class VertexEventDispatcher
public void handle(VertexEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getVertexId().getDAGId().getId();
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}

Vertex vertex =
dag.getVertex(event.getVertexId());
dag.getVertex(event.getVertexID());
((EventHandler<VertexEvent>) vertex).handle(event);
}
}
12 changes: 6 additions & 6 deletions tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
Original file line number Diff line number Diff line change
@@ -846,19 +846,19 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
case TASK_STARTED:
{
TaskStartedEvent taskStartedEvent = (TaskStartedEvent) event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getTaskID().getVertexID());
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getTaskID().getVertexID());
"Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getVertexID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskStartedEvent.getTaskID());
taskRecoveryData.taskStartedEvent = taskStartedEvent;
break;
}
case TASK_FINISHED:
{
TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent) event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getTaskID().getVertexID());
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getTaskID().getVertexID());
"Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getVertexID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskFinishedEvent.getTaskID());
taskRecoveryData.taskFinishedEvent = taskFinishedEvent;
break;
@@ -867,7 +867,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
{
TaskAttemptStartedEvent taStartedEvent = (TaskAttemptStartedEvent)event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
taStartedEvent.getTaskAttemptID().getTaskID().getVertexID());
taStartedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskAttemptStartedEvent, its vertexId does not exist, taId=" + taStartedEvent.getTaskAttemptID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
@@ -882,7 +882,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
{
TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent)event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID());
taFinishedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskAttemtFinishedEvent, its vertexId does not exist, taId=" + taFinishedEvent.getTaskAttemptID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
Original file line number Diff line number Diff line change
@@ -350,14 +350,14 @@ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
}
}
if (!eventsForVertex.isEmpty()) {
TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
TezVertexID vertexId = taskAttemptID.getVertexID();
sendEvent(
new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
}
taskHeartbeatHandler.pinged(taskAttemptID);
eventInfo = context
.getCurrentDAG()
.getVertex(taskAttemptID.getTaskID().getVertexID())
.getVertex(taskAttemptID.getVertexID())
.getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
request.getMaxEvents());
}
@@ -442,7 +442,7 @@ public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {

DAG job = context.getCurrentDAG();
Task task =
job.getVertex(taskAttemptId.getTaskID().getVertexID()).
job.getVertex(taskAttemptId.getVertexID()).
getTask(taskAttemptId.getTaskID());
return task.canCommit(taskAttemptId);
}
Original file line number Diff line number Diff line change
@@ -52,15 +52,15 @@ public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) {
public void scheduleTask(DAGEventSchedulerUpdate event) {
VertexInfo vInfo = null;
if (vertexInfo != null) {
vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
vInfo = vertexInfo.get(event.getVertexID());
}
scheduleTaskWithLimit(event, vInfo);
}

private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) {
if (vInfo != null) {
if (vInfo.concurrency >= vInfo.concurrencyLimit) {
vInfo.pendingAttempts.put(event.getAttempt().getID(), event);
vInfo.pendingAttempts.put(event.getTaskAttemptID(), event);
return; // already at max concurrency
}
vInfo.concurrency++;
@@ -71,9 +71,9 @@ private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vIn
public void taskCompleted(DAGEventSchedulerUpdate event) {
taskCompletedEx(event);
if (vertexInfo != null) {
VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
VertexInfo vInfo = vertexInfo.get(event.getVertexID());
if (vInfo != null) {
if(vInfo.pendingAttempts.remove(event.getAttempt().getID()) == null) {
if(vInfo.pendingAttempts.remove(event.getTaskAttemptID()) == null) {
vInfo.concurrency--;
if(!vInfo.pendingAttempts.isEmpty()) {
Iterator<DAGEventSchedulerUpdate> i = vInfo.pendingAttempts.values().iterator();
Original file line number Diff line number Diff line change
@@ -457,7 +457,7 @@ public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId)
Iterator<TezEvent> eventIterator = events.iterator();
while (eventIterator.hasNext()) {
TezEvent tezEvent = eventIterator.next();
int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
int taskIndex = tezEvent.getSourceInfo().getTaskID().getId();
int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();
if (taskIndex == taskId.getId()) {
// Process only if there's a pending event for the specific succeeded task
@@ -476,7 +476,7 @@ public void handleInputInitializerEvents(Collection<TezEvent> tezEvents) {
List<InputInitializerEvent> toForwardEvents = new LinkedList<InputInitializerEvent>();
for (TezEvent tezEvent : tezEvents) {
String srcVertexName = tezEvent.getSourceInfo().getTaskVertexName();
int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
int taskIndex = tezEvent.getSourceInfo().getTaskID().getId();
int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();

Map<Integer, Integer> vertexSuccessfulAttemptMap =
@@ -496,7 +496,7 @@ public void handleInputInitializerEvents(Collection<TezEvent> tezEvents) {
Vertex srcVertex = appContext.getCurrentDAG().getVertex(srcVertexName);
Task task = srcVertex.getTask(taskIndex);
if (task.getState() == TaskState.SUCCEEDED) {
successfulAttemptInteger = task.getSuccessfulAttempt().getID().getId();
successfulAttemptInteger = task.getSuccessfulAttempt().getTaskAttemptID().getId();
vertexSuccessfulAttemptMap.put(taskIndex, successfulAttemptInteger);
}
}
Loading

0 comments on commit 7b66d3d

Please sign in to comment.