diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index acc5f12933..18c78c6847 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -624,23 +624,24 @@ public static TezCountersProto convertTezCountersToProto( } public static DAGProtos.StatusGetOptsProto convertStatusGetOptsToProto( - StatusGetOpts statusGetOpts) { + StatusGetOpts statusGetOpts) { switch (statusGetOpts) { - case GET_COUNTERS: - return DAGProtos.StatusGetOptsProto.GET_COUNTERS; + case GET_COUNTERS: + return DAGProtos.StatusGetOptsProto.GET_COUNTERS; + case GET_DETAILS: + return DAGProtos.StatusGetOptsProto.GET_DETAILS; } - throw new TezUncheckedException("Could not convert StatusGetOpts to" - + " proto"); + throw new TezUncheckedException("Could not convert StatusGetOpts to" + " proto"); } - public static StatusGetOpts convertStatusGetOptsFromProto( - DAGProtos.StatusGetOptsProto proto) { + public static StatusGetOpts convertStatusGetOptsFromProto(DAGProtos.StatusGetOptsProto proto) { switch (proto) { - case GET_COUNTERS: - return StatusGetOpts.GET_COUNTERS; + case GET_COUNTERS: + return StatusGetOpts.GET_COUNTERS; + case GET_DETAILS: + return StatusGetOpts.GET_DETAILS; } - throw new TezUncheckedException("Could not convert to StatusGetOpts from" - + " proto"); + throw new TezUncheckedException("Could not convert to StatusGetOpts from" + " proto"); } public static List convertStatusGetOptsToProto( diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java index 1a9df7afa1..97b64f7205 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java @@ -29,5 +29,6 @@ @Evolving public enum StatusGetOpts { /** Retrieve Counters with Status */ - GET_COUNTERS + GET_COUNTERS, + GET_DETAILS } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java index dfb9bbe8cd..57b8dfca5d 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java @@ -59,6 +59,10 @@ public VertexStatus(VertexStatusProtoOrBuilder proxy) { this.proxy = proxy; } + public String getId() { + return proxy.getId(); + } + public State getState() { return getState(proxy.getState()); } @@ -96,6 +100,10 @@ public List getDiagnostics() { return proxy.getDiagnosticsList(); } + public List getTasks() { + return proxy.getTasksList(); + } + public Progress getProgress() { if(progress == null && proxy.hasProgress()) { progress = new Progress(proxy.getProgress()); diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto index 34c369d430..4bfcc6f556 100644 --- a/tez-api/src/main/proto/DAGApiRecords.proto +++ b/tez-api/src/main/proto/DAGApiRecords.proto @@ -245,10 +245,12 @@ enum VertexStatusStateProto { } message VertexStatusProto { - optional VertexStatusStateProto state = 1; - repeated string diagnostics = 2; - optional ProgressProto progress = 3; - optional TezCountersProto vertexCounters = 4; + required string id = 1; + optional VertexStatusStateProto state = 2; + repeated string diagnostics = 3; + optional ProgressProto progress = 4; + optional TezCountersProto vertexCounters = 5; + repeated string tasks = 6; } enum DAGStatusStateProto { @@ -298,6 +300,7 @@ message TezCountersProto { enum StatusGetOptsProto { GET_COUNTERS = 0; + GET_DETAILS = 1; } message VertexLocationHintProto { diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java index 265fce9d43..edb7fd8445 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java @@ -34,8 +34,10 @@ import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.security.DAGAccessControls; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; +import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.records.DAGProtos.ACLInfo; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.StatusGetOptsProto; import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto; @@ -235,6 +237,23 @@ public void testAclConversions() { assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo); } + /* + * This unit test can catch if a StatusGetOpts <-> StatusGetOptsProto value is not defined at any + * side. + */ + @Test + public void testConvertStatusGetOptsToProtoCoverage() { + StatusGetOpts[] opts = StatusGetOpts.values(); + for (StatusGetOpts opt : opts) { + DagTypeConverters.convertStatusGetOptsToProto(opt); + } + + StatusGetOptsProto[] optProtos = StatusGetOptsProto.values(); + for (StatusGetOptsProto proto : optProtos) { + DagTypeConverters.convertStatusGetOptsFromProto(proto); + } + } + private void assertSame(DAGAccessControls dagAccessControls, ACLInfo aclInfo) { assertEquals(dagAccessControls.getUsersWithViewACLs(), Sets.newHashSet(aclInfo.getUsersWithViewAccessList())); diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java index 6a5e817d44..50c9a6061c 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java @@ -152,6 +152,7 @@ private void setUpData(){ .build(); vertexStatusProtoWithoutCounters = VertexStatusProto.newBuilder() + .setId("vertex_1") .addDiagnostics("V_Diagnostics_0") .setProgress(vertexProgressProto) .setState(VertexStatusStateProto.VERTEX_SUCCEEDED) // make sure the waitForCompletion be able to finish diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java index 4de321cf2c..dffb828824 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java @@ -28,6 +28,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexStatusStateProto; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.records.TezVertexID; public class VertexStatusBuilder extends VertexStatus { @@ -35,6 +36,10 @@ public VertexStatusBuilder() { super(VertexStatusProto.newBuilder()); } + public void setId(TezVertexID vertexId) { + getBuilder().setId(vertexId.toString()); + } + public void setState(VertexState state) { getBuilder().setState(getProtoState(state)); } @@ -54,6 +59,12 @@ public void setVertexCounters(TezCounters counters) { DagTypeConverters.convertTezCountersToProto(counters)); } + public void setVertexTasks(List taskIds) { + Builder builder = getBuilder(); + builder.clearTasks(); + builder.addAllTasks(taskIds); + } + public VertexStatusProto getProto() { return getBuilder().build(); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index e21add0e3b..c3595f2385 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -1536,12 +1537,17 @@ public VertexStatusBuilder getVertexStatus( this.readLock.lock(); try { VertexStatusBuilder status = new VertexStatusBuilder(); + status.setId(getVertexId()); status.setState(getInternalState()); status.setDiagnostics(diagnostics); status.setProgress(getVertexProgress()); if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) { status.setVertexCounters(getAllCounters()); } + if (statusOptions.contains(StatusGetOpts.GET_DETAILS)) { + status.setVertexTasks( + getTasks().keySet().stream().map(Object::toString).collect(Collectors.toList())); + } return status; } finally { this.readLock.unlock();