From a35fbc668f8c60587ef44d4b4ae2787a84a982c3 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 16 Mar 2022 11:48:33 -0700 Subject: [PATCH] Fix race condition in Driver::runInternal (#1219) Summary: There was a race condition that caused ASAN heap-use-after-free in Driver::runInternal. It is possible for the Task to terminate (due to an error) while Driver is off-thread sitting in the executor's queue waiting to be scheduled to run on a thread. If this case, Task may have been terminated and operators have been destroyed just before Driver::runInternal executes. In this case `operators_[curOpIndex_]->stats().addRuntimeStat ("queuedWallNanos", ...);` call would try to access freed memory. The fix is to (1) streamline Driver by removing task_ member variable and use DriverCtx::task instead; (2) move logic for updating stats after the call to task->enter(). If task has terminated, enter() will return StopReason::kTerminate and no Driver will abort any further processing. ```AddressSanitizer: heap-use-after-free #6 0x7f01c29ae9b9 in facebook::velox::exec::OperatorStats::addRuntimeStat(std::__cxx11::basic_string, std::allocator > const&, long) velox/exec/Operator.h:144 #7 0x7f01c29afaf7 in facebook::velox::exec::Driver::runInternal(std::shared_ptr&, std::shared_ptr*) velox/exec/Driver.cpp:249 #8 0x7f01c29b4174 in facebook::velox::exec::Driver::run(std::shared_ptr) velox/exec/Driver.cpp:415 #9 0x7f01c29cce5f in facebook::velox::exec::Driver::enqueue(std::shared_ptr)::$_0::operator()() const velox/exec/Driver.cpp:161 ``` Pull Request resolved: https://github.com/facebookincubator/velox/pull/1219 Test Plan: $ buck test mode/dev-asan //presto_cpp/main/tests:presto_main_test -- --exact 'presto_cpp/main/tests:presto_main_test - TaskManagerTest.outOfQueryUserMemory' --run-disabled --stress-runs 1000 ``` Summary Pass: 1000 ListingSuccess: 1 Finished test run: https://www.internalfb.com/intern/testinfra/testrun/1125900137408748 ``` Imported from GitHub, without a `Test Plan:` line. Reviewed By: spershin, oerling Differential Revision: D34904220 Pulled By: mbasmanova fbshipit-source-id: d4b88cf9e3ec2884c77371c37c571297a59728b6 --- velox/exec/Driver.cpp | 68 +++++++++++++++---------------------- velox/exec/Driver.h | 17 ++++------ velox/exec/Task.cpp | 78 +++++++++++++++++++------------------------ 3 files changed, 66 insertions(+), 97 deletions(-) diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 41f51857996c6..a538a9a653bb4 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -142,31 +142,18 @@ class CancelGuard { }; } // namespace -Driver::~Driver() { - if (task_) { - LOG(ERROR) << "Driver destructed while still in Task: " - << task_->toString(); - DLOG(FATAL) << "Driver destructed while referencing task"; - } -} - // static void Driver::enqueue(std::shared_ptr driver) { // This is expected to be called inside the Driver's Tasks's mutex. driver->enqueueInternal(); - auto& task = driver->task_; - if (!task) { - return; - } - task->queryCtx()->executor()->add([driver]() { Driver::run(driver); }); + driver->task()->queryCtx()->executor()->add( + [driver]() { Driver::run(driver); }); } Driver::Driver( std::unique_ptr ctx, std::vector>&& operators) - : ctx_(std::move(ctx)), - task_(ctx_->task), - operators_(std::move(operators)) { + : ctx_(std::move(ctx)), operators_(std::move(operators)) { curOpIndex_ = operators_.size() - 1; // Operators need access to their Driver for adaptation. ctx_->driver = this; @@ -244,22 +231,17 @@ void Driver::enqueueInternal() { StopReason Driver::runInternal( std::shared_ptr& self, std::shared_ptr* blockingState) { - // Update the next operator's queueTime. - if (curOpIndex_ < operators_.size()) { - operators_[curOpIndex_]->stats().addRuntimeStat( - "queuedWallNanos", - (getCurrentTimeMicro() - queueTimeStartMicros_) * 1'000); - } - // Get 'task_' into a local because this could be unhooked from it on another - // thread. - auto task = task_; - auto stop = !task ? StopReason::kTerminate : task->enter(state_); + const auto queuedWallNanos = + (getCurrentTimeMicro() - queueTimeStartMicros_) * 1'000; + + auto& task = ctx_->task; + auto stop = task->enter(state_); if (stop != StopReason::kNone) { if (stop == StopReason::kTerminate) { // ctx_ still has a reference to the Task. 'this' is not on // thread from the Task's viewpoint, hence no need to call // close(). - ctx_->task->setError(std::make_exception_ptr(VeloxRuntimeError( + task->setError(std::make_exception_ptr(VeloxRuntimeError( __FILE__, __LINE__, __FUNCTION__, @@ -272,7 +254,13 @@ StopReason Driver::runInternal( return stop; } - CancelGuard guard(task_.get(), &state_, [&](StopReason reason) { + // Update the next operator's queueTime. + if (curOpIndex_ < operators_.size()) { + operators_[curOpIndex_]->stats().addRuntimeStat( + "queuedWallNanos", queuedWallNanos); + } + + CancelGuard guard(task.get(), &state_, [&](StopReason reason) { // This is run on error or cancel exit. if (reason == StopReason::kTerminate) { task->setError(std::make_exception_ptr(VeloxRuntimeError( @@ -299,7 +287,7 @@ StopReason Driver::runInternal( for (;;) { for (int32_t i = numOperators - 1; i >= 0; --i) { - stop = task_->shouldStop(); + stop = task->shouldStop(); if (stop != StopReason::kNone) { guard.notThrown(); return stop; @@ -354,7 +342,7 @@ StopReason Driver::runInternal( i += 2; continue; } else { - stop = task_->shouldStop(); + stop = task->shouldStop(); if (stop != StopReason::kNone) { guard.notThrown(); return stop; @@ -399,11 +387,11 @@ StopReason Driver::runInternal( } } } catch (velox::VeloxException& e) { - task_->setError(std::current_exception()); + task->setError(std::current_exception()); // The CancelPoolGuard will close 'self' and remove from task_. return StopReason::kAlreadyTerminated; } catch (std::exception& e) { - task_->setError(std::current_exception()); + task->setError(std::current_exception()); // The CancelGuard will close 'self' and remove from task_. return StopReason::kAlreadyTerminated; } @@ -454,15 +442,18 @@ void Driver::addStatsToTask() { for (auto& op : operators_) { auto& stats = op->stats(); stats.memoryStats.update(op->pool()->getMemoryUsageTracker()); - task_->addOperatorStats(stats); + ctx_->task->addOperatorStats(stats); } } void Driver::close() { - if (!task_) { + if (closed_) { // Already closed. return; } + + closed_ = true; + if (!isOnThread() && !isTerminated()) { LOG(FATAL) << "Driver::close is only allowed from the Driver's thread"; } @@ -470,9 +461,7 @@ void Driver::close() { for (auto& op : operators_) { op->close(); } - auto task = std::move(task_); - - Task::removeDriver(task, this); + Task::removeDriver(ctx_->task, this); } void Driver::closeByTask() { @@ -481,11 +470,6 @@ void Driver::closeByTask() { for (auto& op : operators_) { op->close(); } - task_ = nullptr; -} - -void Driver::disconnectFromTask() { - task_ = nullptr; } bool Driver::mayPushdownAggregation(Operator* aggregation) const { diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 7d9a21e52ccf9..746cea0ca686d 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -189,8 +189,6 @@ class Driver { std::unique_ptr driverCtx, std::vector>&& operators); - ~Driver(); - static void run(std::shared_ptr self); static void enqueue(std::shared_ptr instance); @@ -236,18 +234,14 @@ class Driver { return ctx_.get(); } - std::shared_ptr task() const { - return task_; + const std::shared_ptr& task() const { + return ctx_->task; } - // Updates the stats in 'task_' and frees resources. Only called by Task for + // Updates the stats in Task and frees resources. Only called by Task for // closing non-running Drivers. void closeByTask(); - // This is called if the creation of drivers failed and we want to disconnect - // driver from the task before driver's destruction. - void disconnectFromTask(); - private: void enqueueInternal(); @@ -262,9 +256,10 @@ class Driver { void pushdownFilters(int operatorIndex); std::unique_ptr ctx_; - std::shared_ptr task_; - // Set via Task_ and serialized by 'task_'s mutex. + std::atomic_bool closed_{false}; + + // Set via Task and serialized by Task's mutex. ThreadState state_; // Timer used to track down the time we are sitting in the driver queue. diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index df216bbc90871..0a3fa8cd9ed7b 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -326,52 +326,42 @@ void Task::createDriversLocked( std::shared_ptr& self, uint32_t splitGroupId, std::vector>& out) { - try { - auto& splitGroupState = self->splitGroupStates_[splitGroupId]; - const auto numPipelines = driverFactories_.size(); - for (auto pipeline = 0; pipeline < numPipelines; ++pipeline) { - auto& factory = driverFactories_[pipeline]; - const uint32_t driverIdOffset = factory->numDrivers * splitGroupId; - for (uint32_t partitionId = 0; partitionId < factory->numDrivers; - ++partitionId) { - out.emplace_back(factory->createDriver( - std::make_unique( - self, - driverIdOffset + partitionId, - pipeline, - splitGroupId, - partitionId), - self->exchangeClients_[pipeline], - [self](size_t i) { - return i < self->driverFactories_.size() - ? self->driverFactories_[i]->numTotalDrivers - : 0; - })); - ++splitGroupState.activeDrivers; - } - } - noMoreLocalExchangeProducers(splitGroupId); - ++numRunningSplitGroups_; - - // Initialize operator stats using the 1st driver of each operator. - if (not initializedOpStats_) { - initializedOpStats_ = true; - size_t driverIndex{0}; - for (auto pipeline = 0; pipeline < numPipelines; ++pipeline) { - auto& factory = self->driverFactories_[pipeline]; - out[driverIndex]->initializeOperatorStats( - self->taskStats_.pipelineStats[pipeline].operatorStats); - driverIndex += factory->numDrivers; - } + auto& splitGroupState = self->splitGroupStates_[splitGroupId]; + const auto numPipelines = driverFactories_.size(); + for (auto pipeline = 0; pipeline < numPipelines; ++pipeline) { + auto& factory = driverFactories_[pipeline]; + const uint32_t driverIdOffset = factory->numDrivers * splitGroupId; + for (uint32_t partitionId = 0; partitionId < factory->numDrivers; + ++partitionId) { + out.emplace_back(factory->createDriver( + std::make_unique( + self, + driverIdOffset + partitionId, + pipeline, + splitGroupId, + partitionId), + self->exchangeClients_[pipeline], + [self](size_t i) { + return i < self->driverFactories_.size() + ? self->driverFactories_[i]->numTotalDrivers + : 0; + })); + ++splitGroupState.activeDrivers; } - } catch (std::exception& e) { - // If one of the drivers threw in creation, we need to remove task pointer - // from already created drivers or we will face another throw (or even a - // crash). - for (auto& driver : out) { - driver->disconnectFromTask(); + } + noMoreLocalExchangeProducers(splitGroupId); + ++numRunningSplitGroups_; + + // Initialize operator stats using the 1st driver of each operator. + if (not initializedOpStats_) { + initializedOpStats_ = true; + size_t driverIndex{0}; + for (auto pipeline = 0; pipeline < numPipelines; ++pipeline) { + auto& factory = self->driverFactories_[pipeline]; + out[driverIndex]->initializeOperatorStats( + self->taskStats_.pipelineStats[pipeline].operatorStats); + driverIndex += factory->numDrivers; } - throw; } }