Skip to content

Commit

Permalink
TEZ-4279: Support some dag/vertex details in DagClient
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractdog committed Apr 6, 2021
1 parent 73bcabd commit 5d8a2f9
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 16 deletions.
23 changes: 12 additions & 11 deletions tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
Original file line number Diff line number Diff line change
Expand Up @@ -624,23 +624,24 @@ public static TezCountersProto convertTezCountersToProto(
}

public static DAGProtos.StatusGetOptsProto convertStatusGetOptsToProto(
StatusGetOpts statusGetOpts) {
StatusGetOpts statusGetOpts) {
switch (statusGetOpts) {
case GET_COUNTERS:
return DAGProtos.StatusGetOptsProto.GET_COUNTERS;
case GET_COUNTERS:
return DAGProtos.StatusGetOptsProto.GET_COUNTERS;
case GET_DETAILS:
return DAGProtos.StatusGetOptsProto.GET_DETAILS;
}
throw new TezUncheckedException("Could not convert StatusGetOpts to"
+ " proto");
throw new TezUncheckedException("Could not convert StatusGetOpts to" + " proto");
}

public static StatusGetOpts convertStatusGetOptsFromProto(
DAGProtos.StatusGetOptsProto proto) {
public static StatusGetOpts convertStatusGetOptsFromProto(DAGProtos.StatusGetOptsProto proto) {
switch (proto) {
case GET_COUNTERS:
return StatusGetOpts.GET_COUNTERS;
case GET_COUNTERS:
return StatusGetOpts.GET_COUNTERS;
case GET_DETAILS:
return StatusGetOpts.GET_DETAILS;
}
throw new TezUncheckedException("Could not convert to StatusGetOpts from"
+ " proto");
throw new TezUncheckedException("Could not convert to StatusGetOpts from" + " proto");
}

public static List<DAGProtos.StatusGetOptsProto> convertStatusGetOptsToProto(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@
@Evolving
public enum StatusGetOpts {
/** Retrieve Counters with Status */
GET_COUNTERS
GET_COUNTERS,
GET_DETAILS
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public VertexStatus(VertexStatusProtoOrBuilder proxy) {
this.proxy = proxy;
}

public String getId() {
return proxy.getId();
}

public State getState() {
return getState(proxy.getState());
}
Expand Down Expand Up @@ -96,6 +100,10 @@ public List<String> getDiagnostics() {
return proxy.getDiagnosticsList();
}

public List<String> getTasks() {
return proxy.getTasksList();
}

public Progress getProgress() {
if(progress == null && proxy.hasProgress()) {
progress = new Progress(proxy.getProgress());
Expand Down
11 changes: 7 additions & 4 deletions tez-api/src/main/proto/DAGApiRecords.proto
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,12 @@ enum VertexStatusStateProto {
}

message VertexStatusProto {
optional VertexStatusStateProto state = 1;
repeated string diagnostics = 2;
optional ProgressProto progress = 3;
optional TezCountersProto vertexCounters = 4;
required string id = 1;
optional VertexStatusStateProto state = 2;
repeated string diagnostics = 3;
optional ProgressProto progress = 4;
optional TezCountersProto vertexCounters = 5;
repeated string tasks = 6;
}

enum DAGStatusStateProto {
Expand Down Expand Up @@ -298,6 +300,7 @@ message TezCountersProto {

enum StatusGetOptsProto {
GET_COUNTERS = 0;
GET_DETAILS = 1;
}

message VertexLocationHintProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.StatusGetOptsProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
Expand Down Expand Up @@ -235,6 +237,23 @@ public void testAclConversions() {
assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo);
}

/*
* This unit test can catch if a StatusGetOpts <-> StatusGetOptsProto value is not defined at any
* side.
*/
@Test
public void testConvertStatusGetOptsToProtoCoverage() {
StatusGetOpts[] opts = StatusGetOpts.values();
for (StatusGetOpts opt : opts) {
DagTypeConverters.convertStatusGetOptsToProto(opt);
}

StatusGetOptsProto[] optProtos = StatusGetOptsProto.values();
for (StatusGetOptsProto proto : optProtos) {
DagTypeConverters.convertStatusGetOptsFromProto(proto);
}
}

private void assertSame(DAGAccessControls dagAccessControls, ACLInfo aclInfo) {
assertEquals(dagAccessControls.getUsersWithViewACLs(),
Sets.newHashSet(aclInfo.getUsersWithViewAccessList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ private void setUpData(){
.build();

vertexStatusProtoWithoutCounters = VertexStatusProto.newBuilder()
.setId("vertex_1")
.addDiagnostics("V_Diagnostics_0")
.setProgress(vertexProgressProto)
.setState(VertexStatusStateProto.VERTEX_SUCCEEDED) // make sure the waitForCompletion be able to finish
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@
import org.apache.tez.dag.api.records.DAGProtos.VertexStatusStateProto;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.records.TezVertexID;

public class VertexStatusBuilder extends VertexStatus {

public VertexStatusBuilder() {
super(VertexStatusProto.newBuilder());
}

public void setId(TezVertexID vertexId) {
getBuilder().setId(vertexId.toString());
}

public void setState(VertexState state) {
getBuilder().setState(getProtoState(state));
}
Expand All @@ -54,6 +59,12 @@ public void setVertexCounters(TezCounters counters) {
DagTypeConverters.convertTezCountersToProto(counters));
}

public void setVertexTasks(List<String> taskIds) {
Builder builder = getBuilder();
builder.clearTasks();
builder.addAllTasks(taskIds);
}

public VertexStatusProto getProto() {
return getBuilder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -1536,12 +1537,17 @@ public VertexStatusBuilder getVertexStatus(
this.readLock.lock();
try {
VertexStatusBuilder status = new VertexStatusBuilder();
status.setId(getVertexId());
status.setState(getInternalState());
status.setDiagnostics(diagnostics);
status.setProgress(getVertexProgress());
if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
status.setVertexCounters(getAllCounters());
}
if (statusOptions.contains(StatusGetOpts.GET_DETAILS)) {
status.setVertexTasks(
getTasks().keySet().stream().map(Object::toString).collect(Collectors.toList()));
}
return status;
} finally {
this.readLock.unlock();
Expand Down

0 comments on commit 5d8a2f9

Please sign in to comment.