Skip to content

Commit

Permalink
Handle race condition prestodb#2
Browse files Browse the repository at this point in the history
  • Loading branch information
abhiseksaikia authored and MnO2 committed Oct 27, 2023
1 parent 99a6de9 commit ae2918f
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface RemoteTask

void start();

void addSplits(Multimap<PlanNodeId, Split> splits);
boolean addSplits(Multimap<PlanNodeId, Split> splits);

void noMoreSplits(PlanNodeId sourceId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void gracefulShutdown()
isGracefulShutdownFinished.set(true);
}

private ImmutableList<TaskHandle> getActiveTasks()
private synchronized ImmutableList<TaskHandle> getActiveTasks()
{
return tasks.stream().filter(taskHandle -> !taskHandle.isDestroyed() && !taskHandle.isShutdownInProgress()).collect(toImmutableList());
}
Expand Down Expand Up @@ -261,17 +261,22 @@ private void gracefulShutdown(List<TaskHandle> currentTasksSnapshot)
builderWithOutputBufferInfo("init", shuttingdownNode, taskHandle.getOutputBuffer())
.build());

while (!isEligibleForGracefulShutdown(taskId)) {
TaskShutdownStats waitingForSplitStats = builderWithOutputBufferInfo(SPLIT_WAIT, shuttingdownNode, taskHandle.getOutputBuffer())
.setPendingRunningSplitState(SPLIT_WAIT, System.nanoTime() - startTime)
.build();
taskHandle.updateTaskShutdownState(waitingForSplitStats);
long currentTime = System.currentTimeMillis();
if (currentTime - lastLogTime >= logFrequencyMillis) {
log.info("Num running splits for task %s = %s, Num blocked splits = %s", taskId, runningSplits.size(), blockedSplits.size());
logRunningWaitingAndBlockedSplits(String.format("SplitView:state:%s for task %s", SPLIT_WAIT, taskId), taskId);
while (taskHandle.getRunningLeafSplits() > 0 || taskHandle.getRunningIntermediateSplits() > 0) {
try {
TaskShutdownStats waitingForSplitStats = builderWithOutputBufferInfo(SPLIT_WAIT, shuttingdownNode, taskHandle.getOutputBuffer())
.setPendingRunningSplitState(SPLIT_WAIT, System.nanoTime() - startTime)
.build();
taskHandle.updateTaskShutdownState(waitingForSplitStats);
long currentTime = System.currentTimeMillis();
if (currentTime - lastLogTime >= logFrequencyMillis) {
log.info("Num running splits for task %s = %s, Num blocked splits = %s", taskId, runningSplits.size(), blockedSplits.size());
logRunningWaitingAndBlockedSplits(String.format("SplitView:state:%s for task %s", SPLIT_WAIT, taskId), taskId);
}
Thread.sleep(waitTimeMillis);
}
catch (Exception ex) {
log.error(ex, "GracefulShutdown got interrupted while waiting for split completion for task %s", taskId);
}
Thread.sleep(waitTimeMillis);
}

TaskShutdownStats waitingForSplitStats = builderWithOutputBufferInfo(SPLIT_WAIT_OVER, shuttingdownNode, taskHandle.getOutputBuffer())
Expand Down Expand Up @@ -300,7 +305,7 @@ private void gracefulShutdown(List<TaskHandle> currentTasksSnapshot)
Thread.sleep(waitTimeMillis);
}
catch (InterruptedException e) {
log.error("GracefulShutdown got interrupted for task %s", taskId, e);
log.error(e, "GracefulShutdown got interrupted for task %s", taskId);
}
}
outputBufferEmptyWaitTime.add(Duration.nanosSince(startTime));
Expand Down Expand Up @@ -591,8 +596,8 @@ public synchronized TaskHandle addTask(
maxDriversPerTask,
taskKillListener,
outputBuffer);

tasks.add(taskHandle);

return taskHandle;
}

Expand Down Expand Up @@ -786,10 +791,8 @@ private synchronized void startIntermediateSplit(PrioritizedSplitRunner split)

