Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4227 Introduce convenient methods in TezID subclasses #166

Merged
merged 1 commit into from
Feb 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

import org.apache.hadoop.yarn.api.records.ApplicationId;

public interface DAGIDAware {
TezDAGID getDAGID();

default ApplicationId getApplicationId() {
return getDAGID().getApplicationId();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

public interface TaskAttemptIDAware extends TaskIDAware {
TezTaskAttemptID getTaskAttemptID();

@Override
default TezTaskID getTaskID() {
return getTaskAttemptID().getTaskID();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

public interface TaskIDAware extends VertexIDAware {
TezTaskID getTaskID();

@Override
default TezVertexID getVertexID() {
return getTaskID().getVertexID();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezTaskAttemptID extends TezID {
public class TezTaskAttemptID extends TezID implements TaskIDAware {
public static final String ATTEMPT = "attempt";
private TezTaskID taskId;

Expand All @@ -73,6 +73,7 @@ private TezTaskAttemptID(TezTaskID taskId, int id) {
}

/** Returns the {@link TezTaskID} object that this task attempt belongs to */
@Override
public TezTaskID getTaskID() {
return taskId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezTaskID extends TezID {
public class TezTaskID extends TezID implements VertexIDAware {
public static final String TASK = "task";
private final int serializingHash;

Expand Down Expand Up @@ -79,6 +79,7 @@ public int getSerializingHash() {
}

/** Returns the {@link TezVertexID} object that this task belongs to */
@Override
public TezVertexID getVertexID() {
return vertexId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezVertexID extends TezID {
public class TezVertexID extends TezID implements DAGIDAware {
public static final String VERTEX = "vertex";
static final ThreadLocal<FastNumberFormat> tezVertexIdFormat = new ThreadLocal<FastNumberFormat>() {

Expand Down Expand Up @@ -80,7 +80,8 @@ private TezVertexID(TezDAGID dagId, int id) {
}

/** Returns the {@link TezDAGID} object that this tip belongs to */
public TezDAGID getDAGId() {
@Override
public TezDAGID getDAGID() {
return dagId;
}

Expand Down Expand Up @@ -159,5 +160,4 @@ public static TezVertexID fromString(String vertexIdStr) {
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

public interface VertexIDAware extends DAGIDAware {
TezVertexID getVertexID();

@Override
default TezDAGID getDAGID() {
return getVertexID().getDAGID();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class VertexIdentifierImpl implements VertexIdentifier {
public VertexIdentifierImpl(String dagName, String vertexName, TezVertexID vertexId) {
this.vertexId = vertexId;
this.vertexName = vertexName;
this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGId());
this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGID());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private void verifyDagInfo(String[] splits, TezDAGID dagId) {
}

private void verifyVertexInfo(String[] splits, TezVertexID vId) {
verifyDagInfo(splits, vId.getDAGId());
verifyDagInfo(splits, vId.getDAGID());
Assert.assertEquals(vId.getId(),
Integer.valueOf(splits[4]).intValue());
}
Expand Down
17 changes: 8 additions & 9 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -2173,7 +2172,7 @@ private class DagEventDispatcher implements EventHandler<DAGEvent> {
@Override
public void handle(DAGEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex = event.getDAGId().getId();
int eventDagIndex = event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Expand All @@ -2187,12 +2186,12 @@ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
public void handle(TaskEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getTaskID().getVertexID().getDAGId().getId();
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskID().getVertexID()).
dag.getVertex(event.getVertexID()).
getTask(event.getTaskID());
((EventHandler<TaskEvent>)task).handle(event);
}
Expand All @@ -2217,13 +2216,13 @@ private class TaskAttemptEventDispatcher
public void handle(TaskAttemptEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()).
getTask(event.getTaskAttemptID().getTaskID());
dag.getVertex(event.getVertexID()).
getTask(event.getTaskID());
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
}
Expand All @@ -2236,13 +2235,13 @@ private class VertexEventDispatcher
public void handle(VertexEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getVertexId().getDAGId().getId();
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}

Vertex vertex =
dag.getVertex(event.getVertexId());
dag.getVertex(event.getVertexID());
((EventHandler<VertexEvent>) vertex).handle(event);
}
}
Expand Down
12 changes: 6 additions & 6 deletions tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -846,19 +846,19 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
case TASK_STARTED:
{
TaskStartedEvent taskStartedEvent = (TaskStartedEvent) event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getTaskID().getVertexID());
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getTaskID().getVertexID());
"Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getVertexID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskStartedEvent.getTaskID());
taskRecoveryData.taskStartedEvent = taskStartedEvent;
break;
}
case TASK_FINISHED:
{
TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent) event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getTaskID().getVertexID());
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getTaskID().getVertexID());
"Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getVertexID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskFinishedEvent.getTaskID());
taskRecoveryData.taskFinishedEvent = taskFinishedEvent;
break;
Expand All @@ -867,7 +867,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
{
TaskAttemptStartedEvent taStartedEvent = (TaskAttemptStartedEvent)event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
taStartedEvent.getTaskAttemptID().getTaskID().getVertexID());
taStartedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskAttemptStartedEvent, its vertexId does not exist, taId=" + taStartedEvent.getTaskAttemptID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
Expand All @@ -882,7 +882,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
{
TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent)event;
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID());
taFinishedEvent.getVertexID());
Preconditions.checkArgument(vertexRecoveryData != null,
"Invalid TaskAttemtFinishedEvent, its vertexId does not exist, taId=" + taFinishedEvent.getTaskAttemptID());
TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,14 @@ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
}
}
if (!eventsForVertex.isEmpty()) {
TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
TezVertexID vertexId = taskAttemptID.getVertexID();
sendEvent(
new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
}
taskHeartbeatHandler.pinged(taskAttemptID);
eventInfo = context
.getCurrentDAG()
.getVertex(taskAttemptID.getTaskID().getVertexID())
.getVertex(taskAttemptID.getVertexID())
.getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
request.getMaxEvents());
}
Expand Down Expand Up @@ -442,7 +442,7 @@ public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {

DAG job = context.getCurrentDAG();
Task task =
job.getVertex(taskAttemptId.getTaskID().getVertexID()).
job.getVertex(taskAttemptId.getVertexID()).
getTask(taskAttemptId.getTaskID());
return task.canCommit(taskAttemptId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) {
public void scheduleTask(DAGEventSchedulerUpdate event) {
VertexInfo vInfo = null;
if (vertexInfo != null) {
vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
vInfo = vertexInfo.get(event.getVertexID());
}
scheduleTaskWithLimit(event, vInfo);
}

private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) {
if (vInfo != null) {
if (vInfo.concurrency >= vInfo.concurrencyLimit) {
vInfo.pendingAttempts.put(event.getAttempt().getID(), event);
vInfo.pendingAttempts.put(event.getTaskAttemptID(), event);
return; // already at max concurrency
}
vInfo.concurrency++;
Expand All @@ -71,9 +71,9 @@ private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vIn
public void taskCompleted(DAGEventSchedulerUpdate event) {
taskCompletedEx(event);
if (vertexInfo != null) {
VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
VertexInfo vInfo = vertexInfo.get(event.getVertexID());
if (vInfo != null) {
if(vInfo.pendingAttempts.remove(event.getAttempt().getID()) == null) {
if(vInfo.pendingAttempts.remove(event.getTaskAttemptID()) == null) {
vInfo.concurrency--;
if(!vInfo.pendingAttempts.isEmpty()) {
Iterator<DAGEventSchedulerUpdate> i = vInfo.pendingAttempts.values().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId)
Iterator<TezEvent> eventIterator = events.iterator();
while (eventIterator.hasNext()) {
TezEvent tezEvent = eventIterator.next();
int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
int taskIndex = tezEvent.getSourceInfo().getTaskID().getId();
int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();
if (taskIndex == taskId.getId()) {
// Process only if there's a pending event for the specific succeeded task
Expand All @@ -476,7 +476,7 @@ public void handleInputInitializerEvents(Collection<TezEvent> tezEvents) {
List<InputInitializerEvent> toForwardEvents = new LinkedList<InputInitializerEvent>();
for (TezEvent tezEvent : tezEvents) {
String srcVertexName = tezEvent.getSourceInfo().getTaskVertexName();
int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
int taskIndex = tezEvent.getSourceInfo().getTaskID().getId();
int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();

Map<Integer, Integer> vertexSuccessfulAttemptMap =
Expand All @@ -496,7 +496,7 @@ public void handleInputInitializerEvents(Collection<TezEvent> tezEvents) {
Vertex srcVertex = appContext.getCurrentDAG().getVertex(srcVertexName);
Task task = srcVertex.getTask(taskIndex);
if (task.getState() == TaskState.SUCCEEDED) {
successfulAttemptInteger = task.getSuccessfulAttempt().getID().getId();
successfulAttemptInteger = task.getSuccessfulAttempt().getTaskAttemptID().getId();
vertexSuccessfulAttemptMap.put(taskIndex, successfulAttemptInteger);
}
}
Expand Down
Loading