From 59f647fa0f85746e5fb930bf22c8d93302f64a3f Mon Sep 17 00:00:00 2001 From: Kevin Parks Date: Fri, 12 Mar 2021 13:06:21 -0800 Subject: [PATCH] Provide scheduler_ptr to _ScheduleFuncWithAutoInline The ambient_scheduler may not be the desired scheduler to schedule functions on, but _ScheduleFuncWithAutoInline does not offer any other option. Provide a means to select a desired scheduler and fall back to the ambient scheduler if not specified. --- Release/include/pplx/pplx.h | 14 ++++-- Release/include/pplx/pplxtasks.h | 7 ++- Release/src/pplx/pplxtasks.cpp | 7 +-- .../pplx/pplx_test/pplx_task_options.cpp | 46 +++++++++++++++++++ 4 files changed, 66 insertions(+), 8 deletions(-) diff --git a/Release/include/pplx/pplx.h b/Release/include/pplx/pplx.h index a8b6ab19b8..c66153176b 100644 --- a/Release/include/pplx/pplx.h +++ b/Release/include/pplx/pplx.h @@ -176,7 +176,7 @@ class _TaskCollectionImpl scheduler_ptr _GetScheduler() const { return _M_pScheduler; } // Fire and forget - static void _RunTask(TaskProc_t _Proc, void* _Parameter, _TaskInliningMode _InliningMode) + static void _RunTask(TaskProc_t _Proc, void* _Parameter, _TaskInliningMode _InliningMode, scheduler_ptr _Scheduler) { if (_InliningMode == _ForceInline) { @@ -184,8 +184,16 @@ class _TaskCollectionImpl } else { - // Schedule the work on the ambient scheduler - get_ambient_scheduler()->schedule(_Proc, _Parameter); + if (_Scheduler.get()) + { + // Schedule the work on the desired scheduler + _Scheduler->schedule(_Proc, _Parameter); + } + else + { + // Schedule the work on the ambient scheduler as a fallback + get_ambient_scheduler()->schedule(_Proc, _Parameter); + } } } diff --git a/Release/include/pplx/pplxtasks.h b/Release/include/pplx/pplxtasks.h index affe051aac..ec30826096 100644 --- a/Release/include/pplx/pplxtasks.h +++ b/Release/include/pplx/pplxtasks.h @@ -603,7 +603,10 @@ struct _TaskProcThunk /// /// The inlining scheduling policy for current functor. /// -void _ScheduleFuncWithAutoInline(const std::function& _Func, _TaskInliningMode_t _InliningMode); +/// +/// The intended Scheduler to run the task on. +/// +void _ScheduleFuncWithAutoInline(const std::function& _Func, _TaskInliningMode_t _InliningMode, scheduler_ptr _Scheduler); class _ContextCallback { @@ -1671,7 +1674,7 @@ struct _Task_impl : public _Task_impl_base if (_M_Continuations) { // Scheduling cancellation with automatic inlining. - _ScheduleFuncWithAutoInline([=]() { _RunTaskContinuations(); }, details::_DefaultAutoInline); + _ScheduleFuncWithAutoInline([=]() { _RunTaskContinuations(); }, details::_DefaultAutoInline, _M_TaskCollection._GetScheduler()); } } return true; diff --git a/Release/src/pplx/pplxtasks.cpp b/Release/src/pplx/pplxtasks.cpp index e8d5b2e069..04fb1c9539 100644 --- a/Release/src/pplx/pplxtasks.cpp +++ b/Release/src/pplx/pplxtasks.cpp @@ -79,9 +79,9 @@ namespace details delete _M_pThunk; } - void _ScheduleFuncWithAutoInline(const std::function & _Func, _TaskInliningMode_t _InliningMode) + void _ScheduleFuncWithAutoInline(const std::function & _Func, _TaskInliningMode_t _InliningMode, scheduler_ptr _Scheduler) { - _TaskCollection_t::_RunTask(&_TaskProcThunk::_Bridge, new _TaskProcThunk(_Func), _InliningMode); + _TaskCollection_t::_RunTask(&_TaskProcThunk::_Bridge, new _TaskProcThunk(_Func), _InliningMode, _Scheduler); } #if defined (__cplusplus_winrt) @@ -965,7 +965,8 @@ namespace details } } }, - _PTaskHandle->_M_inliningMode); + _PTaskHandle->_M_inliningMode, + _M_TaskCollection._GetScheduler()); } else { diff --git a/Release/tests/functional/pplx/pplx_test/pplx_task_options.cpp b/Release/tests/functional/pplx/pplx_test/pplx_task_options.cpp index 983e9cc06e..926bd5fa4b 100644 --- a/Release/tests/functional/pplx/pplx_test/pplx_task_options.cpp +++ b/Release/tests/functional/pplx/pplx_test/pplx_task_options.cpp @@ -39,6 +39,7 @@ class TaskOptionsTestScheduler : public pplx::scheduler_interface { public: TaskOptionsTestScheduler() : m_numTasks(0), m_scheduler(get_scheduler()) {} + TaskOptionsTestScheduler(std::shared_ptr scheduler) : m_numTasks(0), m_scheduler(std::move(scheduler)) {} virtual void schedule(pplx::TaskProc_t proc, void* param) { @@ -159,6 +160,24 @@ SUITE(pplx_task_options_tests) VERIFY_ARE_EQUAL(sched1.get_num_tasks(), 1); VERIFY_ARE_EQUAL(sched2.get_num_tasks(), 2); } + + TEST(then_from_exception_custom_scheduler_test) + { + class custom_direct_executor : public pplx::scheduler_interface + { + public: + virtual void schedule(pplx::TaskProc_t proc, _In_ void* param) { proc(param); } + }; + + TaskOptionsTestScheduler sched(std::make_shared()); + long n = 0; + + auto t1 = pplx::create_task([&n]() { n++; throw std::runtime_error("exception"); }, sched); + t1.then([&n](pplx::task task_result) { n++; try { task_result.get(); } catch (...){} }) // inherit sched + .wait(); + + VERIFY_ARE_EQUAL(sched.get_num_tasks(), n); + } TEST(opand_nooptions_test) { @@ -260,6 +279,33 @@ SUITE(pplx_task_options_tests) VERIFY_ARE_EQUAL(sched2.get_num_tasks(), 0); } + TEST(whenall_then_from_exception_custom_scheduler_test) + { + class custom_direct_executor : public pplx::scheduler_interface + { + public: + virtual void schedule(pplx::TaskProc_t proc, _In_ void* param) { proc(param); } + }; + + TaskOptionsTestScheduler sched(std::make_shared()); + + std::vector> taskVect; + const int task_count = 3; + long n = 0; + for (int i = 0; i < (task_count-1); i++) + { + taskVect.push_back(pplx::create_task([&n]() {n++;}, sched)); + } + taskVect.push_back(pplx::create_task([&n]() { n++; throw std::runtime_error("exception");}, sched)); + + auto t3 = pplx::when_all( + begin(taskVect), end(taskVect), sched); + n++; // sched used within when_all + t3.then([&n](pplx::task task_result) { n++; try { task_result.get(); } catch (...){} }, sched).wait(); + + VERIFY_ARE_EQUAL(sched.get_num_tasks(), n); + } + TEST(opor_nooptions_test) { TaskOptionsTestScheduler sched;