Skip to content

Commit

Permalink
added back custom executor (#274)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #274

Make executor wired up. There is no need to pass in an executor since we can reference that directly from the task.

Differential Revision: D31124244

fbshipit-source-id: eb434e9136dd57f84c283e8bf3e34685ac025a57
  • Loading branch information
zzhao0 authored and facebook-github-bot committed Sep 29, 2021
1 parent 031066c commit 6315bf6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 22 deletions.
25 changes: 12 additions & 13 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,12 @@ BlockingState::BlockingState(
}

// static
void BlockingState::setResume(
std::shared_ptr<BlockingState> state,
folly::Executor* executor) {
void BlockingState::setResume(std::shared_ptr<BlockingState> state) {
VELOX_CHECK(!state->driver_->isOnThread());
auto& exec = folly::QueuedImmediateExecutor::instance();
std::move(state->future_)
.via(&exec)
.thenValue([state, executor](bool /* unused */) {
.thenValue([state](bool /* unused */) {
state->operator_->recordBlockingTime(state->sinceMicros_);
auto driver = state->driver_;
{
Expand Down Expand Up @@ -211,15 +209,16 @@ void Driver::testingJoinAndReinitializeExecutor(int32_t threads) {
}

// static
void Driver::enqueue(
std::shared_ptr<Driver> driver,
folly::Executor* executor) {
void Driver::enqueue(std::shared_ptr<Driver> driver) {
// This is expected to be called inside the Driver's CancelPool mutex.
VELOX_CHECK(!driver->state().isEnqueued);
driver->state().isEnqueued = true;
auto currentExecutor = (executor ? executor : Driver::executor());
currentExecutor->add(
[driver, currentExecutor]() { Driver::run(driver, currentExecutor); });
auto& task = driver->task_;
auto executor = task ? task->queryCtx()->executor() : nullptr;
if (!executor) {
executor = Driver::executor();
}
executor->add([driver]() { Driver::run(driver); });
}

Driver::Driver(
Expand Down Expand Up @@ -447,20 +446,20 @@ core::StopReason Driver::runInternal(
}

// static
void Driver::run(std::shared_ptr<Driver> self, folly::Executor* executor) {
void Driver::run(std::shared_ptr<Driver> self) {
std::shared_ptr<BlockingState> blockingState;
auto reason = self->runInternal(self, &blockingState);
switch (reason) {
case core::StopReason::kBlock:
// Set the resume action outside of the CancelPool so that, if the
// future is already realized we do not have a second thread
// entering the same Driver.
BlockingState::setResume(blockingState, executor);
BlockingState::setResume(blockingState);
return;

case core::StopReason::kYield:
// Go to the end of the queue.
enqueue(self, executor);
enqueue(self);
return;

case core::StopReason::kPause:
Expand Down
12 changes: 3 additions & 9 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ class BlockingState {
Operator* FOLLY_NONNULL op,
BlockingReason reason);

static void setResume(
std::shared_ptr<BlockingState> state,
folly::Executor* FOLLY_NULLABLE executor = nullptr);
static void setResume(std::shared_ptr<BlockingState> state);

Operator* FOLLY_NONNULL op() {
return operator_;
Expand Down Expand Up @@ -101,13 +99,9 @@ class Driver {
static folly::CPUThreadPoolExecutor* FOLLY_NONNULL
executor(int32_t threads = 0);

static void run(
std::shared_ptr<Driver> self,
folly::Executor* FOLLY_NULLABLE executor = nullptr);
static void run(std::shared_ptr<Driver> self);

static void enqueue(
std::shared_ptr<Driver> instance,
folly::Executor* FOLLY_NULLABLE executor = nullptr);
static void enqueue(std::shared_ptr<Driver> instance);

// Waits for activity on 'executor_' to finish and then makes a new
// executor. Testing uses this to ensure that there are no live
Expand Down

0 comments on commit 6315bf6

Please sign in to comment.