Skip to content

Commit

Permalink
Showing 5 changed files with 338 additions and 212 deletions.
11 changes: 11 additions & 0 deletions presto-native-execution/presto_cpp/main/QueryContextManager.cpp
Original file line number Diff line number Diff line change
@@ -31,6 +31,13 @@ static std::shared_ptr<folly::CPUThreadPoolExecutor>& executor() {
return executor;
}

std::shared_ptr<folly::CPUThreadPoolExecutor>& httpProcessingExecutor() {
static auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(
SystemConfig::instance()->numQueryThreads(),
std::make_shared<folly::NamedThreadFactory>("HttpProcessing"));
return executor;
}

std::shared_ptr<folly::IOThreadPoolExecutor> spillExecutor() {
const int32_t numSpillThreads = SystemConfig::instance()->numSpillThreads();
if (numSpillThreads <= 0) {
@@ -46,6 +53,10 @@ folly::CPUThreadPoolExecutor* driverCPUExecutor() {
return executor().get();
}

folly::CPUThreadPoolExecutor* httpProcessingExecutorPtr() {
return httpProcessingExecutor().get();
}

folly::IOThreadPoolExecutor* spillExecutorPtr() {
return spillExecutor().get();
}
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
namespace facebook::presto {

folly::CPUThreadPoolExecutor* driverCPUExecutor();
folly::CPUThreadPoolExecutor* httpProcessingExecutorPtr();
folly::IOThreadPoolExecutor* spillExecutorPtr();

class QueryContextCache {
43 changes: 22 additions & 21 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
@@ -714,15 +714,14 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
std::optional<protocol::TaskState> currentState,
std::optional<protocol::Duration> maxWait,
std::shared_ptr<http::CallbackRequestHandlerState> state) {
auto eventBase = folly::EventBaseManager::get()->getEventBase();
auto [promise, future] =
folly::makePromiseContract<std::unique_ptr<protocol::TaskInfo>>();
auto prestoTask = findOrCreateTask(taskId);
if (!currentState || !maxWait) {
// Return current TaskInfo without waiting.
promise.setValue(
std::make_unique<protocol::TaskInfo>(prestoTask->updateInfo()));
return std::move(future).via(eventBase);
return std::move(future).via(httpProcessingExecutor_);
}

uint64_t maxWaitMicros =
@@ -738,8 +737,9 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
keepPromiseAlive(promiseHolder, state);
prestoTask->infoRequest = folly::to_weak_ptr(promiseHolder);

return std::move(future).via(eventBase).onTimeout(
std::chrono::microseconds(maxWaitMicros), [prestoTask]() {
return std::move(future)
.via(httpProcessingExecutor_)
.onTimeout(std::chrono::microseconds(maxWaitMicros), [prestoTask]() {
return std::make_unique<protocol::TaskInfo>(
prestoTask->updateInfo());
});
@@ -749,15 +749,15 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
if (currentState.value() != info.taskStatus.state ||
isFinalState(info.taskStatus.state)) {
promise.setValue(std::make_unique<protocol::TaskInfo>(info));
return std::move(future).via(eventBase);
return std::move(future).via(httpProcessingExecutor_);
}

auto promiseHolder =
std::make_shared<PromiseHolder<std::unique_ptr<protocol::TaskInfo>>>(
std::move(promise));

prestoTask->task->stateChangeFuture(maxWaitMicros)
.via(eventBase)
.via(httpProcessingExecutor_)
.thenValue([promiseHolder, prestoTask](auto&& /*done*/) {
promiseHolder->promise.setValue(
std::make_unique<protocol::TaskInfo>(prestoTask->updateInfo()));
@@ -769,7 +769,7 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
promiseHolder->promise.setValue(
std::make_unique<protocol::TaskInfo>(prestoTask->updateInfo()));
});
return std::move(future).via(eventBase);
return std::move(future).via(httpProcessingExecutor_);
}

folly::Future<std::unique_ptr<Result>> TaskManager::getResults(
@@ -805,7 +805,6 @@ folly::Future<std::unique_ptr<Result>> TaskManager::getResults(

auto timeoutFn = [this, token]() { return createTimeOutResult(token); };

auto eventBase = folly::EventBaseManager::get()->getEventBase();
try {
auto prestoTask = findOrCreateTask(taskId);

@@ -838,7 +837,7 @@ folly::Future<std::unique_ptr<Result>> TaskManager::getResults(
// If the task has finished, then send completion result.
if (prestoTask->task->state() == exec::kFinished) {
promiseHolder->promise.setValue(createCompleteResult(token));
return std::move(future).via(eventBase);
return std::move(future).via(httpProcessingExecutor_);
}
// If task is not running let the request timeout. The task may have
// failed at creation time and the coordinator hasn't yet caught up.
@@ -851,8 +850,9 @@ folly::Future<std::unique_ptr<Result>> TaskManager::getResults(
maxSize,
*bufferManager_);
}
return std::move(future).via(eventBase).onTimeout(
std::chrono::microseconds(maxWaitMicros), timeoutFn);
return std::move(future)
.via(httpProcessingExecutor_)
.onTimeout(std::chrono::microseconds(maxWaitMicros), timeoutFn);
}

std::lock_guard<std::mutex> l(prestoTask->mutex);
@@ -872,15 +872,16 @@ folly::Future<std::unique_ptr<Result>> TaskManager::getResults(
request->token = token;
request->maxSize = maxSize;
prestoTask->resultRequests.insert({destination, std::move(request)});
return std::move(future).via(eventBase).onTimeout(
std::chrono::microseconds(maxWaitMicros), timeoutFn);
return std::move(future)
.via(httpProcessingExecutor_)
.onTimeout(std::chrono::microseconds(maxWaitMicros), timeoutFn);
}
} catch (const velox::VeloxException& e) {
promiseHolder->promise.setException(e);
return std::move(future).via(eventBase);
return std::move(future).via(httpProcessingExecutor_);
} catch (const std::exception& e) {
promiseHolder->promise.setException(e);
return std::move(future).via(eventBase);
return std::move(future).via(httpProcessingExecutor_);
}
}

@@ -889,7 +890,6 @@ folly::Future<std::unique_ptr<protocol::TaskStatus>> TaskManager::getTaskStatus(
std::optional<protocol::TaskState> currentState,
std::optional<protocol::Duration> maxWait,
std::shared_ptr<http::CallbackRequestHandlerState> state) {
auto eventBase = folly::EventBaseManager::get()->getEventBase();
auto [promise, future] =
folly::makePromiseContract<std::unique_ptr<protocol::TaskStatus>>();

@@ -913,8 +913,9 @@ folly::Future<std::unique_ptr<protocol::TaskStatus>> TaskManager::getTaskStatus(

keepPromiseAlive(promiseHolder, state);
prestoTask->statusRequest = folly::to_weak_ptr(promiseHolder);
return std::move(future).via(eventBase).onTimeout(
std::chrono::microseconds(maxWaitMicros), [prestoTask]() {
return std::move(future)
.via(httpProcessingExecutor_)
.onTimeout(std::chrono::microseconds(maxWaitMicros), [prestoTask]() {
return std::make_unique<protocol::TaskStatus>(
prestoTask->updateStatus());
});
@@ -925,15 +926,15 @@ folly::Future<std::unique_ptr<protocol::TaskStatus>> TaskManager::getTaskStatus(

if (currentState.value() != status.state || isFinalState(status.state)) {
promise.setValue(std::make_unique<protocol::TaskStatus>(status));
return std::move(future).via(eventBase);
return std::move(future).via(httpProcessingExecutor_);
}

auto promiseHolder =
std::make_shared<PromiseHolder<std::unique_ptr<protocol::TaskStatus>>>(
std::move(promise));

prestoTask->task->stateChangeFuture(maxWaitMicros)
.via(eventBase)
.via(httpProcessingExecutor_)
.thenValue([promiseHolder, prestoTask](auto&& /*done*/) {
promiseHolder->promise.setValue(
std::make_unique<protocol::TaskStatus>(prestoTask->updateStatus()));
@@ -946,7 +947,7 @@ folly::Future<std::unique_ptr<protocol::TaskStatus>> TaskManager::getTaskStatus(
std::make_unique<protocol::TaskStatus>(
prestoTask->updateStatus()));
});
return std::move(future).via(eventBase);
return std::move(future).via(httpProcessingExecutor_);
}

void TaskManager::removeRemoteSource(
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
@@ -179,6 +179,7 @@ class TaskManager {
std::shared_ptr<velox::exec::OutputBufferManager> bufferManager_;
folly::Synchronized<TaskMap> taskMap_;
QueryContextManager queryContextManager_;
folly::Executor* httpProcessingExecutor_{httpProcessingExecutorPtr()};
};

} // namespace facebook::presto
494 changes: 303 additions & 191 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp

Large diffs are not rendered by default.

0 comments on commit 6376fa0

Please sign in to comment.