diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 8b582b0a0bab..5b4871346b21 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -109,14 +109,12 @@ BlockingState::BlockingState( } // static -void BlockingState::setResume( - std::shared_ptr state, - folly::Executor* executor) { +void BlockingState::setResume(std::shared_ptr state) { VELOX_CHECK(!state->driver_->isOnThread()); auto& exec = folly::QueuedImmediateExecutor::instance(); std::move(state->future_) .via(&exec) - .thenValue([state, executor](bool /* unused */) { + .thenValue([state](bool /* unused */) { state->operator_->recordBlockingTime(state->sinceMicros_); auto driver = state->driver_; { @@ -215,15 +213,16 @@ void Driver::testingJoinAndReinitializeExecutor(int32_t threads) { } // static -void Driver::enqueue( - std::shared_ptr driver, - folly::Executor* executor) { +void Driver::enqueue(std::shared_ptr driver) { // This is expected to be called inside the Driver's CancelPool mutex. VELOX_CHECK(!driver->state().isEnqueued); driver->state().isEnqueued = true; - auto currentExecutor = (executor ? executor : Driver::executor()); - currentExecutor->add( - [driver, currentExecutor]() { Driver::run(driver, currentExecutor); }); + auto& task = driver->task_; + auto executor = task ? task->queryCtx()->executor() : nullptr; + if (!executor) { + executor = Driver::executor(); + } + executor->add([driver]() { Driver::run(driver); }); } Driver::Driver( @@ -451,7 +450,7 @@ core::StopReason Driver::runInternal( } // static -void Driver::run(std::shared_ptr self, folly::Executor* executor) { +void Driver::run(std::shared_ptr self) { std::shared_ptr blockingState; auto reason = self->runInternal(self, &blockingState); switch (reason) { @@ -459,12 +458,12 @@ void Driver::run(std::shared_ptr self, folly::Executor* executor) { // Set the resume action outside of the CancelPool so that, if the // future is already realized we do not have a second thread // entering the same Driver. - BlockingState::setResume(blockingState, executor); + BlockingState::setResume(blockingState); return; case core::StopReason::kYield: // Go to the end of the queue. - enqueue(self, executor); + enqueue(self); return; case core::StopReason::kPause: diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index d007e77054ed..9c728c9a96e8 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -47,9 +47,7 @@ class BlockingState { Operator* FOLLY_NONNULL op, BlockingReason reason); - static void setResume( - std::shared_ptr state, - folly::Executor* FOLLY_NULLABLE executor = nullptr); + static void setResume(std::shared_ptr state); Operator* FOLLY_NONNULL op() { return operator_; @@ -105,13 +103,9 @@ class Driver { static folly::CPUThreadPoolExecutor* FOLLY_NONNULL executor(int32_t threads = 0); - static void run( - std::shared_ptr self, - folly::Executor* FOLLY_NULLABLE executor = nullptr); + static void run(std::shared_ptr self); - static void enqueue( - std::shared_ptr instance, - folly::Executor* FOLLY_NULLABLE executor = nullptr); + static void enqueue(std::shared_ptr instance); // Waits for activity on 'executor_' to finish and then makes a new // executor. Testing uses this to ensure that there are no live