private synchronized void startSplit(PrioritizedSplitRunner split)
{
if (!isShuttingDown()) {
allSplits.add(split);
waitingSplits.offer(split);
}
allSplits.add(split);
waitingSplits.offer(split);
}

private synchronized PrioritizedSplitRunner pollNextSplitWorker()
Expand Down Expand Up @@ -855,9 +858,6 @@ public void run()
Thread.currentThread().interrupt();
return;
}
if (isGracefulShutdownStarted.get() && !split.isSplitAlreadyStarted()) {
continue;
}

String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
try (SetThreadName splitName = new SetThreadName(threadId)) {
Expand Down Expand Up @@ -1352,7 +1352,7 @@ public boolean isLowMemory()
return this.lowMemory;
}

public boolean isShuttingDown()
public boolean isShuttingDownStarted()
{
return isGracefulShutdownStarted.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ public synchronized List<PrioritizedSplitRunner> destroy()
destroyed = true;

ImmutableList.Builder<PrioritizedSplitRunner> builder = ImmutableList.builderWithExpectedSize(runningIntermediateSplits.size() + runningLeafSplits.size() + queuedLeafSplits.size());
builder.addAll(runningIntermediateSplits);
builder.addAll(runningLeafSplits);
//FIXME hack to avoid queued split marked as completed splits to pollute the retryable splits
//To avoid queued split marked as completed splits to pollute the retryable splits
if (isShuttingDown.get()) {
builder.addAll(runningIntermediateSplits);
builder.addAll(runningLeafSplits);
builder.addAll(queuedLeafSplits);
}
runningIntermediateSplits.clear();
Expand Down Expand Up @@ -171,6 +171,11 @@ synchronized int getRunningLeafSplits()
return runningLeafSplits.size();
}

synchronized int getRunningIntermediateSplits()
{
return runningIntermediateSplits.size();
}

public synchronized long getScheduledNanos()
{
return priorityTracker.getScheduledNanos();
Expand All @@ -183,7 +188,7 @@ public synchronized PrioritizedSplitRunner pollNextSplit()
}
if (isShuttingDown.get()) {
boolean isAnyQueuedSplitStarted = isAnySplitStarted(queuedLeafSplits);
checkState(!isAnyQueuedSplitStarted, "queued split contains started splits");
checkState(!isAnyQueuedSplitStarted, String.format("queued split contains started splits for task %s", taskId));
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ private StageScheduler createStageScheduler(
for (int i = 0; i < activeRemoteTasks.size() && splits.hasNext(); i++) {
HttpRemoteTask httpRemoteTask = activeRemoteTasks.get(i);
List<ScheduledSplit> scheduledSplit = splits.next();
log.warn("Going to retry splits %s of failed task %s on active task %s", scheduledSplit.stream().map(ScheduledSplit::getSequenceId).collect(toImmutableList()), failedTask, httpRemoteTask.getTaskId());
log.warn("Retrying splits %s of failed task %s on active task %s", scheduledSplit.stream().map(split -> split.getSplit().getInfoMap()).collect(toImmutableList()), failedTask, httpRemoteTask.getTaskId());
List<Long> splitIds = scheduledSplit.stream().map(ScheduledSplit::getSequenceId).collect(toImmutableList());
log.warn("Going to retry splits %s of failed task %s on active task %s", splitIds, failedTask, httpRemoteTask.getTaskId());
Multimap<PlanNodeId, Split> splitsToAdd = HashMultimap.create();
splitsToAdd.putAll(planNodeId, scheduledSplit.stream().map(ScheduledSplit::getSplit).collect(toImmutableList()));
//FIXME metric to get the retried split information, add time element to it (how long its taking for coordinator to detect the failure)
Expand All @@ -387,7 +387,10 @@ private StageScheduler createStageScheduler(
//track how many splits we are retrying from the source task
splitRetryStats.addMetricValue(retryMetricName, RuntimeUnit.NONE, scheduledSplit.size());
session.getRuntimeStats().update(splitRetryStats);
httpRemoteTask.addSplits(splitsToAdd);
boolean isSplitAdded = httpRemoteTask.addSplits(splitsToAdd);
if (!isSplitAdded) {
throw new RuntimeException(String.format("Error adding split %s for retry to task %s", splitIds, httpRemoteTask.getTaskId()));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.log.Logger;
import com.facebook.drift.client.DriftClient;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.memory.context.LocalMemoryContext;
Expand Down Expand Up @@ -80,6 +81,7 @@
public class ExchangeClient
implements Closeable
{
private static final Logger log = Logger.get(ExchangeClient.class);
private static final SerializedPage NO_MORE_PAGES = new SerializedPage(EMPTY_SLICE, PageCodecMarker.none(), 0, 0, 0);
private static final ListenableFuture<?> NOT_BLOCKED = immediateFuture(null);

Expand Down Expand Up @@ -492,7 +494,7 @@ private synchronized void clientFailed(PageBufferClient client, Throwable cause)
// TODO: properly handle the failed vs closed state
// it is important not to treat failures as a successful close
if (!isClosed()) {
//failure.compareAndSet(null, cause);
failure.compareAndSet(null, cause);
notifyBlockedCallers();
}
}
Expand Down Expand Up @@ -540,9 +542,8 @@ public void clientFailed(PageBufferClient client, Throwable cause)
{
requireNonNull(client, "client is null");
requireNonNull(cause, "cause is null");

ExchangeClient.this.clientFinished(client);
// ExchangeClient.this.clientFailed(client, cause);
log.error(cause, "Exchange client failed");
ExchangeClient.this.clientFailed(client, cause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,15 @@ public synchronized boolean isNoMoreSplits(PlanNodeId sourceId)
}

@Override
public synchronized void addSplits(Multimap<PlanNodeId, Split> splitsBySource)
public synchronized boolean addSplits(Multimap<PlanNodeId, Split> splitsBySource)
{
requireNonNull(splitsBySource, "splitsBySource is null");

// only add pending split if not done
if (getTaskStatus().getState().isDone()) {
return;
TaskState state = getTaskStatus().getState();
if (state.isDone()) {
return false;
//throw new RuntimeException(String.format("Adding split to a task in terminal state, state= %s", state));
}

boolean needsUpdate = false;
Expand Down Expand Up @@ -504,6 +506,7 @@ public synchronized void addSplits(Multimap<PlanNodeId, Split> splitsBySource)
this.needsUpdate.set(true);
scheduleUpdate();
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,14 @@ public void start()
}

@Override
public void addSplits(Multimap<PlanNodeId, Split> splits)
public boolean addSplits(Multimap<PlanNodeId, Split> splits)
{
synchronized (this) {
this.splits.putAll(splits);
}
updateTaskStats();
updateSplitQueueSpace();
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,7 @@ public void testGracefulShutdown()
taskExecutor.gracefulShutdown();
}, 1, MILLISECONDS);

waitUntilEquals(taskExecutor::isShuttingDown, true, ASSERT_WAIT_TIMEOUT);
waitUntilEquals(taskExecutor::isShuttingDownStarted, true, ASSERT_WAIT_TIMEOUT);

// resume operator execution
testingScanOperatorFactory.getPauser().resume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -367,20 +365,14 @@ public void testGracefulShutdown()
taskExecutor.enqueueSplits(taskHandle, false, ImmutableList.of(driver));
}
new Thread(() -> taskExecutor.gracefulShutdown()).start();
while (!taskExecutor.isShuttingDown()) {
while (!taskExecutor.isShuttingDownStarted()) {
MILLISECONDS.sleep(500);
}
assertEquals(taskHandle.getRunningLeafSplits(), 4);
assertEquals(taskHandle.getQueuedSplitSize(), 26);
// let the split continue to run
beginPhase.arriveAndDeregister();
verificationComplete.arriveAndDeregister();
Collection<Set<Long>> pendingSplits = gracefulShutdownSplitTracker.getPendingSplits().values();
System.out.println(pendingSplits);
assertEquals(pendingSplits
.stream()
.mapToInt(Set::size)
.sum(), 26);
}
finally {
taskExecutor.stop();
Expand Down

0 comments on commit ae2918f

Please sign in to comment.