Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potential hang when duplicated task registered. #8193

Merged
merged 3 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,12 @@ void MPPTaskMonitorHelper::initAndAddself(MPPTaskManager * manager_, const Strin
{
manager = manager_;
task_unique_id = task_unique_id_;
manager->addMonitoredTask(task_unique_id);
initialized = true;
added_to_monitor = manager->addMonitoredTask(task_unique_id);
}

MPPTaskMonitorHelper::~MPPTaskMonitorHelper()
{
if (initialized)
if (added_to_monitor)
{
manager->removeMonitoredTask(task_unique_id);
}
Expand Down Expand Up @@ -361,11 +360,14 @@ MemoryTracker * MPPTask::getMemoryTracker() const

/// Asks the task manager to unregister this task (by `id`, passing the current
/// error string) and logs the outcome: a debug line on success, a warning that
/// includes the manager-provided reason on failure.
///
/// NOTE(review): this span comes from a diff view and appears to contain both the
/// removed unconditional call and the added `is_registered`-guarded call. Confirm
/// against the real file which of the two is live before relying on this text.
void MPPTask::unregisterTask()
{
// Unconditional variant: always asks the manager to unregister `id`.
auto [result, reason] = manager->unregisterTask(id, getErrString());
if (result)
LOG_DEBUG(log, "task unregistered");
else
LOG_WARNING(log, "task failed to unregister, reason: {}", reason);
// Guarded variant: only unregister when this task object was marked as registered.
// Presumably this keeps a task whose registration was rejected as a duplicate from
// removing the original task that shares its id -- TODO confirm.
if (is_registered)
{
auto [result, reason] = manager->unregisterTask(id, getErrString());
if (result)
LOG_DEBUG(log, "task unregistered");
else
LOG_WARNING(log, "task failed to unregister, reason: {}", reason);
}
}

void MPPTask::initQueryOperatorSpillContexts(
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class MPPTaskMonitorHelper
private:
MPPTaskManager * manager = nullptr;
String task_unique_id;
bool initialized = false;
bool added_to_monitor = false;
};

class MPPTask
Expand Down Expand Up @@ -176,7 +176,7 @@ class MPPTask
ContextPtr context;

MPPTaskManager * manager;
std::atomic<bool> is_public{false};
std::atomic<bool> is_registered{false};

MPPTaskScheduleEntry schedule_entry;

Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ std::pair<bool, String> MPPTaskManager::registerTask(MPPTask * task)
return {false, "task is already registered"};
}
gather_task_set->registerTask(task->id);
task->is_registered = true;
task->initProcessListEntry(query->process_list_entry);
task->initQueryOperatorSpillContexts(query->mpp_query_operator_spill_contexts);
return {true, ""};
Expand All @@ -387,6 +388,15 @@ MPPQueryId MPPTaskManager::getCurrentMinTSOQueryId(const String & resource_group
return scheduler->getCurrentMinTSOQueryId(resource_group_name);
}

/// Tells whether the gather task set that owns `id` reports the task as registered.
///
/// Holds `mu` for the whole lookup. When no gather task set is found for
/// `id.gather_id` the answer is false; otherwise the result is whatever
/// `isTaskRegistered(id)` returns on that set.
bool MPPTaskManager::isTaskExists(const MPPTaskId & id)
{
    std::unique_lock guard(mu);
    // Only the gather task set is needed here; the query and error message are ignored.
    auto [mpp_query, task_set, err_msg] = getMPPQueryAndGatherTaskSet(id.gather_id);
    return task_set != nullptr && task_set->isTaskRegistered(id);
}

std::pair<bool, String> MPPTaskManager::makeTaskActive(MPPTaskPtr task)
{
if (!task->isRootMPPTask())
Expand All @@ -412,7 +422,6 @@ std::pair<bool, String> MPPTaskManager::makeTaskActive(MPPTaskPtr task)
"Task process list entry should always be the same as query process list entry");
gather_task_set->makeTaskActive(task);
gather_task_set->cancelAlarmsBySenderTaskId(task->id);
task->is_public = true;
cv.notify_all();
return {true, ""};
}
Expand Down
15 changes: 12 additions & 3 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ struct MPPTaskMonitor
: log(log_)
{}

