-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tbb::task_group does not work well with serial task queues #346
Comments
Here is the example code #if defined(USE_RESUMABLE_TASKS)
#define TBB_PREVIEW_RESUMABLE_TASKS 1
#include <tbb/task.h>
#endif
#include <tbb/task_group.h>
#include <tbb/global_control.h>
#include <iostream>
#include <atomic>
#include <tbb/concurrent_queue.h>
#include <functional>
//Serializes task calls
class TaskQueue {
private:
class QueuedTask;
public:
template<typename F>
void push(tbb::task_group& iGroup, F&& iF) {
queue_.push(new QueuedTask(iGroup, std::function<void()>(iF), this) );
bool expected = false;
if(started_.compare_exchange_strong(expected, true) ) {
QueuedTask* next = nullptr;
queue_.try_pop(next);
next->runAsync();
}
}
private:
friend class QueuedTask;
QueuedTask* pop() {
QueuedTask* next = nullptr;
queue_.try_pop(next);
if(nullptr == next) { started_ = false; }
return next;
}
//When a task finishes it automatically starts
// the next task in the queue
class QueuedTask {
public:
QueuedTask(tbb::task_group& iGroup, std::function<void()> iFunc, TaskQueue* iQueue):
group_(&iGroup), f_(std::move(iFunc)), queue_(iQueue) {}
void runAsync() {
group_->run([this]() {
f_();
auto next = queue_->pop();
if(next) { next->runAsync(); }
delete this;
});
}
private:
tbb::task_group* group_;
std::function<void()> f_;
TaskQueue* queue_;
};
tbb::concurrent_queue<QueuedTask*> queue_;
std::atomic<bool> started_{false};
};
int main(int argc, char const* const* argv ) {
tbb::global_control gc(tbb::global_control::max_allowed_parallelism, std::stoi(argv[1]));
tbb::task_group outer;
TaskQueue taskQueue;
//note: without this outer.run the tbb::task::suspend seg faults
outer.run([&outer,&taskQueue]() {
taskQueue.push(outer,[]() { std::cout<<"first"<<std::endl;});
taskQueue.push(outer,[]() { std::cout<<"second"<<std::endl;});
std::cout <<" inner wait started"<<std::endl;
#if defined(USE_RESUMABLE_TASKS)
tbb::task::suspend([&outer,&taskQueue](tbb::task::suspend_point tag) {
taskQueue.push(outer,[&tag]() {
std::cout <<"inner"<<std::endl;
tbb::task::resume(tag);
});
});
#else
{
tbb::task_group inner;
std::atomic<bool> wasRun{false};
taskQueue.push(inner, [&wasRun]() {std::cout <<"inner"<<std::endl; wasRun = true;});
do {
inner.wait();
}while(not wasRun);
}
#endif
std::cout <<" inner wait finished"<<std::endl;
taskQueue.push(outer,[]() { std::cout<<"last"<<std::endl;});
});
outer.wait();
} |
@Dr15Jones, class QueuedTask {
public:
QueuedTask(tbb::task_group& iGroup, std::function<void()> iFunc, TaskQueue* iQueue):
group_(&iGroup),
th_(iGroup.defer([this, iFunc, iQueue]{
iFunc();
auto next = iQueue->pop();
if(next) { next->runAsync(); }
delete this;
}))
{}
void runAsync() {
group_->run(std::move(th_));
}
private:
tbb::task_group* group_;
tbb::task_handle th_;
}; here is the full sample (slightly changed to always use single thread): https://godbolt.org/z/qYbxWerYT |
@Dr15Jones , @makortel do you need more help with this issue ? |
@anton-potapov we haven't yet had an opportunity to try it in our 'full' application. It does appear to work in the test code which had a referring pull request. |
@Dr15Jones is this issue still relevant for you, did you had a chance to try proposed solution? Could you please respond? |
@Dr15Jones kindly ping you. |
@isaevil sorry, the messages from this GitHub repository are being sent to my spam filter so I'm not seeing them promptly. Using Thanks for reaching out to be sure the issue was successful. |
@Dr15Jones glad that proposed solution worked out. If this issue is successful, I would kindly ask you to close it :) |
In our application we protect non-thread safe shared resources via serial task queues. When part of the application needs to use the shared resource it pushes a task into the appropriate serial task queue. The serial task queue makes sure that 1 and only 1 task it holds runs at a time. These tasks are usually set to run asynchronously, although a few places require them to happen synchronously. This all worked fine with the old
tbb::task
interface as the synchronous calls could just do the wait on the task which was added to the queue. Problems arise when usingtbb::task_group
.The application needs to be able to wait for tasks to finish at several different spots. There is a wait that happens on the main thread to wait for all work of the application to finish. There are also waits that happen synchronously from within a running task. When all we have are
tbb::task_groups
it means each wait has to use a differenttbb::task_group
. This is a problem for the synchronous waits. So if atbb::task_group
has tasks presently running on other threads and there is no more tasks for the thread doing the wait to run, then tasks from othertbb::task_groups
can be run on the waiting threads. However, if there are no running tasks from thetbb::task_group
then thattbb::task_group
will immediately stop waiting. If one has a queue of tasks and the tasks earlier in the queue are from a differenttbb::task_group
and the tasks from the synchronous waitingtbb::task_group
are later in the queue then there is no guarantee that the earlier tasks will ever be run (as the waitingtbb::task_group
will not run them as its tasks have yet to be started).This is illustrated in the code added as an additional comment to this issue. The code issues two tasks on the 'outer' (i.e. main)
tbb::task_group
and then attempts to do a synchronous wait for an 'inner' task to complete. If the program runs with >1 thread, the program works fine. If run with just 1 thread none of the tasks will be run.If we use the resumable task API preview feature:
https://www.threadingbuildingblocks.org/docs/help/index.htm#reference/appendices/preview_features/resumable_tasks.html
the it is possible to safely do a synchronous wait.
Given the ability to use serial task queues with synchronous waits is a 'deal breaker' for use (we've been using TBB for 5 years now) we would like to know
tbb::task_groups
in this way, orThe text was updated successfully, but these errors were encountered: