Skip to content

Commit

Permalink
[native pos] Use RequestErrorTracker to retry task update requests
Browse files Browse the repository at this point in the history
To provide additional resilience and consistent errors
  • Loading branch information
arhimondr committed Nov 7, 2023
1 parent b0542ce commit b20986f
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public BaseResponse<T> handle(Request request, Response response)
});
}

public ListenableFuture<BaseResponse<TaskInfo>> updateTask(
public BaseResponse<TaskInfo> updateTask(
List<TaskSource> sources,
PlanFragment planFragment,
TableWriteInfo tableWriteInfo,
Expand All @@ -213,7 +213,7 @@ public ListenableFuture<BaseResponse<TaskInfo>> updateTask(
URI batchTaskUri = uriBuilderFrom(taskUri)
.appendPath("batch")
.build();
return httpClient.executeAsync(
return httpClient.execute(
setContentTypeHeaders(false, preparePost())
.setUri(batchTaskUri)
.setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestCodec.toBytes(batchTaskUpdateRequest)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,29 @@
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.server.RequestErrorTracker;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient;
import com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskInfoFetcher;
import com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher;
import com.facebook.presto.spi.PrestoTransportException;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.spi.security.TokenAuthenticator;
import com.facebook.presto.sql.planner.PlanFragment;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.presto.execution.TaskState.ABORTED;
import static com.facebook.presto.execution.TaskState.CANCELED;
import static com.facebook.presto.execution.TaskState.FAILED;
import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static com.facebook.presto.spi.StandardErrorCode.NATIVE_EXECUTION_TASK_ERROR;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -69,6 +72,9 @@ public class NativeExecutionTask
private final Optional<String> broadcastBasePath;
private final List<TaskSource> sources;
private final Executor executor;

private final ScheduledExecutorService errorRetryScheduledExecutor;
private final Duration remoteTaskMaxErrorDuration;
private final HttpNativeExecutionTaskInfoFetcher taskInfoFetcher;
// Results will be fetched only if not written to shuffle.
private final Optional<HttpNativeExecutionTaskResultFetcher> taskResultFetcher;
Expand Down Expand Up @@ -96,26 +102,28 @@ public NativeExecutionTask(
this.broadcastBasePath = requireNonNull(broadcastBasePath, "broadcastBasePath is null");
this.sources = requireNonNull(sources, "sources is null");
this.executor = requireNonNull(executor, "executor is null");
this.errorRetryScheduledExecutor = requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null");
this.workerClient = requireNonNull(workerClient, "workerClient is null");
this.outputBuffers = createInitialEmptyOutputBuffers(planFragment.getPartitioningScheme().getPartitioning().getHandle()).withNoMoreBufferIds();
requireNonNull(taskManagerConfig, "taskManagerConfig is null");
requireNonNull(updateScheduledExecutor, "updateScheduledExecutor is null");
requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null");
this.remoteTaskMaxErrorDuration = queryManagerConfig.getRemoteTaskMaxErrorDuration();
this.taskInfoFetcher = new HttpNativeExecutionTaskInfoFetcher(
updateScheduledExecutor,
errorRetryScheduledExecutor,
this.workerClient,
this.executor,
taskManagerConfig.getInfoUpdateInterval(),
queryManagerConfig.getRemoteTaskMaxErrorDuration(),
remoteTaskMaxErrorDuration,
taskFinishedOrHasResult);
if (!shuffleWriteInfo.isPresent()) {
this.taskResultFetcher = Optional.of(new HttpNativeExecutionTaskResultFetcher(
updateScheduledExecutor,
errorRetryScheduledExecutor,
this.workerClient,
this.executor,
queryManagerConfig.getRemoteTaskMaxErrorDuration(),
remoteTaskMaxErrorDuration,
taskFinishedOrHasResult));
}
else {
Expand Down Expand Up @@ -196,28 +204,51 @@ public void stop(boolean success)

// Sends the create-or-update task request to the native process and returns the TaskInfo carried by the response.
// NOTE(review): this span is rendered from a diff hunk, so lines of the removed future-based body
// (workerClient.updateTask returning a ListenableFuture, future.get(), the InterruptedException/ExecutionException
// handler) and lines of the added RequestErrorTracker retry loop appear interleaved below.
private TaskInfo sendUpdateRequest()
{
try {
ListenableFuture<BaseResponse<TaskInfo>> future = workerClient.updateTask(
sources,
planFragment,
tableWriteInfo,
shuffleWriteInfo,
broadcastBasePath,
session,
outputBuffers);
BaseResponse<TaskInfo> response = future.get();
if (response.hasValue()) {
return response.getValue();
// A fresh tracker is created per call, configured with remoteTaskMaxErrorDuration and the error retry executor.
RequestErrorTracker errorTracker = new RequestErrorTracker(
"NativeExecution",
workerClient.getLocation(),
NATIVE_EXECUTION_TASK_ERROR,
"sendUpdateRequest encountered too many errors talking to native process",
remoteTaskMaxErrorDuration,
errorRetryScheduledExecutor,
"sending update request to native process");

// The loop only exits by returning a response value or by an exception propagating out of it.
while (true) {
// Waits on the tracker's permit future before every attempt.
getFutureValue(errorTracker.acquireRequestPermit());
try {
errorTracker.startRequest();
BaseResponse<TaskInfo> response = doSendUpdateRequest();
// NOTE(review): success is recorded before the hasValue() check, so a response without a value is
// first counted as a success and then, via the IllegalStateException below, handed to requestFailed().
errorTracker.requestSucceeded();
if (response.hasValue()) {
return response.getValue();
}
else {
String message = String.format("Create-or-update task request didn't return a result. %s: %s",
HttpStatus.fromStatusCode(response.getStatusCode()),
response.getStatusMessage());
throw new IllegalStateException(message);
}
}
else {
String message = String.format("Create-or-update task request didn't return a result. %s: %s",
HttpStatus.fromStatusCode(response.getStatusCode()),
response.getStatusMessage());
throw new IllegalStateException(message);
catch (RuntimeException e) {
// Every RuntimeException from the attempt is passed to the tracker; presumably requestFailed() rethrows
// once the configured max error duration is exceeded, otherwise the loop retries — confirm against RequestErrorTracker.
errorTracker.requestFailed(e);
}
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

/**
 * Issues a single create-or-update request for this task through the worker client.
 *
 * <p>The call forwards the task's sources, plan fragment, table write info, shuffle write info,
 * broadcast base path, session and output buffers unchanged. It performs no retry or error
 * handling of its own; any exception raised by the client propagates to the caller.
 *
 * @return the response produced by {@code workerClient.updateTask}
 */
private BaseResponse<TaskInfo> doSendUpdateRequest()
{
    BaseResponse<TaskInfo> updateResponse = workerClient.updateTask(
            sources,
            planFragment,
            tableWriteInfo,
            shuffleWriteInfo,
            broadcastBasePath,
            session,
            outputBuffers);
    return updateResponse;
}

/**
 * Tells whether the given exception is classified as a native execution task error.
 *
 * <p>The check is purely type based: only instances of {@link PrestoTransportException}
 * qualify. NOTE(review): presumably this is the type raised once the request error tracker
 * gives up retrying — confirm against RequestErrorTracker.
 *
 * @param ex the exception to classify; {@code null} yields {@code false}
 * @return {@code true} if {@code ex} is a {@code PrestoTransportException}, {@code false} otherwise
 */
public static boolean isNativeExecutionTaskError(RuntimeException ex)
{
    boolean transportFailure = ex instanceof PrestoTransportException;
    return transportFailure;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void testUpdateTask()
PrestoSparkHttpTaskClient workerClient = createWorkerClient(taskId);

List<TaskSource> sources = new ArrayList<>();
ListenableFuture<BaseResponse<TaskInfo>> future = workerClient.updateTask(
BaseResponse<TaskInfo> response = workerClient.updateTask(
sources,
createPlanFragment(),
new TableWriteInfo(Optional.empty(), Optional.empty(), Optional.empty()),
Expand All @@ -244,7 +244,7 @@ public void testUpdateTask()
createInitialEmptyOutputBuffers(PARTITIONED));

try {
TaskInfo taskInfo = future.get().getValue();
TaskInfo taskInfo = response.getValue();
assertEquals(taskInfo.getTaskId().toString(), taskId.toString());
}
catch (Exception e) {
Expand Down

0 comments on commit b20986f

Please sign in to comment.