void addMonitoredTask(const String & task_unique_id)
bool addMonitoredTask(const String & task_unique_id)
{
std::lock_guard lock(mu);
auto iter = monitored_tasks.find(task_unique_id);
Expand All @@ -167,10 +167,11 @@ struct MPPTaskMonitor
log,
"task {} is repeatedly added to be monitored which is not an expected behavior!",
task_unique_id);
return;
return false;
}

monitored_tasks.insert(std::make_pair(task_unique_id, Stopwatch()));
return true;
}

void removeMonitoredTask(const String & task_unique_id)
Expand All @@ -186,6 +187,12 @@ struct MPPTaskMonitor
monitored_tasks.erase(iter);
}

/// Returns true when `task_unique_id` currently has an entry in `monitored_tasks`.
/// The lookup is performed while holding `mu`.
bool isInMonitor(const String & task_unique_id)
{
    std::lock_guard guard(mu);
    const auto pos = monitored_tasks.find(task_unique_id);
    return pos != monitored_tasks.end();
}

std::mutex mu;
std::condition_variable cv;
bool is_shutdown = false;
Expand Down Expand Up @@ -220,7 +227,7 @@ class MPPTaskManager : private boost::noncopyable

/// Returns the shared pointer to the task monitor held by this manager.
std::shared_ptr<MPPTaskMonitor> getMPPTaskMonitor() const
{
    return monitor;
}

void addMonitoredTask(const String & task_unique_id) { monitor->addMonitoredTask(task_unique_id); }
bool addMonitoredTask(const String & task_unique_id) { return monitor->addMonitoredTask(task_unique_id); }

/// Forwards the removal of `task_unique_id` to the task monitor.
void removeMonitoredTask(const String & task_unique_id)
{
    monitor->removeMonitoredTask(task_unique_id);
}

Expand Down Expand Up @@ -261,6 +268,8 @@ class MPPTaskManager : private boost::noncopyable
/// for test
MPPQueryId getCurrentMinTSOQueryId(const String & resource_group_name);

bool isTaskExists(const MPPTaskId & id);

private:
MPPQueryPtr addMPPQuery(
const MPPQueryId & query_id,
Expand Down
24 changes: 24 additions & 0 deletions dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,29 @@ try
}
CATCH

// Registers two tasks built from the same sender meta and checks that the rejected
// second registration leaves the first task registered and monitored.
TEST_F(TestMPPTaskManager, testDuplicateMPPTaskId)
try
{
// Declared outside the inner scope so it is still alive for the final assertions.
MPPTaskPtr original_task;
auto context = createContextForTest();
auto mpp_task_manager = context->getTMTContext().getMPPTaskManager();
{
mpp::EstablishMPPConnectionRequest establish_req;
auto gather_id = MPPGatherId(1, MPPQueryId(1, 1, 1, 1, ""));
auto * sender_meta = establish_req.mutable_sender_meta();
fillTaskMeta(sender_meta, 1, gather_id);
// First registration with this meta is expected to succeed.
original_task = MPPTask::newTaskForTest(*sender_meta, context);
auto result = mpp_task_manager->registerTask(original_task.get());
ASSERT_TRUE(result.first);
// A second task built from the same sender meta must be rejected by registerTask.
auto second_task = MPPTask::newTaskForTest(*sender_meta, context);
result = mpp_task_manager->registerTask(second_task.get());
ASSERT_FALSE(result.first);
// Feed the rejection reason into the duplicate task's error handling.
second_task->handleError(result.second);
// `second_task` goes out of scope at the closing brace below; presumably this
// releases the duplicate task so its cleanup runs before the checks -- TODO confirm.
}
// The original task must still be known to the manager and to the task monitor.
ASSERT_TRUE(mpp_task_manager->isTaskExists(original_task->getId()));
ASSERT_TRUE(mpp_task_manager->getMPPTaskMonitor()->isInMonitor(original_task->getId().toString()));
}
CATCH

} // namespace tests
} // namespace DB