diff --git a/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java b/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java index 0d18cab14ff5..c35cb4ffdacb 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java @@ -39,7 +39,7 @@ public interface RemoteTask void start(); - void addSplits(Multimap splits); + boolean addSplits(Multimap splits); void noMoreSplits(PlanNodeId sourceId); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java b/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java index ad9ba6fff949..2fc74ffe4807 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java @@ -233,7 +233,7 @@ public void gracefulShutdown() isGracefulShutdownFinished.set(true); } - private ImmutableList getActiveTasks() + private synchronized ImmutableList getActiveTasks() { return tasks.stream().filter(taskHandle -> !taskHandle.isDestroyed() && !taskHandle.isShutdownInProgress()).collect(toImmutableList()); } @@ -261,17 +261,22 @@ private void gracefulShutdown(List currentTasksSnapshot) builderWithOutputBufferInfo("init", shuttingdownNode, taskHandle.getOutputBuffer()) .build()); - while (!isEligibleForGracefulShutdown(taskId)) { - TaskShutdownStats waitingForSplitStats = builderWithOutputBufferInfo(SPLIT_WAIT, shuttingdownNode, taskHandle.getOutputBuffer()) - .setPendingRunningSplitState(SPLIT_WAIT, System.nanoTime() - startTime) - .build(); - taskHandle.updateTaskShutdownState(waitingForSplitStats); - long currentTime = System.currentTimeMillis(); - if (currentTime - lastLogTime >= logFrequencyMillis) { - log.info("Num running splits for task %s = %s, Num blocked splits = %s", taskId, runningSplits.size(), blockedSplits.size()); - logRunningWaitingAndBlockedSplits(String.format("SplitView:state:%s for task %s", SPLIT_WAIT, taskId), taskId); + while (taskHandle.getRunningLeafSplits() > 0 || taskHandle.getRunningIntermediateSplits() > 0) { + try { + TaskShutdownStats waitingForSplitStats = builderWithOutputBufferInfo(SPLIT_WAIT, shuttingdownNode, taskHandle.getOutputBuffer()) + .setPendingRunningSplitState(SPLIT_WAIT, System.nanoTime() - startTime) + .build(); + taskHandle.updateTaskShutdownState(waitingForSplitStats); + long currentTime = System.currentTimeMillis(); + if (currentTime - lastLogTime >= logFrequencyMillis) { + log.info("Num running splits for task %s = %s, Num blocked splits = %s", taskId, runningSplits.size(), blockedSplits.size()); + logRunningWaitingAndBlockedSplits(String.format("SplitView:state:%s for task %s", SPLIT_WAIT, taskId), taskId); + } + Thread.sleep(waitTimeMillis); + } + catch (Exception ex) { + log.error(ex, "GracefulShutdown got interrupted while waiting for split completion for task %s", taskId); } - Thread.sleep(waitTimeMillis); } TaskShutdownStats waitingForSplitStats = builderWithOutputBufferInfo(SPLIT_WAIT_OVER, shuttingdownNode, taskHandle.getOutputBuffer()) @@ -300,7 +305,7 @@ private void gracefulShutdown(List currentTasksSnapshot) Thread.sleep(waitTimeMillis); } catch (InterruptedException e) { - log.error("GracefulShutdown got interrupted for task %s", taskId, e); + log.error(e, "GracefulShutdown got interrupted for task %s", taskId); } } outputBufferEmptyWaitTime.add(Duration.nanosSince(startTime)); @@ -591,8 +596,8 @@ public synchronized TaskHandle addTask( maxDriversPerTask, taskKillListener, outputBuffer); - tasks.add(taskHandle); + return taskHandle; } @@ -786,10 +791,8 @@ private synchronized void startIntermediateSplit(PrioritizedSplitRunner split) private synchronized void startSplit(PrioritizedSplitRunner split) { - if (!isShuttingDown()) { - allSplits.add(split); - waitingSplits.offer(split); - } + allSplits.add(split); + waitingSplits.offer(split); } private synchronized PrioritizedSplitRunner pollNextSplitWorker() @@ -855,9 +858,6 @@ public void run() Thread.currentThread().interrupt(); return; } - if (isGracefulShutdownStarted.get() && !split.isSplitAlreadyStarted()) { - continue; - } String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId(); try (SetThreadName splitName = new SetThreadName(threadId)) { @@ -1352,7 +1352,7 @@ public boolean isLowMemory() return this.lowMemory; } - public boolean isShuttingDown() + public boolean isShuttingDownStarted() { return isGracefulShutdownStarted.get(); } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskHandle.java b/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskHandle.java index 98ffd77ab46f..114ad4ff470c 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskHandle.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskHandle.java @@ -130,10 +130,10 @@ public synchronized List destroy() destroyed = true; ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(runningIntermediateSplits.size() + runningLeafSplits.size() + queuedLeafSplits.size()); - builder.addAll(runningIntermediateSplits); - builder.addAll(runningLeafSplits); - //FIXME hack to avoid queued split marked as completed splits to pollute the retryable splits + //To avoid queued split marked as completed splits to pollute the retryable splits if (isShuttingDown.get()) { + builder.addAll(runningIntermediateSplits); + builder.addAll(runningLeafSplits); builder.addAll(queuedLeafSplits); } runningIntermediateSplits.clear(); @@ -171,6 +171,11 @@ synchronized int getRunningLeafSplits() return runningLeafSplits.size(); } + synchronized int getRunningIntermediateSplits() + { + return runningIntermediateSplits.size(); + } + public synchronized long getScheduledNanos() { return priorityTracker.getScheduledNanos(); @@ -183,7 +188,7 @@ public synchronized PrioritizedSplitRunner pollNextSplit() } if (isShuttingDown.get()) { boolean isAnyQueuedSplitStarted = isAnySplitStarted(queuedLeafSplits); - checkState(!isAnyQueuedSplitStarted, "queued split contains started splits"); + checkState(!isAnyQueuedSplitStarted, String.format("queued split contains started splits for task %s", taskId)); return null; } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java index cb6fe6132a3d..f55187fa47d6 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java @@ -372,8 +372,8 @@ private StageScheduler createStageScheduler( for (int i = 0; i < activeRemoteTasks.size() && splits.hasNext(); i++) { HttpRemoteTask httpRemoteTask = activeRemoteTasks.get(i); List scheduledSplit = splits.next(); - log.warn("Going to retry splits %s of failed task %s on active task %s", scheduledSplit.stream().map(ScheduledSplit::getSequenceId).collect(toImmutableList()), failedTask, httpRemoteTask.getTaskId()); - log.warn("Retrying splits %s of failed task %s on active task %s", scheduledSplit.stream().map(split -> split.getSplit().getInfoMap()).collect(toImmutableList()), failedTask, httpRemoteTask.getTaskId()); + List splitIds = scheduledSplit.stream().map(ScheduledSplit::getSequenceId).collect(toImmutableList()); + log.warn("Going to retry splits %s of failed task %s on active task %s", splitIds, failedTask, httpRemoteTask.getTaskId()); Multimap splitsToAdd = HashMultimap.create(); splitsToAdd.putAll(planNodeId, scheduledSplit.stream().map(ScheduledSplit::getSplit).collect(toImmutableList())); //FIXME metric to get the retried split information, add time element to it (how long its taking for coordinator to detect the failure) @@ -387,7 +387,10 @@ private StageScheduler createStageScheduler( //track how many splits we are retrying from the source task splitRetryStats.addMetricValue(retryMetricName, RuntimeUnit.NONE, scheduledSplit.size()); session.getRuntimeStats().update(splitRetryStats); - httpRemoteTask.addSplits(splitsToAdd); + boolean isSplitAdded = httpRemoteTask.addSplits(splitsToAdd); + if (!isSplitAdded) { + throw new RuntimeException(String.format("Error adding split %s for retry to task %s", splitIds, httpRemoteTask.getTaskId())); + } } } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java index 725080e41e4a..186e0630ebb2 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java @@ -15,6 +15,7 @@ import com.facebook.airlift.http.client.HttpClient; import com.facebook.airlift.http.client.HttpUriBuilder; +import com.facebook.airlift.log.Logger; import com.facebook.drift.client.DriftClient; import com.facebook.presto.execution.TaskId; import com.facebook.presto.memory.context.LocalMemoryContext; @@ -80,6 +81,7 @@ public class ExchangeClient implements Closeable { + private static final Logger log = Logger.get(ExchangeClient.class); private static final SerializedPage NO_MORE_PAGES = new SerializedPage(EMPTY_SLICE, PageCodecMarker.none(), 0, 0, 0); private static final ListenableFuture NOT_BLOCKED = immediateFuture(null); @@ -492,7 +494,7 @@ private synchronized void clientFailed(PageBufferClient client, Throwable cause) // TODO: properly handle the failed vs closed state // it is important not to treat failures as a successful close if (!isClosed()) { - //failure.compareAndSet(null, cause); + failure.compareAndSet(null, cause); notifyBlockedCallers(); } } @@ -540,9 +542,8 @@ public void clientFailed(PageBufferClient client, Throwable cause) { requireNonNull(client, "client is null"); requireNonNull(cause, "cause is null"); - - ExchangeClient.this.clientFinished(client); -// ExchangeClient.this.clientFailed(client, cause); + log.error(cause, "Exchange client failed"); + ExchangeClient.this.clientFailed(client, cause); } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java index bb8db93ef3b5..8762197370aa 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java @@ -461,13 +461,15 @@ public synchronized boolean isNoMoreSplits(PlanNodeId sourceId) } @Override - public synchronized void addSplits(Multimap splitsBySource) + public synchronized boolean addSplits(Multimap splitsBySource) { requireNonNull(splitsBySource, "splitsBySource is null"); // only add pending split if not done - if (getTaskStatus().getState().isDone()) { - return; + TaskState state = getTaskStatus().getState(); + if (state.isDone()) { + return false; + //throw new RuntimeException(String.format("Adding split to a task in terminal state, state= %s", state)); } boolean needsUpdate = false; @@ -504,6 +506,7 @@ public synchronized void addSplits(Multimap splitsBySource) this.needsUpdate.set(true); scheduleUpdate(); } + return true; } @Override diff --git a/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java b/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java index 6f98f7d4b1ec..db3fc8374141 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java @@ -430,13 +430,14 @@ public void start() } @Override - public void addSplits(Multimap splits) + public boolean addSplits(Multimap splits) { synchronized (this) { this.splits.putAll(splits); } updateTaskStats(); updateSplitQueueSpace(); + return true; } @Override diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskExecution.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskExecution.java index a94021af387e..13f1e71b13be 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskExecution.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskExecution.java @@ -1467,7 +1467,7 @@ public void testGracefulShutdown() taskExecutor.gracefulShutdown(); }, 1, MILLISECONDS); - waitUntilEquals(taskExecutor::isShuttingDown, true, ASSERT_WAIT_TIMEOUT); + waitUntilEquals(taskExecutor::isShuttingDownStarted, true, ASSERT_WAIT_TIMEOUT); // resume operator execution testingScanOperatorFactory.getPauser().resume(); diff --git a/presto-main/src/test/java/com/facebook/presto/execution/executor/TestTaskExecutor.java b/presto-main/src/test/java/com/facebook/presto/execution/executor/TestTaskExecutor.java index 8b50fb51a1b7..fa5f0a4d69a2 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/executor/TestTaskExecutor.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/executor/TestTaskExecutor.java @@ -29,10 +29,8 @@ import org.testng.annotations.Test; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.OptionalInt; -import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; @@ -367,7 +365,7 @@ public void testGracefulShutdown() taskExecutor.enqueueSplits(taskHandle, false, ImmutableList.of(driver)); } new Thread(() -> taskExecutor.gracefulShutdown()).start(); - while (!taskExecutor.isShuttingDown()) { + while (!taskExecutor.isShuttingDownStarted()) { MILLISECONDS.sleep(500); } assertEquals(taskHandle.getRunningLeafSplits(), 4); @@ -375,12 +373,6 @@ public void testGracefulShutdown() // let the split continue to run beginPhase.arriveAndDeregister(); verificationComplete.arriveAndDeregister(); - Collection> pendingSplits = gracefulShutdownSplitTracker.getPendingSplits().values(); - System.out.println(pendingSplits); - assertEquals(pendingSplits - .stream() - .mapToInt(Set::size) - .sum(), 26); } finally { taskExecutor.stop();