From e8aea3fb38bed383cc9f8ed862c99461e0d5a7c1 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 24 Jan 2025 02:36:48 -0800 Subject: [PATCH] [Dataflow Streaming] Reduce contention on work submission (#33687) * [Dataflow Streaming] Reduce contention on work submission --- .../worker/util/BoundedQueueExecutor.java | 47 +++++++++++++++++-- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 9905c0ae5b5b..dc611174b7eb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -17,10 +17,12 @@ */ package org.apache.beam.runners.dataflow.worker.util; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.concurrent.GuardedBy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; @@ -36,6 +38,9 @@ public class BoundedQueueExecutor { // Used to guard elementsOutstanding and bytesOutstanding. private final Monitor monitor = new Monitor(); + private final ConcurrentLinkedQueue decrementQueue = new ConcurrentLinkedQueue<>(); + private final Object decrementQueueDrainLock = new Object(); + private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false); private int elementsOutstanding = 0; private long bytesOutstanding = 0; @@ -236,10 +241,44 @@ private void executeMonitorHeld(Runnable work, long workBytes) { } private void decrementCounters(long workBytes) { - monitor.enter(); - --elementsOutstanding; - bytesOutstanding -= workBytes; - monitor.leave(); + // All threads queue decrements and one thread grabs the monitor and updates + // counters. We do this to reduce contention on monitor which is locked by + // GetWork thread + decrementQueue.add(workBytes); + boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true); + if (submittedToExistingBatch) { + // There is already a thread about to drain the decrement queue + // Current thread does not need to drain. + return; + } + synchronized (decrementQueueDrainLock) { + // By setting false here, we may allow another decrement to claim submission of the next batch + // and start waiting on the decrementQueueDrainLock. + // + // However this prevents races that would leave decrements in the queue and unclaimed and we + // are ensured there is at most one additional thread blocked. This helps prevent the executor + // from creating threads over the limit if many were contending on the lock while their + // decrements were already applied. + isDecrementBatchPending.set(false); + long bytesToDecrement = 0; + int elementsToDecrement = 0; + while (true) { + Long pollResult = decrementQueue.poll(); + if (pollResult == null) { + break; + } + bytesToDecrement += pollResult; + ++elementsToDecrement; + } + if (elementsToDecrement == 0) { + return; + } + + monitor.enter(); + elementsOutstanding -= elementsToDecrement; + bytesOutstanding -= bytesToDecrement; + monitor.leave(); + } } private long bytesAvailable() {