From c539e822946f57bcb71f20325d4fb7eaef6e32cb Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 21 Jun 2023 13:25:16 +0530 Subject: [PATCH] TEZ-4250: Optimise TaskImpl::getCounters. Change-Id: I7db4db3a4d2cb70f2e1d96fbaa2f853524fbce8a --- .../org/apache/tez/dag/app/dag/TaskAttempt.java | 8 ++++++++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 11 +++++++++++ .../org/apache/tez/dag/app/dag/impl/TaskImpl.java | 15 +++++++++------ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index 563e4c7192..f51b576dfe 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -63,6 +64,11 @@ public void setLocalityCounter(DAGCounter localityCounter) { } } } + + @VisibleForTesting + public void setCounters(TezCounters counters) { + this.counters = counters; + } } Task getTask(); @@ -70,6 +76,8 @@ public void setLocalityCounter(DAGCounter localityCounter) { List getDiagnostics(); TaskAttemptTerminationCause getTerminationCause(); TezCounters getCounters(); + @VisibleForTesting + void setCounters(TezCounters counters); float getProgress(); TaskAttemptState getState(); TaskAttemptState getStateNoLock(); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index c8343c834b..289f1a1887 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -654,6 +654,17 @@ public TezCounters getCounters() { readLock.unlock(); } } + + @VisibleForTesting + @Override + public void setCounters(TezCounters counters) { + writeLock.lock(); + try { + reportedStatus.setCounters(counters); + } finally { + writeLock.unlock(); + } + } TaskStatistics getStatistics() { return this.statistics; diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index d08c8d3d29..c02d9184de 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -465,15 +465,19 @@ public TaskReport getReport() { @Override public TezCounters getCounters() { - TezCounters counters = new TezCounters(); - counters.incrAllCounters(this.counters); + TezCounters counters = null; + if (getVertex().isSpeculationEnabled()) { + counters = new TezCounters(); + counters.incrAllCounters(this.counters); + } readLock.lock(); try { TaskAttempt bestAttempt = selectBestAttempt(); - if (bestAttempt != null) { + if (bestAttempt != null && counters != null) { counters.incrAllCounters(bestAttempt.getCounters()); + return counters; } - return counters; + return (bestAttempt != null) ? bestAttempt.getCounters() : TaskAttemptImpl.EMPTY_COUNTERS; } finally { readLock.unlock(); } @@ -1522,10 +1526,9 @@ public void transition(TaskImpl task, TaskEvent event) { void setCounters(TezCounters counters) { try { writeLock.lock(); - this.counters = counters; + selectBestAttempt().setCounters(counters); } finally { writeLock.unlock(); } } - }