Skip to content

Commit

Permalink
added back custom executor (#274)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #274

Make executor wired up. There is no need to pass in an executor since we can reference that directly from the task.

Differential Revision: D31124244

fbshipit-source-id: e620d4e39141ec17449a1b1896be915327baecb8
  • Loading branch information
zzhao0 authored and facebook-github-bot committed Sep 23, 2021
1 parent e5861d6 commit 0c1cd0b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 22 deletions.
24 changes: 11 additions & 13 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,12 @@ BlockingState::BlockingState(
}

// static
void BlockingState::setResume(
std::shared_ptr<BlockingState> state,
folly::Executor* executor) {
void BlockingState::setResume(std::shared_ptr<BlockingState> state) {
VELOX_CHECK(!state->driver_->isOnThread());
auto& exec = folly::QueuedImmediateExecutor::instance();
std::move(state->future_)
.via(&exec)
.thenValue([state, executor](bool /* unused */) {
.thenValue([state](bool /* unused */) {
state->operator_->recordBlockingTime(state->sinceMicros_);
auto driver = state->driver_;
{
Expand Down Expand Up @@ -213,15 +211,15 @@ void Driver::testingJoinAndReinitializeExecutor(int32_t threads) {
}

// static
void Driver::enqueue(
std::shared_ptr<Driver> driver,
folly::Executor* executor) {
void Driver::enqueue(std::shared_ptr<Driver> driver) {
// This is expected to be called inside the Driver's CancelPool mutex.
VELOX_CHECK(!driver->state().isEnqueued);
driver->state().isEnqueued = true;
auto currentExecutor = (executor ? executor : Driver::executor());
currentExecutor->add(
[driver, currentExecutor]() { Driver::run(driver, currentExecutor); });
auto executor = driver->task_->queryCtx()->executor();
if (!executor) {
executor = Driver::executor();
}
executor->add([driver]() { Driver::run(driver); });
}

Driver::Driver(
Expand Down Expand Up @@ -449,20 +447,20 @@ core::StopReason Driver::runInternal(
}

// static
void Driver::run(std::shared_ptr<Driver> self, folly::Executor* executor) {
void Driver::run(std::shared_ptr<Driver> self) {
std::shared_ptr<BlockingState> blockingState;
auto reason = self->runInternal(self, &blockingState);
switch (reason) {
case core::StopReason::kBlock:
// Set the resume action outside of the CancelPool so that, if the
// future is already realized we do not have a second thread
// entering the same Driver.
BlockingState::setResume(blockingState, executor);
BlockingState::setResume(blockingState);
return;

case core::StopReason::kYield:
// Go to the end of the queue.
enqueue(self, executor);
enqueue(self);
return;

case core::StopReason::kPause:
Expand Down
12 changes: 3 additions & 9 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ class BlockingState {
Operator* FOLLY_NONNULL op,
BlockingReason reason);

static void setResume(
std::shared_ptr<BlockingState> state,
folly::Executor* FOLLY_NULLABLE executor = nullptr);
static void setResume(std::shared_ptr<BlockingState> state);

Operator* FOLLY_NONNULL op() {
return operator_;
Expand Down Expand Up @@ -121,13 +119,9 @@ class Driver {
static folly::CPUThreadPoolExecutor* FOLLY_NONNULL
executor(int32_t threads = 0);

static void run(
std::shared_ptr<Driver> self,
folly::Executor* FOLLY_NULLABLE executor = nullptr);
static void run(std::shared_ptr<Driver> self);

static void enqueue(
std::shared_ptr<Driver> instance,
folly::Executor* FOLLY_NULLABLE executor = nullptr);
static void enqueue(std::shared_ptr<Driver> instance);

// Waits for activity on 'executor_' to finish and then makes a new
// executor. Testing uses this to ensure that there are no live
Expand Down

0 comments on commit 0c1cd0b

Please sign in to comment.