Skip to content

Commit

Permalink
stop finding a task if it is canncelled (#1778)
Browse files Browse the repository at this point in the history
* stop finding a task if it is canncelled

* refactor code

* update comments
  • Loading branch information
fzhedu authored Apr 15, 2021
1 parent 2be501e commit 37740f7
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 107 deletions.
9 changes: 4 additions & 5 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,10 @@ ::grpc::Status FlashService::EstablishMPPConnection(::grpc::ServerContext * grpc
auto & tmt_context = context.getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
std::chrono::seconds timeout(10);
MPPTaskPtr sender_task = task_manager->findTaskWithTimeout(request->sender_meta(), timeout);
std::string errMsg;
MPPTaskPtr sender_task = task_manager->findTaskWithTimeout(request->sender_meta(), timeout, errMsg);
if (sender_task == nullptr)
{
auto errMsg = "can't find task [" + toString(request->sender_meta().start_ts()) + "," + toString(request->sender_meta().task_id())
+ "] within " + toString(timeout.count()) + " s";
LOG_ERROR(log, errMsg);
mpp::MPPDataPacket packet;
auto err = new mpp::Error();
Expand All @@ -185,8 +184,8 @@ ::grpc::Status FlashService::EstablishMPPConnection(::grpc::ServerContext * grpc
MPPTunnelPtr tunnel = sender_task->getTunnelWithTimeout(request->receiver_meta(), timeout);
if (tunnel == nullptr)
{
auto errMsg = "can't find tunnel ( " + toString(request->receiver_meta().task_id()) + " + "
+ toString(request->sender_meta().task_id()) + " ) within " + toString(timeout.count()) + " s";
errMsg = "can't find tunnel ( " + toString(request->receiver_meta().task_id()) + " + " + toString(request->sender_meta().task_id())
+ " ) within " + toString(timeout.count()) + " s";
LOG_ERROR(log, errMsg);
mpp::MPPDataPacket packet;
auto err = new mpp::Error();
Expand Down
125 changes: 125 additions & 0 deletions dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,131 @@ MPPTaskManager::MPPTaskManager(BackgroundProcessingPool & background_pool_)
false);
}

MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std::chrono::seconds timeout, std::string & errMsg)
{
MPPTaskId id{meta.start_ts(), meta.task_id()};
std::map<MPPTaskId, MPPTaskPtr>::iterator it;
bool cancelled = false;
std::unique_lock<std::mutex> lock(mu);
auto ret = cv.wait_for(lock, timeout, [&] {
auto query_it = mpp_query_map.find(id.start_ts);
// TODO: how about the query has been cancelled in advance?
if (query_it == mpp_query_map.end())
{
return false;
}
else if (query_it->second.to_be_cancelled)
{
/// if the query is cancelled, return true to stop waiting timeout.
LOG_WARNING(log, "Query " + std::to_string(id.start_ts) + " is cancelled, all its tasks are invalid.");
cancelled = true;
return true;
}
it = query_it->second.task_map.find(id);
return it != query_it->second.task_map.end();
});
if (cancelled)
{
errMsg = "Task [" + DB::toString(meta.start_ts()) + "," + DB::toString(meta.task_id()) + "] has been cancelled.";
return nullptr;
}
else if (!ret)
{
errMsg = "Can't find task [" + DB::toString(meta.start_ts()) + "," + DB::toString(meta.task_id()) + "] within "
+ DB::toString(timeout.count()) + " s.";
return nullptr;
}
return it->second;
}

void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason)
{
MPPQueryTaskSet task_set;
{
/// cancel task may take a long time, so first
/// set a flag, so we can cancel task one by
/// one without holding the lock
std::lock_guard<std::mutex> lock(mu);
auto it = mpp_query_map.find(query_id);
if (it == mpp_query_map.end() || it->second.to_be_cancelled)
return;
it->second.to_be_cancelled = true;
task_set = it->second;
cv.notify_all();
}
LOG_WARNING(log, "Begin cancel query: " + std::to_string(query_id));
std::stringstream ss;
ss << "Remaining task in query " + std::to_string(query_id) + " are: ";
// TODO: cancel tasks in order rather than issuing so many threads to cancel tasks
std::vector<std::thread> cancel_workers;
for (auto task_it = task_set.task_map.rbegin(); task_it != task_set.task_map.rend(); task_it++)
{
ss << task_it->first.toString() << " ";
std::thread t(&MPPTask::cancel, task_it->second, std::ref(reason));
cancel_workers.push_back(std::move(t));
}
LOG_WARNING(log, ss.str());
for (auto & worker : cancel_workers)
{
worker.join();
}
MPPQueryTaskSet canceled_task_set;
{
std::lock_guard<std::mutex> lock(mu);
/// just to double check the query still exists
auto it = mpp_query_map.find(query_id);
if (it != mpp_query_map.end())
{
/// hold the canceled task set, so the mpp task will not be deconstruct when holding the
/// `mu` of MPPTaskManager, otherwise it might cause deadlock
canceled_task_set = it->second;
mpp_query_map.erase(it);
}
}
LOG_WARNING(log, "Finish cancel query: " + std::to_string(query_id));
}

bool MPPTaskManager::registerTask(MPPTaskPtr task)
{
std::unique_lock<std::mutex> lock(mu);
const auto & it = mpp_query_map.find(task->id.start_ts);
if (it != mpp_query_map.end() && it->second.to_be_cancelled)
{
LOG_WARNING(log, "Do not register task: " + task->id.toString() + " because the query is to be cancelled.");
cv.notify_all();
return false;
}
if (it != mpp_query_map.end() && it->second.task_map.find(task->id) != it->second.task_map.end())
{
throw Exception("The task " + task->id.toString() + " has been registered");
}
mpp_query_map[task->id.start_ts].task_map.emplace(task->id, task);
task->manager = this;
cv.notify_all();
return true;
}

void MPPTaskManager::unregisterTask(MPPTask * task)
{
std::unique_lock<std::mutex> lock(mu);
auto it = mpp_query_map.find(task->id.start_ts);
if (it != mpp_query_map.end())
{
if (it->second.to_be_cancelled)
return;
auto task_it = it->second.task_map.find(task->id);
if (task_it != it->second.task_map.end())
{
it->second.task_map.erase(task_it);
if (it->second.task_map.empty())
/// remove query task map if the task is the last one
mpp_query_map.erase(it);
return;
}
}
LOG_ERROR(log, "The task " + task->id.toString() + " cannot be found and fail to unregister");
}

MPPTaskManager::~MPPTaskManager() { background_pool.removeTask(handle); }

} // namespace DB
107 changes: 5 additions & 102 deletions dbms/src/Flash/Mpp/MPPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ class MPPTaskManager : private boost::noncopyable
}
return ret;
}

