From 9e04ff4a9fe12473be10b043887206134fcc66fd Mon Sep 17 00:00:00 2001 From: "zhangzhao.08" Date: Sun, 22 Dec 2024 23:08:36 -0800 Subject: [PATCH] [CELEBORN-1763] Fix DataPusher be blocked for a long time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Fix DataPusher being blocked for a long time. ### Why are the changes needed? When the worker has been at a performance bottleneck for a long time, the slow start strategy adjusts its maxInFlight to 1, which may cause RequestInFlight to exceed maxInFlight. If the task’s main thread is blocked in the waitIdleQueueFullWithLock call, it cannot detect the sending failure, because the failure is only recorded as the exception in the push state and waitIdleQueueFullWithLock does not check it. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? GA Closes #2978 from zhaostu4/fix_pusher_block. 
Authored-by: zhangzhao.08 Signed-off-by: Wang, Fei --- .../apache/celeborn/client/write/DataPusher.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java index e366ae7c2f3..bc02570da52 100644 --- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java +++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java @@ -35,6 +35,8 @@ import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.util.ThreadUtils; +import org.apache.celeborn.common.util.Utils; +import org.apache.celeborn.common.write.PushState; public class DataPusher { private static final Logger logger = LoggerFactory.getLogger(DataPusher.class); @@ -43,6 +45,7 @@ public class DataPusher { private LinkedBlockingQueue idleQueue; // partition -> PushTask Queue + private final PushState pushState; private final DataPushQueue dataPushQueue; private final ReentrantLock idleLock = new ReentrantLock(); private final Condition idleFull = idleLock.newCondition(); @@ -98,6 +101,8 @@ public DataPusher( this.client = client; this.afterPush = afterPush; this.mapStatusLengths = mapStatusLengths; + final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId); + this.pushState = client.getPushState(mapKey); pushThread = ThreadUtils.newDaemonThread( @@ -194,6 +199,9 @@ public void checkException() throws IOException { if (exceptionRef.get() != null) { throw exceptionRef.get(); } + if (pushState.exception.get() != null) { + throw pushState.exception.get(); + } } protected void pushData(PushTask task) throws IOException { @@ -217,6 +225,7 @@ private void waitIdleQueueFullWithLock() throws InterruptedException { while (idleQueue != null && idleQueue.remainingCapacity() > 0 && exceptionRef.get() == null + && pushState.exception.get() 
== null && (pushThread != null && pushThread.isAlive())) { idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS); } @@ -229,7 +238,9 @@ private void waitIdleQueueFullWithLock() throws InterruptedException { } protected boolean stillRunning() { - return !terminated && !Objects.nonNull(exceptionRef.get()); + return !terminated + && !Objects.nonNull(exceptionRef.get()) + && !Objects.nonNull(pushState.exception.get()); } public DataPushQueue getDataPushQueue() {