From 3736f69d8256973f8005660763036739bf600315 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Tue, 3 Oct 2023 16:58:53 -0700 Subject: [PATCH] [native]Improve spill directory failure handling --- .../presto_cpp/main/TaskManager.cpp | 25 +++++--- .../presto_cpp/main/tests/CMakeLists.txt | 1 + .../presto_cpp/main/tests/TaskManagerTest.cpp | 64 +++++++++++++++++++ 3 files changed, 81 insertions(+), 9 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 77f27428cf949..f066cae032b3b 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -52,7 +52,6 @@ static void maybeSetupTaskSpillDirectory( const auto includeNodeInSpillPath = SystemConfig::instance()->includeNodeInSpillPath(); auto nodeConfig = NodeConfig::instance(); - const auto taskSpillDirPath = TaskManager::buildTaskSpillDirectoryPath( baseSpillDirectory, nodeConfig->nodeInternalAddress(), @@ -60,12 +59,12 @@ static void maybeSetupTaskSpillDirectory( execTask.queryCtx()->queryId(), execTask.taskId(), includeNodeInSpillPath); - execTask.setSpillDirectory(taskSpillDirPath); // Create folder for the task spilling. auto fileSystem = velox::filesystems::getFileSystem(taskSpillDirPath, nullptr); VELOX_CHECK_NOT_NULL(fileSystem, "File System is null!"); fileSystem->mkdir(taskSpillDirPath); + execTask.setSpillDirectory(taskSpillDirPath); } // Keep outstanding Promises in RequestHandler's state itself. @@ -436,20 +435,28 @@ std::unique_ptr TaskManager::createOrUpdateTask( return std::make_unique(prestoTask->updateInfoLocked()); } - execTask = exec::Task::create( + // Uses a temp variable to store the created velox task to destroy it + // under presto task lock if spill directory setup fails. Otherwise, the + // concurrent task creation retry from the coordinator might see the + // unexpected state in presto task left by the previously failed velox + // task which hasn't been destroyed yet, such as the task pool in query's + // root memory pool. + auto newExecTask = exec::Task::create( taskId, planFragment, prestoTask->id.id(), std::move(queryCtx)); - auto baseSpillDir = *(baseSpillDir_.rlock()); - maybeSetupTaskSpillDirectory(planFragment, *execTask, baseSpillDir); + // TODO: move spill directory creation inside velox task execution + // whenever spilling is triggered. It will reduce the unnecessary file + // operations on remote storage. + const auto baseSpillDir = *(baseSpillDir_.rlock()); + maybeSetupTaskSpillDirectory(planFragment, *newExecTask, baseSpillDir); - prestoTask->task = execTask; + prestoTask->task = std::move(newExecTask); prestoTask->info.needsPlan = false; startTask = true; - } else { - execTask = prestoTask->task; } + execTask = prestoTask->task; } // Outside of prestoTask->mutex. - VELOX_CHECK( + VELOX_CHECK_NOT_NULL( execTask, "Task update received before setting a plan. The splits in " "this update could not be delivered for {}", diff --git a/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt index 112953fc75a62..9cbeedbc3be1c 100644 --- a/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt @@ -41,6 +41,7 @@ target_link_libraries( velox_aggregates velox_hive_partition_function ${RE2} + Folly::folly gmock gtest gtest_main) diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index 2487bdd915951..1dbc9119d4277 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -134,6 +134,12 @@ class Cursor { uint64_t sequence_ = 0; }; +void setAggregationSpillConfig( + std::map& queryConfigs) { + queryConfigs.emplace(core::QueryConfig::kSpillEnabled, "true"); + queryConfigs.emplace(core::QueryConfig::kAggregationSpillEnabled, "true"); +} + static const uint64_t kGB = 1024 * 1024 * 1024ULL; class TaskManagerTest : public testing::Test { @@ -1132,6 +1138,64 @@ TEST_F(TaskManagerTest, checkBatchSplits) { ASSERT_EQ(resultOrFailure.status, nullptr); } +TEST_F(TaskManagerTest, buildSpillDirectoryFailure) { + // Cleanup old tasks between test iterations. + taskManager_->setOldTaskCleanUpMs(0); + for (bool buildSpillDirectoryFailure : {false}) { + SCOPED_TRACE(fmt::format( + "buildSpillDirectoryFailure: {}", buildSpillDirectoryFailure)); + auto spillDir = setupSpillPath(); + + std::vector batches = makeVectors(1, 1'000); + + if (buildSpillDirectoryFailure) { + // Set bad formatted spill path. + taskManager_->setBaseSpillDirectory( + fmt::format("/etc/{}", spillDir->path)); + } else { + taskManager_->setBaseSpillDirectory(spillDir->path); + } + + std::map queryConfigs; + setAggregationSpillConfig(queryConfigs); + + auto planFragment = exec::test::PlanBuilder() + //.tableScan(rowType_) + .values(batches) + .singleAggregation({"c0"}, {"count(c1)"}, {}) + .planFragment(); + const protocol::TaskId taskId = "test.0.0.0.0"; + protocol::TaskUpdateRequest updateRequest; + updateRequest.session.systemProperties = queryConfigs; + // Create task will fail if the spilling directory setup fails. + if (buildSpillDirectoryFailure) { + VELOX_ASSERT_THROW( + createOrUpdateTask(taskId, updateRequest, planFragment), "Mkdir"); + } else { + createOrUpdateTask(taskId, updateRequest, planFragment); + auto taskMap = taskManager_->tasks(); + ASSERT_EQ(taskMap.size(), 1); + auto* veloxTask = taskMap.begin()->second->task.get(); + ASSERT_TRUE(veloxTask != nullptr); + ASSERT_FALSE(veloxTask->spillDirectory().empty()); + } + + taskManager_->deleteTask(taskId, true); + if (!buildSpillDirectoryFailure) { + auto taskMap = taskManager_->tasks(); + ASSERT_EQ(taskMap.size(), 1); + auto* veloxTask = taskMap.begin()->second->task.get(); + ASSERT_TRUE(veloxTask != nullptr); + while (veloxTask->numFinishedDrivers() != veloxTask->numTotalDrivers()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + taskManager_->cleanOldTasks(); + velox::exec::test::waitForAllTasksToBeDeleted(3'000'000); + ASSERT_TRUE(taskManager_->tasks().empty()); + } +} + // TODO: add disk spilling test for order by and hash join later. } // namespace } // namespace facebook::presto