From bec05c461971325fdce390f4b56d9dee42cf8cd0 Mon Sep 17 00:00:00 2001 From: Matti Kortelainen Date: Fri, 21 Jan 2022 09:04:30 -0800 Subject: [PATCH] [fwtest] Migrate to tbb::task_group Following https://github.com/cms-sw/cmssw/pull/32804 --- src/fwtest/Framework/EmptyWaitingTask.h | 27 -------- src/fwtest/Framework/FunctorTask.h | 16 ++--- src/fwtest/Framework/TaskBase.h | 65 +++++++++++++++++++ src/fwtest/Framework/WaitingTask.h | 33 +++++++--- src/fwtest/Framework/WaitingTaskHolder.h | 48 +++++++++++--- src/fwtest/Framework/WaitingTaskList.cc | 53 +++++++++++++-- src/fwtest/Framework/WaitingTaskList.h | 34 ++++------ .../Framework/WaitingTaskWithArenaHolder.cc | 29 +++++++-- .../Framework/WaitingTaskWithArenaHolder.h | 22 +++++-- src/fwtest/Framework/Worker.cc | 12 +--- src/fwtest/Framework/Worker.h | 44 ++++++------- src/fwtest/bin/EventProcessor.cc | 15 +++-- src/fwtest/bin/StreamSchedule.cc | 42 +++++++----- 13 files changed, 288 insertions(+), 152 deletions(-) delete mode 100644 src/fwtest/Framework/EmptyWaitingTask.h create mode 100644 src/fwtest/Framework/TaskBase.h diff --git a/src/fwtest/Framework/EmptyWaitingTask.h b/src/fwtest/Framework/EmptyWaitingTask.h deleted file mode 100644 index c8277e60d..000000000 --- a/src/fwtest/Framework/EmptyWaitingTask.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef EmptyWaitingTask_h -#define EmptyWaitingTask_h - -// from FWCore/Concurrency/interface/WaitingTaskList.h -#include "Framework/WaitingTask.h" - -namespace edm { - class EmptyWaitingTask : public WaitingTask { - public: - EmptyWaitingTask() = default; - - tbb::task* execute() override { return nullptr; } - }; - - namespace waitingtask { - struct TaskDestroyer { - void operator()(tbb::task* iTask) const { tbb::task::destroy(*iTask); } - }; - } // namespace waitingtask - ///Create an EmptyWaitingTask which will properly be destroyed - inline std::unique_ptr make_empty_waiting_task() { - return std::unique_ptr(new (tbb::task::allocate_root()) - edm::EmptyWaitingTask{}); - } -} // namespace edm - -#endif diff --git a/src/fwtest/Framework/FunctorTask.h b/src/fwtest/Framework/FunctorTask.h index 900263230..63ad45f6f 100644 --- a/src/fwtest/Framework/FunctorTask.h +++ b/src/fwtest/Framework/FunctorTask.h @@ -23,30 +23,26 @@ #include #include -#include - // user include files +#include "Framework/TaskBase.h" // forward declarations namespace edm { template - class FunctorTask : public tbb::task { + class FunctorTask : public TaskBase { public: explicit FunctorTask(F f) : func_(std::move(f)) {} - task* execute() override { - func_(); - return nullptr; - }; + void execute() final { func_(); }; private: F func_; }; - template - FunctorTask* make_functor_task(ALLOC&& iAlloc, F f) { - return new (iAlloc) FunctorTask(std::move(f)); + template + FunctorTask* make_functor_task(F f) { + return new FunctorTask(std::move(f)); } } // namespace edm diff --git a/src/fwtest/Framework/TaskBase.h b/src/fwtest/Framework/TaskBase.h new file mode 100644 index 000000000..9c8aeb37d --- /dev/null +++ b/src/fwtest/Framework/TaskBase.h @@ -0,0 +1,65 @@ +#ifndef FWCore_Concurrency_TaskBase_h +#define FWCore_Concurrency_TaskBase_h +// -*- C++ -*- +// +// Package: Concurrency +// Class : TaskBase +// +/**\class TaskBase TaskBase.h FWCore/Concurrency/interface/TaskBase.h + + Description: Base class for tasks. + + Usage: + Used as a callback to happen after a task has been completed. +*/ +// +// Original Author: Chris Jones +// Created: Tue Jan 5 13:46:31 CST 2020 +// $Id$ +// + +// system include files +#include +#include +#include + +// user include files + +// forward declarations + +namespace edm { + class TaskBase { + public: + friend class TaskSentry; + + ///Constructor + TaskBase() : m_refCount{0} {} + virtual ~TaskBase() = default; + + virtual void execute() = 0; + + void increment_ref_count() { ++m_refCount; } + unsigned int decrement_ref_count() { return --m_refCount; } + + private: + virtual void recycle() { delete this; } + + std::atomic m_refCount{0}; + }; + + class TaskSentry { + public: + TaskSentry(TaskBase* iTask) : m_task{iTask} {} + ~TaskSentry() { m_task->recycle(); } + TaskSentry() = delete; + TaskSentry(TaskSentry const&) = delete; + TaskSentry(TaskSentry&&) = delete; + TaskSentry operator=(TaskSentry const&) = delete; + TaskSentry operator=(TaskSentry&&) = delete; + + private: + TaskBase* m_task; + }; +} // namespace edm + +#endif diff --git a/src/fwtest/Framework/WaitingTask.h b/src/fwtest/Framework/WaitingTask.h index 361c2365f..a1730c1f5 100644 --- a/src/fwtest/Framework/WaitingTask.h +++ b/src/fwtest/Framework/WaitingTask.h @@ -23,9 +23,8 @@ #include #include -#include - // user include files +#include "Framework/TaskBase.h" // forward declarations @@ -34,7 +33,7 @@ namespace edm { class WaitingTaskHolder; class WaitingTaskWithArenaHolder; - class WaitingTask : public tbb::task { + class WaitingTask : public TaskBase { public: friend class WaitingTaskList; friend class WaitingTaskHolder; @@ -70,23 +69,37 @@ namespace edm { std::atomic m_ptr; }; + /** Use this class on the stack to signal the final task to be run. + Call done() to check to see if the task was run and check value of + exceptionPtr() to see if an exception was thrown by any task in the group. + */ + class FinalWaitingTask : public WaitingTask { + public: + FinalWaitingTask() : m_done{false} {} + + void execute() final { m_done = true; } + + bool done() const { return m_done.load(); } + + private: + void recycle() final {} + std::atomic m_done; + }; + template class FunctorWaitingTask : public WaitingTask { public: explicit FunctorWaitingTask(F f) : func_(std::move(f)) {} - task* execute() override { - func_(exceptionPtr()); - return nullptr; - }; + void execute() final { func_(exceptionPtr()); }; private: F func_; }; - template - FunctorWaitingTask* make_waiting_task(ALLOC&& iAlloc, F f) { - return new (iAlloc) FunctorWaitingTask(std::move(f)); + template + FunctorWaitingTask* make_waiting_task(F f) { + return new FunctorWaitingTask(std::move(f)); } } // namespace edm diff --git a/src/fwtest/Framework/WaitingTaskHolder.h b/src/fwtest/Framework/WaitingTaskHolder.h index d74c5d8ed..89ed970c0 100644 --- a/src/fwtest/Framework/WaitingTaskHolder.h +++ b/src/fwtest/Framework/WaitingTaskHolder.h @@ -20,6 +20,7 @@ // system include files #include +#include // user include files #include "Framework/WaitingTask.h" @@ -29,28 +30,50 @@ namespace edm { class WaitingTaskHolder { public: - WaitingTaskHolder() : m_task(nullptr) {} + friend class WaitingTaskList; + friend class WaitingTaskWithArenaHolder; - explicit WaitingTaskHolder(edm::WaitingTask* iTask) : m_task(iTask) { m_task->increment_ref_count(); } + WaitingTaskHolder() : m_task(nullptr), m_group(nullptr) {} + + explicit WaitingTaskHolder(tbb::task_group& iGroup, edm::WaitingTask* iTask) : m_task(iTask), m_group(&iGroup) { + m_task->increment_ref_count(); + } ~WaitingTaskHolder() { if (m_task) { doneWaiting(std::exception_ptr{}); } } - WaitingTaskHolder(const WaitingTaskHolder& iHolder) : m_task(iHolder.m_task) { m_task->increment_ref_count(); } + WaitingTaskHolder(const WaitingTaskHolder& iHolder) : m_task(iHolder.m_task), m_group(iHolder.m_group) { + m_task->increment_ref_count(); + } - WaitingTaskHolder(WaitingTaskHolder&& iOther) : m_task(iOther.m_task) { iOther.m_task = nullptr; } + WaitingTaskHolder(WaitingTaskHolder&& iOther) : m_task(iOther.m_task), m_group(iOther.m_group) { + iOther.m_task = nullptr; + } WaitingTaskHolder& operator=(const WaitingTaskHolder& iRHS) { WaitingTaskHolder tmp(iRHS); std::swap(m_task, tmp.m_task); + std::swap(m_group, tmp.m_group); + return *this; + } + + WaitingTaskHolder& operator=(WaitingTaskHolder&& iRHS) { + WaitingTaskHolder tmp(std::move(iRHS)); + std::swap(m_task, tmp.m_task); + std::swap(m_group, tmp.m_group); return *this; } // ---------- const member functions --------------------- - bool taskHasFailed() const { return m_task->exceptionPtr() != nullptr; } + bool taskHasFailed() const noexcept { return m_task->exceptionPtr() != nullptr; } + bool hasTask() const noexcept { return m_task != nullptr; } + /** since tbb::task_group is thread safe, we can return it non-const from here since + the object is not really part of the state of the holder + */ + tbb::task_group* group() const noexcept { return m_group; } // ---------- static member functions -------------------- // ---------- member functions --------------------------- @@ -59,7 +82,7 @@ namespace edm { failure before some other child task which may be run later reports a different, but related failure. You must later call doneWaiting in the same thread passing the same exceptoin. - */ + */ void presetTaskAsFailed(std::exception_ptr iExcept) { if (iExcept) { m_task->dependentTaskFailed(iExcept); @@ -70,20 +93,29 @@ namespace edm { if (iExcept) { m_task->dependentTaskFailed(iExcept); } - //spawn can run the task before we finish + //task_group::run can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting // before spawn avoids problems auto task = m_task; m_task = nullptr; if (0 == task->decrement_ref_count()) { - tbb::task::spawn(*task); + m_group->run([task]() { + TaskSentry s{task}; + task->execute(); + }); } } private: + WaitingTask* release_no_decrement() noexcept { + auto t = m_task; + m_task = nullptr; + return t; + } // ---------- member data -------------------------------- WaitingTask* m_task; + tbb::task_group* m_group; }; } // namespace edm diff --git a/src/fwtest/Framework/WaitingTaskList.cc b/src/fwtest/Framework/WaitingTaskList.cc index 7b1754018..c9050cf2e 100644 --- a/src/fwtest/Framework/WaitingTaskList.cc +++ b/src/fwtest/Framework/WaitingTaskList.cc @@ -66,7 +66,7 @@ void WaitingTaskList::reset() { m_waiting = true; } -WaitingTaskList::WaitNode* WaitingTaskList::createNode(WaitingTask* iTask) { +WaitingTaskList::WaitNode* WaitingTaskList::createNode(tbb::task_group* iGroup, WaitingTask* iTask) { unsigned int index = m_lastAssignedCacheIndex++; WaitNode* returnValue; @@ -77,6 +77,7 @@ WaitingTaskList::WaitNode* WaitingTaskList::createNode(WaitingTask* iTask) { returnValue->m_fromCache = false; } returnValue->m_task = iTask; + returnValue->m_group = iGroup; //No other thread can see m_next yet. The caller to create node // will be doing a synchronization operation anyway which will // make sure m_task and m_next are synched across threads @@ -85,17 +86,53 @@ WaitingTaskList::WaitNode* WaitingTaskList::createNode(WaitingTask* iTask) { return returnValue; } -void WaitingTaskList::add(WaitingTask* iTask) { +void WaitingTaskList::add(WaitingTaskHolder iTask) { + if (!m_waiting) { + if (m_exceptionPtr) { + iTask.doneWaiting(m_exceptionPtr); + } + } else { + auto task = iTask.release_no_decrement(); + WaitNode* newHead = createNode(iTask.group(), task); + //This exchange is sequentially consistent thereby + // ensuring ordering between it and setNextNode + WaitNode* oldHead = m_head.exchange(newHead); + newHead->setNextNode(oldHead); + + //For the case where oldHead != nullptr, + // even if 'm_waiting' changed, we don't + // have to recheck since we beat 'announce()' in + // the ordering of 'm_head.exchange' call so iTask + // is guaranteed to be in the link list + + if (nullptr == oldHead) { + newHead->setNextNode(nullptr); + if (!m_waiting) { + //if finished waiting right before we did the + // exchange our task will not be run. Also, + // additional threads may be calling add() and swapping + // heads and linking us to the new head. + // It is safe to call announce from multiple threads + announce(); + } + } + } +} + +void WaitingTaskList::add(tbb::task_group* iGroup, WaitingTask* iTask) { iTask->increment_ref_count(); if (!m_waiting) { if (bool(m_exceptionPtr)) { iTask->dependentTaskFailed(m_exceptionPtr); } if (0 == iTask->decrement_ref_count()) { - tbb::task::spawn(*iTask); + iGroup->run([iTask]() { + TaskSentry s{iTask}; + iTask->execute(); + }); } } else { - WaitNode* newHead = createNode(iTask); + WaitNode* newHead = createNode(iGroup, iTask); //This exchange is sequentially consistent thereby // ensuring ordering between it and setNextNode WaitNode* oldHead = m_head.exchange(newHead); @@ -110,7 +147,7 @@ void WaitingTaskList::add(WaitingTask* iTask) { if (nullptr == oldHead) { if (!m_waiting) { //if finished waiting right before we did the - // exchange our task will not be spawned. Also, + // exchange our task will not be run. Also, // additional threads may be calling add() and swapping // heads and linking us to the new head. // It is safe to call announce from multiple threads @@ -149,6 +186,7 @@ void WaitingTaskList::announce() { hardware_pause(); } auto t = n->m_task; + auto g = n->m_group; if (bool(m_exceptionPtr)) { t->dependentTaskFailed(m_exceptionPtr); } @@ -160,7 +198,10 @@ void WaitingTaskList::announce() { //the task may indirectly call WaitingTaskList::reset // so we need to call spawn after we are done using the node. if (0 == t->decrement_ref_count()) { - tbb::task::spawn(*t); + g->run([t]() { + TaskSentry s{t}; + t->execute(); + }); } } } diff --git a/src/fwtest/Framework/WaitingTaskList.h b/src/fwtest/Framework/WaitingTaskList.h index 4e2c5e008..177336dba 100644 --- a/src/fwtest/Framework/WaitingTaskList.h +++ b/src/fwtest/Framework/WaitingTaskList.h @@ -75,28 +75,11 @@ // user include files #include "Framework/WaitingTask.h" +#include "Framework/WaitingTaskHolder.h" // forward declarations namespace edm { - class EmptyWaitingTask : public WaitingTask { - public: - EmptyWaitingTask() = default; - - tbb::task* execute() override { return nullptr; } - }; - - namespace waitingtask { - struct TaskDestroyer { - void operator()(tbb::task* iTask) const { tbb::task::destroy(*iTask); } - }; - } // namespace waitingtask - ///Create an EmptyWaitingTask which will properly be destroyed - inline std::unique_ptr make_empty_waiting_task() { - return std::unique_ptr(new (tbb::task::allocate_root()) - edm::EmptyWaitingTask{}); - } - class WaitingTaskList { public: ///Constructor @@ -105,6 +88,8 @@ namespace edm { * The value is only useful for optimization as the object can resize itself. */ explicit WaitingTaskList(unsigned int iInitialSize = 2); + WaitingTaskList(const WaitingTaskList&) = delete; // stop default + const WaitingTaskList& operator=(const WaitingTaskList&) = delete; // stop default ~WaitingTaskList() = default; // ---------- member functions --------------------------- @@ -122,7 +107,12 @@ namespace edm { * then be spawned. * Calls to add() and doneWaiting() can safely be done concurrently. */ - void add(WaitingTask*); + void add(tbb::task_group*, WaitingTask*); + + ///Adds task to the waiting list + /**Calls to add() and doneWaiting() can safely be done concurrently. + */ + void add(WaitingTaskHolder); ///Signals that the resource is now available and tasks should be spawned /**The owner of the resource calls this function to allow the waiting tasks to @@ -142,9 +132,6 @@ namespace edm { void reset(); private: - WaitingTaskList(const WaitingTaskList&) = delete; // stop default - const WaitingTaskList& operator=(const WaitingTaskList&) = delete; // stop default - /**Handles spawning the tasks, * safe to call from multiple threads */ @@ -152,6 +139,7 @@ namespace edm { struct WaitNode { WaitingTask* m_task; + tbb::task_group* m_group; std::atomic m_next; bool m_fromCache; @@ -160,7 +148,7 @@ namespace edm { WaitNode* nextNode() const { return m_next; } }; - WaitNode* createNode(WaitingTask* iTask); + WaitNode* createNode(tbb::task_group* iGroup, WaitingTask* iTask); // ---------- member data -------------------------------- std::atomic m_head; diff --git a/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc b/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc index 7c852d041..aa0740eb5 100644 --- a/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc +++ b/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc @@ -17,11 +17,16 @@ namespace edm { // Note that the arena will be the one containing the thread // that runs this constructor. This is the arena where you // eventually intend for the task to be spawned. - WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTask* iTask) - : m_task(iTask), m_arena(std::make_shared(tbb::task_arena::attach())) { + WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(tbb::task_group& iGroup, WaitingTask* iTask) + : m_task(iTask), m_group(&iGroup), m_arena(std::make_shared(tbb::task_arena::attach())) { m_task->increment_ref_count(); } + WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask) + : m_task(iTask.release_no_decrement()), + m_group(iTask.group()), + m_arena(std::make_shared(tbb::task_arena::attach())) {} + WaitingTaskWithArenaHolder::~WaitingTaskWithArenaHolder() { if (m_task) { doneWaiting(std::exception_ptr{}); @@ -29,20 +34,21 @@ namespace edm { } WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskWithArenaHolder const& iHolder) - : m_task(iHolder.m_task), m_arena(iHolder.m_arena) { + : m_task(iHolder.m_task), m_group(iHolder.m_group), m_arena(iHolder.m_arena) { if (m_task != nullptr) { m_task->increment_ref_count(); } } WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskWithArenaHolder&& iOther) - : m_task(iOther.m_task), m_arena(std::move(iOther.m_arena)) { + : m_task(iOther.m_task), m_group(iOther.m_group), m_arena(std::move(iOther.m_arena)) { iOther.m_task = nullptr; } WaitingTaskWithArenaHolder& WaitingTaskWithArenaHolder::operator=(const WaitingTaskWithArenaHolder& iRHS) { WaitingTaskWithArenaHolder tmp(iRHS); std::swap(m_task, tmp.m_task); + std::swap(m_group, tmp.m_group); std::swap(m_arena, tmp.m_arena); return *this; } @@ -50,6 +56,7 @@ namespace edm { WaitingTaskWithArenaHolder& WaitingTaskWithArenaHolder::operator=(WaitingTaskWithArenaHolder&& iRHS) { WaitingTaskWithArenaHolder tmp(std::move(iRHS)); std::swap(m_task, tmp.m_task); + std::swap(m_group, tmp.m_group); std::swap(m_arena, tmp.m_arena); return *this; } @@ -71,7 +78,12 @@ namespace edm { if (0 == task->decrement_ref_count()) { // The enqueue call will cause a worker thread to be created in // the arena if there is not one already. - m_arena->enqueue([task = task]() { tbb::task::spawn(*task); }); + m_arena->enqueue([task = task, group = m_group]() { + group->run([task]() { + TaskSentry s(task); + task->execute(); + }); + }); } } @@ -88,9 +100,14 @@ namespace edm { // the problem quickly). WaitingTaskHolder WaitingTaskWithArenaHolder::makeWaitingTaskHolderAndRelease() { - WaitingTaskHolder holder(m_task); + WaitingTaskHolder holder(*m_group, m_task); m_task->decrement_ref_count(); m_task = nullptr; return holder; } + + bool WaitingTaskWithArenaHolder::taskHasFailed() const noexcept { return m_task->exceptionPtr() != nullptr; } + + bool WaitingTaskWithArenaHolder::hasTask() const noexcept { return m_task != nullptr; } + } // namespace edm diff --git a/src/fwtest/Framework/WaitingTaskWithArenaHolder.h b/src/fwtest/Framework/WaitingTaskWithArenaHolder.h index 9dcbda804..bb646f56a 100644 --- a/src/fwtest/Framework/WaitingTaskWithArenaHolder.h +++ b/src/fwtest/Framework/WaitingTaskWithArenaHolder.h @@ -23,6 +23,7 @@ #include #include +#include namespace edm { @@ -36,7 +37,11 @@ namespace edm { // Note that the arena will be the one containing the thread // that runs this constructor. This is the arena where you // eventually intend for the task to be spawned. - explicit WaitingTaskWithArenaHolder(WaitingTask* iTask); + explicit WaitingTaskWithArenaHolder(tbb::task_group&, WaitingTask* iTask); + + // Takes ownership of the underlying task and uses the current + // arena. + explicit WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask); ~WaitingTaskWithArenaHolder(); @@ -67,9 +72,19 @@ namespace edm { // the problem quickly). WaitingTaskHolder makeWaitingTaskHolderAndRelease(); + bool taskHasFailed() const noexcept; + + bool hasTask() const noexcept; + + /** since oneapi::tbb::task_group is thread safe, we can return it non-const from here since + the object is not really part of the state of the holder + */ + tbb::task_group* group() const { return m_group; } + private: // ---------- member data -------------------------------- WaitingTask* m_task; + tbb::task_group* m_group; std::shared_ptr m_arena; }; @@ -84,10 +99,9 @@ namespace edm { }; } - template - auto make_waiting_task_with_holder(ALLOC&& iAlloc, WaitingTaskWithArenaHolder h, F&& f) { + template + auto make_waiting_task_with_holder(WaitingTaskWithArenaHolder h, F&& f) { return make_waiting_task( - std::forward(iAlloc), [holder = h, func = make_lambda_with_holder(h, std::forward(f))](std::exception_ptr const* excptr) mutable { if (excptr) { holder.doneWaiting(*excptr); diff --git a/src/fwtest/Framework/Worker.cc b/src/fwtest/Framework/Worker.cc index b3b3df74c..6c1a5a57d 100644 --- a/src/fwtest/Framework/Worker.cc +++ b/src/fwtest/Framework/Worker.cc @@ -1,25 +1,15 @@ #include "Framework/Worker.h" namespace edm { - void Worker::prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) { + void Worker::prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask) { //std::cout << "prefetchAsync for " << this << " iTask " << iTask << std::endl; bool expected = false; if (prefetchRequested_.compare_exchange_strong(expected, true)) { //std::cout << "first prefetch call" << std::endl; - //Need to be sure the ref count isn't set to 0 immediately - iTask->increment_ref_count(); for (Worker* dep : itemsToGet_) { //std::cout << "calling doWorkAsync for " << dep << " with " << iTask << std::endl; dep->doWorkAsync(event, eventSetup, iTask); } - - auto count = iTask->decrement_ref_count(); - //std::cout << "count " << count << std::endl; - if (0 == count) { - //std::cout << "spawning iTask for " << this << " task " << iTask << std::endl; - //if everything finishes before we leave this routine, we need to launch the task - tbb::task::spawn(*iTask); - } } } } // namespace edm diff --git a/src/fwtest/Framework/Worker.h b/src/fwtest/Framework/Worker.h index 0a03670a4..cd5631c69 100644 --- a/src/fwtest/Framework/Worker.h +++ b/src/fwtest/Framework/Worker.h @@ -23,10 +23,10 @@ namespace edm { void setItemsToGet(std::vector workers) { itemsToGet_ = std::move(workers); } // thread safe - void prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask); + void prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask); // not thread safe - virtual void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) = 0; + virtual void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask) = 0; // not thread safe virtual void doEndJob() = 0; @@ -50,15 +50,15 @@ namespace edm { public: explicit WorkerT(ProductRegistry& reg) : producer_(reg) {} - void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) override { - waitingTasksWork_.add(iTask); + void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder task) override { + waitingTasksWork_.add(task); //std::cout << "doWorkAsync for " << this << " with iTask " << iTask << std::endl; bool expected = false; if (workStarted_.compare_exchange_strong(expected, true)) { //std::cout << "first doWorkAsync call" << std::endl; - WaitingTask* moduleTask = make_waiting_task( - tbb::task::allocate_root(), [this, &event, &eventSetup](std::exception_ptr const* iPtr) mutable { + WaitingTask* moduleTask = + make_waiting_task([this, &event, &eventSetup](std::exception_ptr const* iPtr) mutable { if (iPtr) { waitingTasksWork_.doneWaiting(*iPtr); } else { @@ -73,26 +73,26 @@ namespace edm { waitingTasksWork_.doneWaiting(exceptionPtr); } }); + auto* group = task.group(); if (producer_.hasAcquire()) { - WaitingTaskWithArenaHolder runProduceHolder{moduleTask}; - moduleTask = make_waiting_task(tbb::task::allocate_root(), - [this, &event, &eventSetup, runProduceHolder = std::move(runProduceHolder)]( + WaitingTaskWithArenaHolder runProduceHolder{*group, moduleTask}; + moduleTask = make_waiting_task([this, &event, &eventSetup, runProduceHolder = std::move(runProduceHolder)]( std::exception_ptr const* iPtr) mutable { - if (iPtr) { - runProduceHolder.doneWaiting(*iPtr); - } else { - std::exception_ptr exceptionPtr; - try { - producer_.doAcquire(event, eventSetup, runProduceHolder); - } catch (...) { - exceptionPtr = std::current_exception(); - } - runProduceHolder.doneWaiting(exceptionPtr); - } - }); + if (iPtr) { + runProduceHolder.doneWaiting(*iPtr); + } else { + std::exception_ptr exceptionPtr; + try { + producer_.doAcquire(event, eventSetup, runProduceHolder); + } catch (...) { + exceptionPtr = std::current_exception(); + } + runProduceHolder.doneWaiting(exceptionPtr); + } + }); } //std::cout << "calling prefetchAsync " << this << " with moduleTask " << moduleTask << std::endl; - prefetchAsync(event, eventSetup, moduleTask); + prefetchAsync(event, eventSetup, WaitingTaskHolder(*group, moduleTask)); } } diff --git a/src/fwtest/bin/EventProcessor.cc b/src/fwtest/bin/EventProcessor.cc index 33d5548ef..bc25e46be 100644 --- a/src/fwtest/bin/EventProcessor.cc +++ b/src/fwtest/bin/EventProcessor.cc @@ -1,4 +1,3 @@ -#include "Framework/EmptyWaitingTask.h" #include "Framework/ESPluginFactory.h" #include "Framework/WaitingTask.h" #include "Framework/WaitingTaskHolder.h" @@ -29,14 +28,16 @@ namespace edm { void EventProcessor::runToCompletion() { source_.startProcessing(); // The task that waits for all other work - auto globalWaitTask = make_empty_waiting_task(); - globalWaitTask->increment_ref_count(); + FinalWaitingTask globalWaitTask; + tbb::task_group group; for (auto& s : schedules_) { - s.runToCompletionAsync(WaitingTaskHolder(globalWaitTask.get())); + s.runToCompletionAsync(WaitingTaskHolder(group, &globalWaitTask)); } - globalWaitTask->wait_for_all(); - if (globalWaitTask->exceptionPtr()) { - std::rethrow_exception(*(globalWaitTask->exceptionPtr())); + do { + group.wait(); + } while (not globalWaitTask.done()); + if (globalWaitTask.exceptionPtr()) { + std::rethrow_exception(*(globalWaitTask.exceptionPtr())); } } diff --git a/src/fwtest/bin/StreamSchedule.cc b/src/fwtest/bin/StreamSchedule.cc index 8636bce24..4a18c39a3 100644 --- a/src/fwtest/bin/StreamSchedule.cc +++ b/src/fwtest/bin/StreamSchedule.cc @@ -43,12 +43,18 @@ namespace edm { StreamSchedule& StreamSchedule::operator=(StreamSchedule&&) = default; void StreamSchedule::runToCompletionAsync(WaitingTaskHolder h) { - auto task = - make_functor_task(tbb::task::allocate_root(), [this, h]() mutable { processOneEventAsync(std::move(h)); }); + auto task = make_functor_task([this, h]() mutable { processOneEventAsync(std::move(h)); }); if (streamId_ == 0) { - tbb::task::spawn(*task); + h.group()->run([task]() { + TaskSentry s{task}; + task->execute(); + }); } else { - tbb::task::enqueue(*task); + tbb::task_arena arena{tbb::task_arena::attach()}; + arena.enqueue([task]() { + TaskSentry s{task}; + task->execute(); + }); } } @@ -59,27 +65,27 @@ namespace edm { // Pass a non-owning pointer to the event to preceding tasks //std::cout << "Begin processing event " << event->eventID() << std::endl; auto eventPtr = event.get(); + auto* group = h.group(); auto nextEventTask = - make_waiting_task(tbb::task::allocate_root(), - [this, h = std::move(h), ev = std::move(event)](std::exception_ptr const* iPtr) mutable { - ev.reset(); - if (iPtr) { - h.doneWaiting(*iPtr); - } else { - for (auto const& worker : path_) { - worker->reset(); - } - processOneEventAsync(std::move(h)); - } - }); + make_waiting_task([this, h = std::move(h), ev = std::move(event)](std::exception_ptr const* iPtr) mutable { + ev.reset(); + if (iPtr) { + h.doneWaiting(*iPtr); + } else { + for (auto const& worker : path_) { + worker->reset(); + } + processOneEventAsync(std::move(h)); + } + }); // To guarantee that the nextEventTask is spawned also in // absence of Workers, and also to prevent spawning it before // all workers have been processed (should not happen though) - auto nextEventTaskHolder = WaitingTaskHolder(nextEventTask); + auto nextEventTaskHolder = WaitingTaskHolder(*group, nextEventTask); for (auto iWorker = path_.rbegin(); iWorker != path_.rend(); ++iWorker) { //std::cout << "calling doWorkAsync for " << iWorker->get() << " with nextEventTask " << nextEventTask << std::endl; - (*iWorker)->doWorkAsync(*eventPtr, *eventSetup_, nextEventTask); + (*iWorker)->doWorkAsync(*eventPtr, *eventSetup_, nextEventTaskHolder); } } else { h.doneWaiting(std::exception_ptr{});