tbb::task_group does not work well with serial task queues #346

Closed
Dr15Jones opened this issue Feb 11, 2021 · 8 comments

Comments

@Dr15Jones

Dr15Jones commented Feb 11, 2021

In our application we protect non-thread-safe shared resources via serial task queues. When part of the application needs to use a shared resource, it pushes a task into the appropriate serial task queue. The serial task queue makes sure that exactly one of the tasks it holds runs at a time. These tasks are usually set to run asynchronously, although a few places require them to happen synchronously. This all worked fine with the old tbb::task interface, as the synchronous calls could just wait on the task which was added to the queue. Problems arise when using tbb::task_group.

The application needs to be able to wait for tasks to finish at several different spots. There is a wait on the main thread for all work of the application to finish. There are also waits that happen synchronously from within a running task. When all we have are tbb::task_groups, each wait has to use a different tbb::task_group. This is a problem for the synchronous waits. If a tbb::task_group has tasks presently running on other threads and there are no more of its tasks for the waiting thread to run, then tasks from other tbb::task_groups can be run on the waiting thread. However, if the tbb::task_group has no running tasks, it immediately stops waiting. If one has a queue of tasks where the earlier tasks are from a different tbb::task_group and the tasks from the synchronously waiting tbb::task_group come later in the queue, then there is no guarantee that the earlier tasks will ever be run (the waiting tbb::task_group will not run them because its own tasks have yet to be started).

This is illustrated in the code added as an additional comment to this issue. The code issues two tasks on the 'outer' (i.e. main) tbb::task_group and then attempts to do a synchronous wait for an 'inner' task to complete. If the program runs with >1 thread, the program works fine. If run with just 1 thread none of the tasks will be run.

If we use the resumable task API preview feature:
https://www.threadingbuildingblocks.org/docs/help/index.htm#reference/appendices/preview_features/resumable_tasks.html
then it is possible to safely do a synchronous wait.

Given that being able to use serial task queues with synchronous waits is a 'deal breaker' for us (we've been using TBB for 5 years now), we would like to know:

  • is there a way to safely use tbb::task_groups in this way, or
  • will the resumable task API (or something similar) be added to the production release of TBB in the near (say 3 month) future?
@Dr15Jones
Author

Here is the example code

#if defined(USE_RESUMABLE_TASKS)
#define TBB_PREVIEW_RESUMABLE_TASKS 1
#include <tbb/task.h>
#endif
#include <tbb/task_group.h>
#include <tbb/global_control.h>
#include <iostream>
#include <atomic>
#include <tbb/concurrent_queue.h>
#include <functional>
#include <string> //for std::stoi

//Serializes task calls
class TaskQueue {
private:
  class QueuedTask;
public:

  template<typename F>
  void push(tbb::task_group& iGroup, F&& iF) {
    queue_.push(new QueuedTask(iGroup, std::function<void()>(iF), this) );

    bool expected = false;
    if(started_.compare_exchange_strong(expected, true) ) {
      QueuedTask* next = nullptr;
      queue_.try_pop(next);
      next->runAsync();
    }
  }

private:
  friend class QueuedTask;

  QueuedTask* pop() {
    QueuedTask* next = nullptr;
    queue_.try_pop(next);
    if(nullptr == next) { started_ = false; }
    return next;
  }

  //When a task finishes it automatically starts
  // the next task in the queue
  class QueuedTask {
  public:
    QueuedTask(tbb::task_group& iGroup, std::function<void()> iFunc, TaskQueue* iQueue): 
      group_(&iGroup), f_(std::move(iFunc)), queue_(iQueue) {}
    
    void runAsync() {
      group_->run([this]() {
	  f_();
	  auto next = queue_->pop();
	  if(next) { next->runAsync(); }
	  delete this;
	});
    }
  private:
    tbb::task_group* group_;
    std::function<void()> f_;
    TaskQueue* queue_;
  };
  
  tbb::concurrent_queue<QueuedTask*> queue_;
  std::atomic<bool> started_{false};
};


int main(int argc, char const* const* argv ) {

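  //number of threads comes from the first command line argument (defaults to 1)
  int const nThreads = (argc > 1) ? std::stoi(argv[1]) : 1;
  tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nThreads);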

  tbb::task_group outer;
  TaskQueue taskQueue;

  //note: without this outer.run the tbb::task::suspend seg faults
  outer.run([&outer,&taskQueue]() {
      taskQueue.push(outer,[]() { std::cout<<"first"<<std::endl;});
      taskQueue.push(outer,[]() { std::cout<<"second"<<std::endl;});
      std::cout <<" inner wait started"<<std::endl;
#if defined(USE_RESUMABLE_TASKS)
      tbb::task::suspend([&outer,&taskQueue](tbb::task::suspend_point tag) {
          //capture tag by value: the parameter is gone once this callback returns
          taskQueue.push(outer,[tag]() {
              std::cout <<"inner"<<std::endl;
              tbb::task::resume(tag);
            });
        });
#else
      {
	tbb::task_group inner;
	std::atomic<bool> wasRun{false};
	taskQueue.push(inner, [&wasRun]() {std::cout <<"inner"<<std::endl; wasRun = true;});
	do {
	  inner.wait();
	}while(not wasRun);
      }
#endif
      std::cout <<" inner wait finished"<<std::endl;
      taskQueue.push(outer,[]() { std::cout<<"last"<<std::endl;});
    });
  outer.wait();
}

@anton-potapov
Contributor

@Dr15Jones, task_group::defer is the solution to this problem. QueuedTask should create a deferred task (task_handle) via a call to task_group::defer, which blocks task_group::wait until the task stored in the task_handle has actually run:

  class QueuedTask {
  public:
    QueuedTask(tbb::task_group& iGroup, std::function<void()> iFunc, TaskQueue* iQueue): 
      group_(&iGroup), 
      th_(iGroup.defer([this, iFunc, iQueue]{
        iFunc();
        auto next = iQueue->pop();
        if(next) { next->runAsync(); }
        delete this;
      }))
      {}
    
    void runAsync() {
        group_->run(std::move(th_));
    }
  private:
    tbb::task_group* group_;
    tbb::task_handle th_;
  };

Here is the full sample (slightly changed to always use a single thread): https://godbolt.org/z/qYbxWerYT
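
To make the reasoning concrete, here is a toy single-threaded model of the behaviour discussed in this thread. It is only a sketch and does not use TBB at all: `Pool`, `ToyGroup`, `ToySerialQueue` and `scenario` are hypothetical names, and the assumption is simply that a group's wait keeps running tasks from a shared pool only while that group has outstanding (counted) tasks. Counting a task when it is pushed, as defer does, rather than when it reaches the head of the queue, is what keeps the inner wait going.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy model only, NOT the TBB implementation. All names are hypothetical.
struct Pool {  // shared pool of runnable tasks, served by one thread
  std::deque<std::function<void()>> ready;
};

class ToyGroup {
public:
  explicit ToyGroup(Pool& pool) : pool_(pool) {}
  // Count the task now; it only becomes runnable after submit().
  std::function<void()> defer(std::function<void()> f) {
    ++pending_;
    return [this, f = std::move(f)] { f(); --pending_; };
  }
  void submit(std::function<void()> counted) { pool_.ready.push_back(std::move(counted)); }
  // Count and make runnable in one step.
  void run(std::function<void()> f) { submit(defer(std::move(f))); }
  // Run pool tasks (from any group) while this group has counted tasks.
  void wait() {
    while (pending_ > 0 && !pool_.ready.empty()) {
      auto task = std::move(pool_.ready.front());
      pool_.ready.pop_front();
      task();
    }
  }
private:
  Pool& pool_;
  int pending_ = 0;
};

class ToySerialQueue {
public:
  // deferAtPush == false: the group first hears of the task when it reaches the head.
  // deferAtPush == true: the group counts the task as soon as it is pushed.
  void push(ToyGroup& g, std::function<void()> f, bool deferAtPush) {
    auto body = [this, f = std::move(f)] { f(); running_ = false; startNext(); };
    if (deferAtPush) {
      auto counted = g.defer(std::move(body));
      waiting_.push_back([&g, counted = std::move(counted)] { g.submit(counted); });
    } else {
      waiting_.push_back([&g, body = std::move(body)] { g.run(body); });
    }
    startNext();
  }
private:
  void startNext() {
    if (running_ || waiting_.empty()) return;
    running_ = true;
    auto start = std::move(waiting_.front());
    waiting_.pop_front();
    start();
  }
  std::deque<std::function<void()>> waiting_;
  bool running_ = false;
};

// Mirrors the example program: two 'outer' tasks queued ahead of one 'inner' task.
std::vector<std::string> scenario(bool deferAtPush) {
  Pool pool;
  ToyGroup outer(pool), inner(pool);
  ToySerialQueue queue;
  std::vector<std::string> log;
  queue.push(outer, [&] { log.push_back("first"); }, deferAtPush);
  queue.push(outer, [&] { log.push_back("second"); }, deferAtPush);
  queue.push(inner, [&] { log.push_back("inner"); }, deferAtPush);
  inner.wait();  // the synchronous wait
  log.push_back("inner wait returned");
  outer.wait();  // drain whatever is left
  inner.wait();
  return log;
}
```

In this model, `scenario(false)` logs "inner wait returned" before any task has run, while `scenario(true)` logs "first", "second", "inner" and only then "inner wait returned".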

@anton-potapov
Contributor

@Dr15Jones, @makortel do you need more help with this issue?

@Dr15Jones
Author

@anton-potapov we haven't yet had an opportunity to try it in our 'full' application. It does appear to work in the test code which had a referring pull request.

@isaevil
Contributor

isaevil commented Oct 5, 2022

@Dr15Jones is this issue still relevant for you? Did you have a chance to try the proposed solution? Could you please respond?

@isaevil
Contributor

isaevil commented Oct 25, 2022

@Dr15Jones, a kind ping.

@Dr15Jones
Author

@isaevil sorry, the messages from this GitHub repository are being sent to my spam filter so I'm not seeing them promptly.

Using tbb::task_group::defer did allow us to wait in the way we needed. The only funny bit is that we always pass a do-nothing lambda to defer, since we only need it to keep the task_group waiting.

https://github.com/cms-sw/cmssw/blob/9976f1e5c7451f1d2e1ff9f3213ef87e3bf9ff28/FWCore/Concurrency/interface/FinalWaitingTask.h#L34-L35

Thanks for reaching out to be sure the issue was resolved.

@isaevil
Contributor

isaevil commented Oct 26, 2022

@Dr15Jones glad that the proposed solution worked out. If this issue is resolved, I would kindly ask you to close it :)

@isaevil isaevil closed this as completed Nov 2, 2022