std::vector<MPPTaskPtr> getCurrentTasksForQuery(UInt64 query_id)
{
std::vector<MPPTaskPtr> ret;
Expand All @@ -443,111 +444,13 @@ class MPPTaskManager : private boost::noncopyable
return ret;
}

bool registerTask(MPPTaskPtr task)
{
std::unique_lock<std::mutex> lock(mu);
const auto & it = mpp_query_map.find(task->id.start_ts);
if (it != mpp_query_map.end() && it->second.to_be_cancelled)
{
LOG_WARNING(log, "Do not register task: " + task->id.toString() + " because the query is to be cancelled.");
cv.notify_all();
return false;
}
if (it != mpp_query_map.end() && it->second.task_map.find(task->id) != it->second.task_map.end())
{
throw Exception("The task " + task->id.toString() + " has been registered");
}
mpp_query_map[task->id.start_ts].task_map.emplace(task->id, task);
task->manager = this;
cv.notify_all();
return true;
}
bool registerTask(MPPTaskPtr task);

void unregisterTask(MPPTask * task)
{
std::unique_lock<std::mutex> lock(mu);
auto it = mpp_query_map.find(task->id.start_ts);
if (it != mpp_query_map.end())
{
if (it->second.to_be_cancelled)
return;
auto task_it = it->second.task_map.find(task->id);
if (task_it != it->second.task_map.end())
{
it->second.task_map.erase(task_it);
if (it->second.task_map.empty())
/// remove query task map if the task is the last one
mpp_query_map.erase(it);
return;
}
}
LOG_ERROR(log, "The task " + task->id.toString() + " cannot be found and fail to unregister");
}

MPPTaskPtr findTaskWithTimeout(const mpp::TaskMeta & meta, std::chrono::seconds timeout)
{
MPPTaskId id{meta.start_ts(), meta.task_id()};
std::map<MPPTaskId, MPPTaskPtr>::iterator it;
std::unique_lock<std::mutex> lock(mu);
auto ret = cv.wait_for(lock, timeout, [&] {
auto query_it = mpp_query_map.find(id.start_ts);
if (query_it == mpp_query_map.end() || query_it->second.to_be_cancelled)
{
/// if the query is cancelled, return false to make the finder fail quickly
LOG_WARNING(log, "Query " + std::to_string(id.start_ts) + " is cancelled, all its tasks are invalid.");
return false;
}
it = query_it->second.task_map.find(id);
return it != query_it->second.task_map.end();
});
return ret ? it->second : nullptr;
}
void unregisterTask(MPPTask * task);

void cancelMPPQuery(UInt64 query_id, const String & reason)
{
MPPQueryTaskSet task_set;
{
/// cancel task may take a long time, so first
/// set a flag, so we can cancel task one by
/// one without holding the lock
std::lock_guard<std::mutex> lock(mu);
auto it = mpp_query_map.find(query_id);
if (it == mpp_query_map.end() || it->second.to_be_cancelled)
return;
it->second.to_be_cancelled = true;
task_set = it->second;
}
LOG_WARNING(log, "Begin cancel query: " + std::to_string(query_id));
std::stringstream ss;
ss << "Remaining task in query " + std::to_string(query_id) + " are: ";
MPPTaskPtr findTaskWithTimeout(const mpp::TaskMeta & meta, std::chrono::seconds timeout, std::string & errMsg);

std::vector<std::thread> cancel_workers;
for (auto task_it = task_set.task_map.rbegin(); task_it != task_set.task_map.rend(); task_it++)
{
ss << task_it->first.toString() << " ";
std::thread t(&MPPTask::cancel, task_it->second, std::ref(reason));
cancel_workers.push_back(std::move(t));
}
LOG_WARNING(log, ss.str());
for (auto & worker : cancel_workers)
{
worker.join();
}
MPPQueryTaskSet canceled_task_set;
{
std::lock_guard<std::mutex> lock(mu);
/// just to double check the query still exists
auto it = mpp_query_map.find(query_id);
if (it != mpp_query_map.end())
{
/// hold the canceled task set, so the mpp task will not be deconstruct when holding the
/// `mu` of MPPTaskManager, otherwise it might cause deadlock
canceled_task_set = it->second;
mpp_query_map.erase(it);
}
}
LOG_WARNING(log, "Finish cancel query: " + std::to_string(query_id));
}
void cancelMPPQuery(UInt64 query_id, const String & reason);

String toString()
{
Expand Down

0 comments on commit 37740f7

Please sign in to comment.