Fix race condition in Driver::runInternal (facebookincubator#1219)
Summary:
There was a race condition that caused an ASAN heap-use-after-free in
Driver::runInternal. It is possible for the Task to terminate (due to an error)
while the Driver is off-thread, sitting in the executor's queue waiting to be
scheduled on a thread. In that case, the Task may have been terminated and the
operators destroyed just before Driver::runInternal executes, so the
`operators_[curOpIndex_]->stats().addRuntimeStat("queuedWallNanos", ...);`
call would access freed memory.

The fix is to (1) streamline Driver by removing the task_ member variable and
using DriverCtx::task instead; (2) move the logic for updating stats after the
call to task->enter(). If the Task has terminated, enter() returns
StopReason::kTerminate and the Driver will not do any further processing.
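
To illustrate the new ordering, here is a minimal self-contained model — not the actual Velox code (the real change is in the Driver.cpp diff below); all names here are simplified stand-ins:

```cpp
#include <cstdint>
#include <memory>
#include <vector>

// Toy model of the fixed control flow in Driver::runInternal. The point is
// the ordering: compute the queued time, enter the Task, and only touch the
// operators after enter() confirms the Task has not terminated.
enum class StopReason { kNone, kTerminate };

struct Task {
  // Stand-in for Task::enter(): returns kTerminate if the Task already
  // terminated (in which case the operators may have been destroyed).
  StopReason enter() const {
    return terminated ? StopReason::kTerminate : StopReason::kNone;
  }
  bool terminated{false};
};

struct Operator {
  void addRuntimeStat(int64_t /*queuedWallNanos*/) {}
};

struct Driver {
  std::shared_ptr<Task> task; // held via DriverCtx::task in the real code
  std::vector<std::unique_ptr<Operator>> operators;
  size_t curOpIndex{0};
  int64_t queueTimeStartMicros{0};

  StopReason runInternal(int64_t nowMicros) {
    // 1. Compute the queued time up front; this touches no operator state.
    const int64_t queuedWallNanos =
        (nowMicros - queueTimeStartMicros) * 1'000;

    // 2. Enter the Task. Bailing out here is what prevents the
    //    heap-use-after-free: the operators are never dereferenced once
    //    the Task reports kTerminate.
    if (task->enter() == StopReason::kTerminate) {
      return StopReason::kTerminate;
    }

    // 3. Only now is it safe to record the stat on the next operator.
    if (curOpIndex < operators.size()) {
      operators[curOpIndex]->addRuntimeStat(queuedWallNanos);
    }
    return StopReason::kNone;
  }
};
```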

```
AddressSanitizer: heap-use-after-free

    #6 0x7f01c29ae9b9 in facebook::velox::exec::OperatorStats::addRuntimeStat(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, long) velox/exec/Operator.h:144
    #7 0x7f01c29afaf7 in facebook::velox::exec::Driver::runInternal(std::shared_ptr<facebook::velox::exec::Driver>&, std::shared_ptr<facebook::velox::exec::BlockingState>*) velox/exec/Driver.cpp:249
    #8 0x7f01c29b4174 in facebook::velox::exec::Driver::run(std::shared_ptr<facebook::velox::exec::Driver>) velox/exec/Driver.cpp:415
    #9 0x7f01c29cce5f in facebook::velox::exec::Driver::enqueue(std::shared_ptr<facebook::velox::exec::Driver>)::$_0::operator()() const velox/exec/Driver.cpp:161
```

Pull Request resolved: facebookincubator#1219

Test Plan:
$ buck test mode/dev-asan //presto_cpp/main/tests:presto_main_test -- --exact 'presto_cpp/main/tests:presto_main_test - TaskManagerTest.outOfQueryUserMemory' --run-disabled --stress-runs 1000

```
Summary
  Pass: 1000
  ListingSuccess: 1
Finished test run: https://www.internalfb.com/intern/testinfra/testrun/1125900137408748
```

Imported from GitHub, without a `Test Plan:` line.

Reviewed By: spershin, oerling

Differential Revision: D34904220

Pulled By: mbasmanova

fbshipit-source-id: d4b88cf9e3ec2884c77371c37c571297a59728b6
mbasmanova authored and ZJie1 committed Mar 18, 2022
1 parent 893e957 commit a35fbc6
Showing 3 changed files with 66 additions and 97 deletions.
68 changes: 26 additions & 42 deletions velox/exec/Driver.cpp
@@ -142,31 +142,18 @@ class CancelGuard {
};
} // namespace

Driver::~Driver() {
if (task_) {
LOG(ERROR) << "Driver destructed while still in Task: "
<< task_->toString();
DLOG(FATAL) << "Driver destructed while referencing task";
}
}

// static
void Driver::enqueue(std::shared_ptr<Driver> driver) {
// This is expected to be called inside the Driver's Tasks's mutex.
driver->enqueueInternal();
auto& task = driver->task_;
if (!task) {
return;
}
task->queryCtx()->executor()->add([driver]() { Driver::run(driver); });
driver->task()->queryCtx()->executor()->add(
[driver]() { Driver::run(driver); });
}

Driver::Driver(
std::unique_ptr<DriverCtx> ctx,
std::vector<std::unique_ptr<Operator>>&& operators)
: ctx_(std::move(ctx)),
task_(ctx_->task),
operators_(std::move(operators)) {
: ctx_(std::move(ctx)), operators_(std::move(operators)) {
curOpIndex_ = operators_.size() - 1;
// Operators need access to their Driver for adaptation.
ctx_->driver = this;
@@ -244,22 +231,17 @@ void Driver::enqueueInternal()
StopReason Driver::runInternal(
std::shared_ptr<Driver>& self,
std::shared_ptr<BlockingState>* blockingState) {
// Update the next operator's queueTime.
if (curOpIndex_ < operators_.size()) {
operators_[curOpIndex_]->stats().addRuntimeStat(
"queuedWallNanos",
(getCurrentTimeMicro() - queueTimeStartMicros_) * 1'000);
}
// Get 'task_' into a local because this could be unhooked from it on another
// thread.
auto task = task_;
auto stop = !task ? StopReason::kTerminate : task->enter(state_);
const auto queuedWallNanos =
(getCurrentTimeMicro() - queueTimeStartMicros_) * 1'000;

auto& task = ctx_->task;
auto stop = task->enter(state_);
if (stop != StopReason::kNone) {
if (stop == StopReason::kTerminate) {
// ctx_ still has a reference to the Task. 'this' is not on
// thread from the Task's viewpoint, hence no need to call
// close().
ctx_->task->setError(std::make_exception_ptr(VeloxRuntimeError(
task->setError(std::make_exception_ptr(VeloxRuntimeError(
__FILE__,
__LINE__,
__FUNCTION__,
@@ -272,7 +254,13 @@ StopReason Driver::runInternal(
return stop;
}

CancelGuard guard(task_.get(), &state_, [&](StopReason reason) {
// Update the next operator's queueTime.
if (curOpIndex_ < operators_.size()) {
operators_[curOpIndex_]->stats().addRuntimeStat(
"queuedWallNanos", queuedWallNanos);
}

CancelGuard guard(task.get(), &state_, [&](StopReason reason) {
// This is run on error or cancel exit.
if (reason == StopReason::kTerminate) {
task->setError(std::make_exception_ptr(VeloxRuntimeError(
@@ -299,7 +287,7 @@ StopReason Driver::runInternal(

for (;;) {
for (int32_t i = numOperators - 1; i >= 0; --i) {
stop = task_->shouldStop();
stop = task->shouldStop();
if (stop != StopReason::kNone) {
guard.notThrown();
return stop;
@@ -354,7 +342,7 @@ StopReason Driver::runInternal(
i += 2;
continue;
} else {
stop = task_->shouldStop();
stop = task->shouldStop();
if (stop != StopReason::kNone) {
guard.notThrown();
return stop;
@@ -399,11 +387,11 @@
}
}
} catch (velox::VeloxException& e) {
task_->setError(std::current_exception());
task->setError(std::current_exception());
// The CancelPoolGuard will close 'self' and remove from task_.
return StopReason::kAlreadyTerminated;
} catch (std::exception& e) {
task_->setError(std::current_exception());
task->setError(std::current_exception());
// The CancelGuard will close 'self' and remove from task_.
return StopReason::kAlreadyTerminated;
}
@@ -454,25 +442,26 @@ void Driver::addStatsToTask() {
for (auto& op : operators_) {
auto& stats = op->stats();
stats.memoryStats.update(op->pool()->getMemoryUsageTracker());
task_->addOperatorStats(stats);
ctx_->task->addOperatorStats(stats);
}
}

void Driver::close() {
if (!task_) {
if (closed_) {
// Already closed.
return;
}

closed_ = true;

if (!isOnThread() && !isTerminated()) {
LOG(FATAL) << "Driver::close is only allowed from the Driver's thread";
}
addStatsToTask();
for (auto& op : operators_) {
op->close();
}
auto task = std::move(task_);

Task::removeDriver(task, this);
Task::removeDriver(ctx_->task, this);
}

void Driver::closeByTask() {
@@ -481,11 +470,6 @@ void Driver::closeByTask() {
for (auto& op : operators_) {
op->close();
}
task_ = nullptr;
}

void Driver::disconnectFromTask() {
task_ = nullptr;
}

bool Driver::mayPushdownAggregation(Operator* aggregation) const {
17 changes: 6 additions & 11 deletions velox/exec/Driver.h
@@ -189,8 +189,6 @@ class Driver {
std::unique_ptr<DriverCtx> driverCtx,
std::vector<std::unique_ptr<Operator>>&& operators);

~Driver();

static void run(std::shared_ptr<Driver> self);

static void enqueue(std::shared_ptr<Driver> instance);
@@ -236,18 +234,14 @@
return ctx_.get();
}

std::shared_ptr<Task> task() const {
return task_;
const std::shared_ptr<Task>& task() const {
return ctx_->task;
}

// Updates the stats in 'task_' and frees resources. Only called by Task for
// Updates the stats in Task and frees resources. Only called by Task for
// closing non-running Drivers.
void closeByTask();

// This is called if the creation of drivers failed and we want to disconnect
// driver from the task before driver's destruction.
void disconnectFromTask();

private:
void enqueueInternal();

@@ -262,9 +256,10 @@
void pushdownFilters(int operatorIndex);

std::unique_ptr<DriverCtx> ctx_;
std::shared_ptr<Task> task_;

// Set via Task_ and serialized by 'task_'s mutex.
std::atomic_bool closed_{false};

// Set via Task and serialized by Task's mutex.
ThreadState state_;

// Timer used to track down the time we are sitting in the driver queue.
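
The Driver.h hunk above also adds the atomic closed_ flag that replaces the old "task_ is null means already closed" check in Driver::close(). As a side note, here is a minimal standalone sketch of the idempotent-close pattern (hypothetical class, not Velox code); the patch itself uses the simpler check-then-set form, which is enough given the isOnThread() check in Driver::close():

```cpp
#include <atomic>

// Hypothetical standalone example of an idempotent close() guarded by an
// atomic flag, mirroring the closed_ member added in the diff above.
class Closable {
 public:
  void close() {
    // exchange() returns the previous value, so only the first caller gets
    // past this point even if close() races with itself. The patched
    // Driver::close() can use the plain `if (closed_) return;` form because
    // it only runs on the Driver's own thread.
    if (closed_.exchange(true)) {
      return;
    }
    // ... release resources exactly once ...
  }

 private:
  std::atomic_bool closed_{false};
};
```
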
78 changes: 34 additions & 44 deletions velox/exec/Task.cpp
@@ -326,52 +326,42 @@ void Task::createDriversLocked(
std::shared_ptr<Task>& self,
uint32_t splitGroupId,
std::vector<std::shared_ptr<Driver>>& out) {
try {
auto& splitGroupState = self->splitGroupStates_[splitGroupId];
const auto numPipelines = driverFactories_.size();
for (auto pipeline = 0; pipeline < numPipelines; ++pipeline) {
auto& factory = driverFactories_[pipeline];
const uint32_t driverIdOffset = factory->numDrivers * splitGroupId;
for (uint32_t partitionId = 0; partitionId < factory->numDrivers;
++partitionId) {
out.emplace_back(factory->createDriver(
std::make_unique<DriverCtx>(
self,
driverIdOffset + partitionId,
pipeline,
splitGroupId,
partitionId),
self->exchangeClients_[pipeline],
[self](size_t i) {
return i < self->driverFactories_.size()
? self->driverFactories_[i]->numTotalDrivers
: 0;
}));
++splitGroupState.activeDrivers;
}
}
noMoreLocalExchangeProducers(splitGroupId);
++numRunningSplitGroups_;

// Initialize operator stats using the 1st driver of each operator.
if (not initializedOpStats_) {
initializedOpStats_ = true;
size_t driverIndex{0};
for (auto pipeline = 0; pipeline < numPipelines; ++pipeline) {
auto& factory = self->driverFactories_[pipeline];
out[driverIndex]->initializeOperatorStats(
self->taskStats_.pipelineStats[pipeline].operatorStats);
driverIndex += factory->numDrivers;
}
auto& splitGroupState = self->splitGroupStates_[splitGroupId];
const auto numPipelines = driverFactories_.size();
for (auto pipeline = 0; pipeline < numPipelines; ++pipeline) {
auto& factory = driverFactories_[pipeline];
const uint32_t driverIdOffset = factory->numDrivers * splitGroupId;
for (uint32_t partitionId = 0; partitionId < factory->numDrivers;
++partitionId) {
out.emplace_back(factory->createDriver(
std::make_unique<DriverCtx>(
self,
driverIdOffset + partitionId,
pipeline,
splitGroupId,
partitionId),
self->exchangeClients_[pipeline],
[self](size_t i) {
return i < self->driverFactories_.size()
? self->driverFactories_[i]->numTotalDrivers
: 0;
}));
++splitGroupState.activeDrivers;
}
} catch (std::exception& e) {
// If one of the drivers threw in creation, we need to remove task pointer
// from already created drivers or we will face another throw (or even a
// crash).
for (auto& driver : out) {
driver->disconnectFromTask();
}
noMoreLocalExchangeProducers(splitGroupId);
++numRunningSplitGroups_;

// Initialize operator stats using the 1st driver of each operator.
if (not initializedOpStats_) {
initializedOpStats_ = true;
size_t driverIndex{0};
for (auto pipeline = 0; pipeline < numPipelines; ++pipeline) {
auto& factory = self->driverFactories_[pipeline];
out[driverIndex]->initializeOperatorStats(
self->taskStats_.pipelineStats[pipeline].operatorStats);
driverIndex += factory->numDrivers;
}
throw;
}
}

