Skip to content

Commit

Permalink
[native]Improve spill directory failure handling
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Oct 4, 2023
1 parent e4d6044 commit 3736f69
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 9 deletions.
25 changes: 16 additions & 9 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,19 @@ static void maybeSetupTaskSpillDirectory(
const auto includeNodeInSpillPath =
SystemConfig::instance()->includeNodeInSpillPath();
auto nodeConfig = NodeConfig::instance();

const auto taskSpillDirPath = TaskManager::buildTaskSpillDirectoryPath(
baseSpillDirectory,
nodeConfig->nodeInternalAddress(),
nodeConfig->nodeId(),
execTask.queryCtx()->queryId(),
execTask.taskId(),
includeNodeInSpillPath);
execTask.setSpillDirectory(taskSpillDirPath);
// Create folder for the task spilling.
auto fileSystem =
velox::filesystems::getFileSystem(taskSpillDirPath, nullptr);
VELOX_CHECK_NOT_NULL(fileSystem, "File System is null!");
fileSystem->mkdir(taskSpillDirPath);
execTask.setSpillDirectory(taskSpillDirPath);
}

// Keep outstanding Promises in RequestHandler's state itself.
Expand Down Expand Up @@ -436,20 +435,28 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
return std::make_unique<TaskInfo>(prestoTask->updateInfoLocked());
}

execTask = exec::Task::create(
// Uses a temp variable to store the created velox task to destroy it
// under presto task lock if spill directory setup fails. Otherwise, the
// concurrent task creation retry from the coordinator might see the
// unexpected state in presto task left by the previously failed velox
// task which hasn't been destroyed yet, such as the task pool in query's
// root memory pool.
auto newExecTask = exec::Task::create(
taskId, planFragment, prestoTask->id.id(), std::move(queryCtx));
auto baseSpillDir = *(baseSpillDir_.rlock());
maybeSetupTaskSpillDirectory(planFragment, *execTask, baseSpillDir);
// TODO: move spill directory creation inside velox task execution
// whenever spilling is triggered. It will reduce the unnecessary file
// operations on remote storage.
const auto baseSpillDir = *(baseSpillDir_.rlock());
maybeSetupTaskSpillDirectory(planFragment, *newExecTask, baseSpillDir);

prestoTask->task = execTask;
prestoTask->task = std::move(newExecTask);
prestoTask->info.needsPlan = false;
startTask = true;
} else {
execTask = prestoTask->task;
}
execTask = prestoTask->task;
}
// Outside of prestoTask->mutex.
VELOX_CHECK(
VELOX_CHECK_NOT_NULL(
execTask,
"Task update received before setting a plan. The splits in "
"this update could not be delivered for {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ target_link_libraries(
velox_aggregates
velox_hive_partition_function
${RE2}
Folly::folly
gmock
gtest
gtest_main)
Expand Down
64 changes: 64 additions & 0 deletions presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ class Cursor {
uint64_t sequence_ = 0;
};

void setAggregationSpillConfig(
std::map<std::string, std::string>& queryConfigs) {
queryConfigs.emplace(core::QueryConfig::kSpillEnabled, "true");
queryConfigs.emplace(core::QueryConfig::kAggregationSpillEnabled, "true");
}

static const uint64_t kGB = 1024 * 1024 * 1024ULL;

class TaskManagerTest : public testing::Test {
Expand Down Expand Up @@ -1132,6 +1138,64 @@ TEST_F(TaskManagerTest, checkBatchSplits) {
ASSERT_EQ(resultOrFailure.status, nullptr);
}

TEST_F(TaskManagerTest, buildSpillDirectoryFailure) {
// Cleanup old tasks between test iterations.
taskManager_->setOldTaskCleanUpMs(0);
for (bool buildSpillDirectoryFailure : {false}) {
SCOPED_TRACE(fmt::format(
"buildSpillDirectoryFailure: {}", buildSpillDirectoryFailure));
auto spillDir = setupSpillPath();

std::vector<RowVectorPtr> batches = makeVectors(1, 1'000);

if (buildSpillDirectoryFailure) {
// Set bad formatted spill path.
taskManager_->setBaseSpillDirectory(
fmt::format("/etc/{}", spillDir->path));
} else {
taskManager_->setBaseSpillDirectory(spillDir->path);
}

std::map<std::string, std::string> queryConfigs;
setAggregationSpillConfig(queryConfigs);

auto planFragment = exec::test::PlanBuilder()
//.tableScan(rowType_)
.values(batches)
.singleAggregation({"c0"}, {"count(c1)"}, {})
.planFragment();
const protocol::TaskId taskId = "test.0.0.0.0";
protocol::TaskUpdateRequest updateRequest;
updateRequest.session.systemProperties = queryConfigs;
// Create task will fail if the spilling directory setup fails.
if (buildSpillDirectoryFailure) {
VELOX_ASSERT_THROW(
createOrUpdateTask(taskId, updateRequest, planFragment), "Mkdir");
} else {
createOrUpdateTask(taskId, updateRequest, planFragment);
auto taskMap = taskManager_->tasks();
ASSERT_EQ(taskMap.size(), 1);
auto* veloxTask = taskMap.begin()->second->task.get();
ASSERT_TRUE(veloxTask != nullptr);
ASSERT_FALSE(veloxTask->spillDirectory().empty());
}

taskManager_->deleteTask(taskId, true);
if (!buildSpillDirectoryFailure) {
auto taskMap = taskManager_->tasks();
ASSERT_EQ(taskMap.size(), 1);
auto* veloxTask = taskMap.begin()->second->task.get();
ASSERT_TRUE(veloxTask != nullptr);
while (veloxTask->numFinishedDrivers() != veloxTask->numTotalDrivers()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
taskManager_->cleanOldTasks();
velox::exec::test::waitForAllTasksToBeDeleted(3'000'000);
ASSERT_TRUE(taskManager_->tasks().empty());
}
}

// TODO: add disk spilling test for order by and hash join later.
} // namespace
} // namespace facebook::presto

0 comments on commit 3736f69

Please sign in to comment.