Skip to content

Commit

Permalink
TEZ-4339: Expose real-time memory consumption of AM and task containe…
Browse files Browse the repository at this point in the history
…rs via DagClient
  • Loading branch information
abstractdog committed Oct 24, 2021
1 parent 58fca8b commit 8acabcd
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ public static DAGProtos.StatusGetOptsProto convertStatusGetOptsToProto(
switch (statusGetOpts) {
case GET_COUNTERS:
return DAGProtos.StatusGetOptsProto.GET_COUNTERS;
case GET_MEMORY_USAGE:
return DAGProtos.StatusGetOptsProto.GET_MEMORY_USAGE;
}
throw new TezUncheckedException("Could not convert StatusGetOpts to" + " proto");
}
Expand All @@ -636,6 +638,8 @@ public static StatusGetOpts convertStatusGetOptsFromProto(DAGProtos.StatusGetOpt
switch (proto) {
case GET_COUNTERS:
return StatusGetOpts.GET_COUNTERS;
case GET_MEMORY_USAGE:
return StatusGetOpts.GET_MEMORY_USAGE;
}
throw new TezUncheckedException("Could not convert to StatusGetOpts from" + " proto");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ public int hashCode() {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("status=" + getState()
+ ", progress=" + getDAGProgress()
+ ", diagnostics="
+ StringUtils.join(getDiagnostics(), LINE_SEPARATOR)
+ ", counters="
+ (getDAGCounters() == null ? "null" : getDAGCounters().toString()));
sb.append("status=" + getState());
sb.append(", progress=" + getDAGProgress());
sb.append(", diagnostics=" + StringUtils.join(getDiagnostics(), LINE_SEPARATOR));
sb.append(", memoryUsedByAM=").append(proxy.getMemoryUsedByAM());
sb.append(", memoryUsedByTasks=").append(proxy.getMemoryUsedByTasks());
sb.append(", counters=" + (getDAGCounters() == null ? "null" : getDAGCounters().toString()));
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@
@Evolving
public enum StatusGetOpts {
/** Retrieve Counters with Status */
GET_COUNTERS
GET_COUNTERS,
GET_MEMORY_USAGE
}
3 changes: 3 additions & 0 deletions tez-api/src/main/proto/DAGApiRecords.proto
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ message DAGStatusProto {
optional ProgressProto DAGProgress = 3;
repeated StringProgressPairProto vertexProgress = 4;
optional TezCountersProto dagCounters = 5;
optional int64 memoryUsedByAM = 6;
optional int64 memoryUsedByTasks = 7;
}

message PlanLocalResourcesProto {
Expand All @@ -299,6 +301,7 @@ message TezCountersProto {

enum StatusGetOptsProto {
GET_COUNTERS = 0;
GET_MEMORY_USAGE = 1;
}

message VertexLocationHintProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public void addVertexProgress(String name, ProgressBuilder progress) {
getBuilder().addVertexProgress(builder.build());
}

//TODO: let this be a map of values in protobuf 3.x
public void setMemoryUsage(long memoryUsedByAM, long memoryUsedByTasks) {
Builder builder = getBuilder();
builder.setMemoryUsedByAM(memoryUsedByAM);
builder.setMemoryUsedByTasks(memoryUsedByTasks);
}

public DAGStatusProto getProto() {
return getBuilder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,4 +678,12 @@ public String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID attemptID, No
return null;
}

@Override
public long getTotalUsedMemory() {
long totalUsedMemory = 0;
for (int i = 0; i < taskCommunicators.length; i++) {
totalUsedMemory += taskCommunicators[i].getTaskCommunicator().getTotalUsedMemory();
}
return totalUsedMemory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ public interface TaskCommunicatorManagerInterface {

String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId);

long getTotalUsedMemory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public static final class ContainerInfo {
Credentials credentials = null;
boolean credentialsChanged = false;
boolean taskPulled = false;
long usedMemory = 0;

void reset() {
taskSpec = null;
Expand Down Expand Up @@ -382,6 +383,7 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce
response.setLastRequestId(requestId);
containerInfo.lastRequestId = requestId;
containerInfo.lastResponse = response;
containerInfo.usedMemory = request.getUsedMemory();
return response;
}

Expand Down Expand Up @@ -466,4 +468,8 @@ protected ContainerInfo getContainerInfo(ContainerId containerId) {
protected ContainerId getContainerForAttempt(TezTaskAttemptID taskAttemptId) {
return attemptToContainerMap.get(taskAttemptId);
}

public long getTotalUsedMemory() {
return registeredContainers.values().stream().mapToLong(c -> c.usedMemory).sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.tez.dag.app.dag.impl;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
Expand Down Expand Up @@ -244,6 +246,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private static final CommitCompletedTransition COMMIT_COMPLETED_TRANSITION =
new CommitCompletedTransition();

private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

protected static final
StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
stateMachineFactory
Expand Down Expand Up @@ -940,6 +944,10 @@ public DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions) {
if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
status.setDAGCounters(getAllCounters());
}
if (statusOptions.contains(StatusGetOpts.GET_MEMORY_USAGE)) {
status.setMemoryUsage(memoryMXBean.getHeapMemoryUsage().getUsed(),
taskCommunicatorManagerInterface.getTotalUsedMemory());
}
return status;
} finally {
readLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,13 @@ public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNo
return null;
}

/**
* Return the amount of memory used by the containers. Each container is supposed to refresh
* its current state via heartbeat requests, and the TaskCommunicator implementation is supposed
* to aggregate this properly.
* @return memory in MB
*/
public long getTotalUsedMemory() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ public Void call() throws Exception {
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
MockDAGAppMaster.this.getContext().getClock().getTime()));
TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000, 0);
doHeartbeat(request, cData);
} else if (version != null && cData.taId.getId() <= version.intValue()) {
preemptContainer(cData);
Expand All @@ -443,7 +443,7 @@ public Void call() throws Exception {
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
MockDAGAppMaster.this.getContext().getClock().getTime()));
TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000, 0);
doHeartbeat(request, cData);
cData.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,22 @@ public class TezHeartbeatRequest implements Writable {
private int preRoutedStartIndex;
private int maxEvents;
private long requestId;
private long usedMemory;

public TezHeartbeatRequest() {
}

public TezHeartbeatRequest(long requestId, List<TezEvent> events,
int preRoutedStartIndex, String containerIdentifier,
TezTaskAttemptID taskAttemptID, int startIndex, int maxEvents) {
TezTaskAttemptID taskAttemptID, int startIndex, int maxEvents, long usedMemory) {
this.containerIdentifier = containerIdentifier;
this.requestId = requestId;
this.events = Collections.unmodifiableList(events);
this.startIndex = startIndex;
this.preRoutedStartIndex = preRoutedStartIndex;
this.maxEvents = maxEvents;
this.currentTaskAttemptID = taskAttemptID;
this.usedMemory = usedMemory;
}

public String getContainerIdentifier() {
Expand Down Expand Up @@ -83,6 +85,10 @@ public TezTaskAttemptID getCurrentTaskAttemptID() {
return currentTaskAttemptID;
}

public long getUsedMemory() {
return usedMemory;
}

@Override
public void write(DataOutput out) throws IOException {
if (events != null) {
Expand All @@ -105,6 +111,7 @@ public void write(DataOutput out) throws IOException {
out.writeInt(maxEvents);
out.writeLong(requestId);
Text.writeString(out, containerIdentifier);
out.writeLong(usedMemory);
}

@Override
Expand All @@ -128,6 +135,7 @@ public void readFields(DataInput in) throws IOException {
maxEvents = in.readInt();
requestId = in.readLong();
containerIdentifier = Text.readString(in);
usedMemory = in.readLong();
}

@Override
Expand All @@ -140,6 +148,7 @@ public String toString() {
+ ", maxEventsToGet=" + maxEvents
+ ", taskAttemptId=" + currentTaskAttemptID
+ ", eventCount=" + (events != null ? events.size() : 0)
+ ", usedMemory=" + usedMemory
+ " }";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.tez.runtime.task;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class TaskReporter implements TaskReporterInterface {
private final String containerIdStr;

private final ListeningExecutorService heartbeatExecutor;
private static final MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();

@VisibleForTesting
HeartbeatCallable currentCallable;
Expand Down Expand Up @@ -263,7 +266,7 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
int fromPreRoutedEventId = task.getNextPreRoutedEventId();
int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle());
TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents);
containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents, getUsedMemory());
LOG.debug("Sending heartbeat to AM, request={}", request);

maybeLogCounters();
Expand Down Expand Up @@ -305,6 +308,10 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
return new ResponseWrapper(false, numEventsReceived);
}

private long getUsedMemory() {
return heapMemoryUsage.getUsed();
}

public void markComplete() {
// Notify to clear pending events, if any.
lock.lock();
Expand Down

0 comments on commit 8acabcd

Please sign in to comment.