Skip to content

Commit

Permalink
[native]Send completed get data result if velox task has finished
Browse files Browse the repository at this point in the history
The following race condition can cause table writer hang when scale
writer is enabled

T1 coordinator schedule a new task writer
T2 one or any source task has produced all the data and all have been
consumed. It triggers task to complete
T3 the new task writer tries to fetch data from the closed source,
and find the source
task is not running, then wait for timeout and return an empty result
to retry from the writer task
T4 the new task writer keeps retrying and after the presto task has been
reclaimed, the new data fetch will triggers a new presto task creation
and continue retrying with empty result.

The short-term fix for this is to return completion result in T3 if the
velox task is in finished state instead of returning empty result.
A completed task will stay in one-minute which is sufficient long in
practice to handle this race condition. Have verified in Meta internal
testing.
  • Loading branch information
xiaoxmeng committed Oct 12, 2023
1 parent c4df72a commit deafa02
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ std::unique_ptr<Result> createTimeOutResult(long token) {
return result;
}

std::unique_ptr<Result> createCompleteResult(long token) {
auto result = std::make_unique<Result>();
result->sequence = result->nextSequence = token;
result->data = folly::IOBuf::create(0);
result->complete = true;
return result;
}

void getData(
PromiseHolderPtr<std::unique_ptr<Result>> promiseHolder,
const TaskId& taskId,
Expand Down Expand Up @@ -822,6 +830,11 @@ folly::Future<std::unique_ptr<Result>> TaskManager::getResults(

for (;;) {
if (prestoTask->taskStarted) {
// If the task has finished, then send completion result.
if (prestoTask->task->state() == exec::kFinished) {
promiseHolder->promise.setValue(createCompleteResult(token));
return std::move(future).via(eventBase);
}
// If task is not running let the request timeout. The task may have
// failed at creation time and the coordinator hasn't yet caught up.
if (prestoTask->task->state() == exec::kRunning) {
Expand All @@ -831,6 +844,7 @@ folly::Future<std::unique_ptr<Result>> TaskManager::getResults(
return std::move(future).via(eventBase).onTimeout(
std::chrono::microseconds(maxWaitMicros), timeoutFn);
}

std::lock_guard<std::mutex> l(prestoTask->mutex);
if (prestoTask->taskStarted) {
continue;
Expand Down

0 comments on commit deafa02

Please sign in to comment.