From 521fc147d7d75847e5d46cd38be678ee9043cb86 Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 13 Oct 2023 15:02:28 +0800 Subject: [PATCH 1/2] save work Signed-off-by: xufei --- dbms/src/Flash/Mpp/MPPTask.cpp | 18 +++++++++------- dbms/src/Flash/Mpp/MPPTask.h | 4 ++-- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 11 +++++++++- dbms/src/Flash/Mpp/MPPTaskManager.h | 15 ++++++++++--- .../Mpp/tests/gtest_mpp_task_manager.cpp | 21 +++++++++++++++++++ 5 files changed, 55 insertions(+), 14 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 98b9eab40dd..7c6623ec703 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -112,13 +112,12 @@ void MPPTaskMonitorHelper::initAndAddself(MPPTaskManager * manager_, const Strin { manager = manager_; task_unique_id = task_unique_id_; - manager->addMonitoredTask(task_unique_id); - initialized = true; + added_to_monitor = manager->addMonitoredTask(task_unique_id); } MPPTaskMonitorHelper::~MPPTaskMonitorHelper() { - if (initialized) + if (added_to_monitor) { manager->removeMonitoredTask(task_unique_id); } @@ -361,11 +360,14 @@ MemoryTracker * MPPTask::getMemoryTracker() const void MPPTask::unregisterTask() { - auto [result, reason] = manager->unregisterTask(id, getErrString()); - if (result) - LOG_DEBUG(log, "task unregistered"); - else - LOG_WARNING(log, "task failed to unregister, reason: {}", reason); + if (is_registered) + { + auto [result, reason] = manager->unregisterTask(id, getErrString()); + if (result) + LOG_DEBUG(log, "task unregistered"); + else + LOG_WARNING(log, "task failed to unregister, reason: {}", reason); + } } void MPPTask::initQueryOperatorSpillContexts( diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index 0563cb32869..1c997e2bb5e 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -62,7 +62,7 @@ class MPPTaskMonitorHelper private: MPPTaskManager * manager = nullptr; String task_unique_id; - bool initialized = false; + bool added_to_monitor = false; }; class MPPTask @@ -176,7 +176,7 @@ class MPPTask ContextPtr context; MPPTaskManager * manager; - std::atomic is_public{false}; + std::atomic is_registered{false}; MPPTaskScheduleEntry schedule_entry; diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 4606aff9bb7..bb42cd3871d 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -376,6 +376,7 @@ std::pair MPPTaskManager::registerTask(MPPTask * task) return {false, "task is already registered"}; } gather_task_set->registerTask(task->id); + task->is_registered = true; task->initProcessListEntry(query->process_list_entry); task->initQueryOperatorSpillContexts(query->mpp_query_operator_spill_contexts); return {true, ""}; @@ -387,6 +388,15 @@ MPPQueryId MPPTaskManager::getCurrentMinTSOQueryId(const String & resource_group return scheduler->getCurrentMinTSOQueryId(resource_group_name); } +bool MPPTaskManager::isTaskExists(const MPPTaskId & id) +{ + std::unique_lock lock(mu); + auto [query, gather_task_set, error_msg] = getMPPQueryAndGatherTaskSet(id.gather_id); + if (gather_task_set == nullptr) + return false; + return gather_task_set->isTaskRegistered(id); +} + std::pair MPPTaskManager::makeTaskActive(MPPTaskPtr task) { if (!task->isRootMPPTask()) @@ -412,7 +422,6 @@ std::pair MPPTaskManager::makeTaskActive(MPPTaskPtr task) "Task process list entry should always be the same as query process list entry"); gather_task_set->makeTaskActive(task); gather_task_set->cancelAlarmsBySenderTaskId(task->id); - task->is_public = true; cv.notify_all(); return {true, ""}; } diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index f14745471ca..78090e9ab0a 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -157,7 +157,7 @@ struct MPPTaskMonitor : log(log_) {} - void addMonitoredTask(const String & task_unique_id) + bool addMonitoredTask(const String & task_unique_id) { std::lock_guard lock(mu); auto iter = monitored_tasks.find(task_unique_id); @@ -167,10 +167,11 @@ struct MPPTaskMonitor log, "task {} is repeatedly added to be monitored which is not an expected behavior!", task_unique_id); - return; + return false; } monitored_tasks.insert(std::make_pair(task_unique_id, Stopwatch())); + return true; } void removeMonitoredTask(const String & task_unique_id) @@ -186,6 +187,12 @@ struct MPPTaskMonitor monitored_tasks.erase(iter); } + bool isInMonitor(const String & task_unique_id) + { + std::lock_guard lock(mu); + return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); + } + std::mutex mu; std::condition_variable cv; bool is_shutdown = false; @@ -220,7 +227,7 @@ class MPPTaskManager : private boost::noncopyable std::shared_ptr getMPPTaskMonitor() const { return monitor; } - void addMonitoredTask(const String & task_unique_id) { monitor->addMonitoredTask(task_unique_id); } + bool addMonitoredTask(const String & task_unique_id) { return monitor->addMonitoredTask(task_unique_id); } void removeMonitoredTask(const String & task_unique_id) { monitor->removeMonitoredTask(task_unique_id); } @@ -261,6 +268,8 @@ class MPPTaskManager : private boost::noncopyable /// for test MPPQueryId getCurrentMinTSOQueryId(const String & resource_group_name); + bool isTaskExists(const MPPTaskId & id); + private: MPPQueryPtr addMPPQuery( const MPPQueryId & query_id, diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp index 21cfa97b153..10cea8261a3 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp @@ -121,5 +121,26 @@ try } CATCH +TEST_F(TestMPPTaskManager, testDuplicateMPPTaskId) +try +{ + auto context = createContextForTest(); + mpp::EstablishMPPConnectionRequest establish_req; + auto gather_id = MPPGatherId(1, MPPQueryId(1, 1, 1, 1, "")); + auto * sender_meta = establish_req.mutable_sender_meta(); + fillTaskMeta(sender_meta, 1, gather_id); + auto mpp_task_manager = context->getTMTContext().getMPPTaskManager(); + auto mpp_task_1 = MPPTask::newTaskForTest(*sender_meta, context); + auto result = mpp_task_manager->registerTask(mpp_task_1.get()); + ASSERT_TRUE(result.first); + auto mpp_task_2 = MPPTask::newTaskForTest(*sender_meta, context); + result = mpp_task_manager->registerTask(mpp_task_2.get()); + ASSERT_FALSE(result.first); + mpp_task_2->handleError(result.second); + ASSERT_TRUE(mpp_task_manager->isTaskExists(mpp_task_1->getId())); + mpp_task_manager->getMPPTaskMonitor()->isInMonitor(mpp_task_1->getId().toString()); +} +CATCH + } // namespace tests } // namespace DB From bc5d2938b409fbeb782b989da026abc0533dbcdd Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 13 Oct 2023 15:42:35 +0800 Subject: [PATCH 2/2] refine tests Signed-off-by: xufei --- .../Mpp/tests/gtest_mpp_task_manager.cpp | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp index 10cea8261a3..0ff23d02774 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp @@ -124,21 +124,24 @@ CATCH TEST_F(TestMPPTaskManager, testDuplicateMPPTaskId) try { + MPPTaskPtr original_task; auto context = createContextForTest(); - mpp::EstablishMPPConnectionRequest establish_req; - auto gather_id = MPPGatherId(1, MPPQueryId(1, 1, 1, 1, "")); - auto * sender_meta = establish_req.mutable_sender_meta(); - fillTaskMeta(sender_meta, 1, gather_id); auto mpp_task_manager = context->getTMTContext().getMPPTaskManager(); - auto mpp_task_1 = MPPTask::newTaskForTest(*sender_meta, context); - auto result = mpp_task_manager->registerTask(mpp_task_1.get()); - ASSERT_TRUE(result.first); - auto mpp_task_2 = MPPTask::newTaskForTest(*sender_meta, context); - result = mpp_task_manager->registerTask(mpp_task_2.get()); - ASSERT_FALSE(result.first); - mpp_task_2->handleError(result.second); - ASSERT_TRUE(mpp_task_manager->isTaskExists(mpp_task_1->getId())); - mpp_task_manager->getMPPTaskMonitor()->isInMonitor(mpp_task_1->getId().toString()); + { + mpp::EstablishMPPConnectionRequest establish_req; + auto gather_id = MPPGatherId(1, MPPQueryId(1, 1, 1, 1, "")); + auto * sender_meta = establish_req.mutable_sender_meta(); + fillTaskMeta(sender_meta, 1, gather_id); + original_task = MPPTask::newTaskForTest(*sender_meta, context); + auto result = mpp_task_manager->registerTask(original_task.get()); + ASSERT_TRUE(result.first); + auto second_task = MPPTask::newTaskForTest(*sender_meta, context); + result = mpp_task_manager->registerTask(second_task.get()); + ASSERT_FALSE(result.first); + second_task->handleError(result.second); + } + ASSERT_TRUE(mpp_task_manager->isTaskExists(original_task->getId())); + ASSERT_TRUE(mpp_task_manager->getMPPTaskMonitor()->isInMonitor(original_task->getId().toString())); } CATCH