Skip to content

Commit

Permalink
[Dataflow] Replace AtomicLong with LongAdder in LongSumCounterValue t…
Browse files Browse the repository at this point in the history
…o reduce contention between harness threads (#33731)
  • Loading branch information
arunpandianp authored Jan 23, 2025
1 parent 86f8354 commit da94e20
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import org.apache.beam.runners.dataflow.worker.counters.Counter.AtomicCounterValue;
import org.apache.beam.runners.dataflow.worker.counters.Counter.CounterUpdateExtractor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -380,15 +381,22 @@ public <UpdateT> UpdateT extractUpdate(
}

/** Implements a {@link Counter} for tracking the sum of long values. */
public static class LongSumCounterValue extends LongCounterValue {
public static class LongSumCounterValue extends BaseCounterValue<Long, Long> {
private final LongAdder aggregate = new LongAdder();

@Override
public Long getAggregate() {
return aggregate.sum();
}

@Override
public void addValue(Long value) {
aggregate.addAndGet(value);
aggregate.add(value);
}

@Override
public Long getAndReset() {
return aggregate.getAndSet(0);
return aggregate.sumThenReset();
}

@Override
Expand Down

0 comments on commit da94e20

Please sign in to comment.