From 19af6f382570bdb29ad192bf0289e440fc850cfb Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Wed, 12 Aug 2020 09:54:40 -0700 Subject: [PATCH 1/7] #981: test: relax test again using new percent tolerance --- .../timetrigger/test_time_trigger.extended.cc | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/unit/timetrigger/test_time_trigger.extended.cc b/tests/unit/timetrigger/test_time_trigger.extended.cc index 9228996fa8..5f5d267f46 100644 --- a/tests/unit/timetrigger/test_time_trigger.extended.cc +++ b/tests/unit/timetrigger/test_time_trigger.extended.cc @@ -82,7 +82,7 @@ TEST_F(TestTimeTrigger, test_time_trigger_1) { sleep_for(5ms); } while (vt::timing::Timing::getCurrentTime() - cur_time < total_time/1000); - int tolerance = 5; + int tolerance = 15; // Allow for some error tolerance in the number of triggers given the period EXPECT_LE(triggered, (total_time / trigger_period.count()) + tolerance); @@ -129,12 +129,19 @@ TEST_F(TestTimeTrigger, test_time_trigger_2) { testTime->progress(); } while (vt::timing::Timing::getCurrentTime() - cur_time < total_time/1000); - int tolerance = 5; + // tolerance of 80% of expected triggers + double tolerance = 0.8; // Allow for some error tolerance in the number of triggers given the period for (int i = 0; i < 3; i++) { - EXPECT_LE(triggered[i], (total_time / trigger_period[i].count()) + tolerance); - EXPECT_GE(triggered[i], (total_time / trigger_period[i].count()) - tolerance); + EXPECT_LE( + triggered[i], + (total_time / trigger_period[i].count()) + triggered[i] * tolerance + ); + EXPECT_GE( + triggered[i], + (total_time / trigger_period[i].count()) - triggered[i] * tolerance + ); } for (int i = 0; i < 3; i++) { From a467cea6abab16a5a2e8bfd36d2a3db21c73a482 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Wed, 12 Aug 2020 10:15:22 -0700 Subject: [PATCH 2/7] #981: test: fix warning --- tests/unit/timetrigger/test_time_trigger.extended.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/timetrigger/test_time_trigger.extended.cc b/tests/unit/timetrigger/test_time_trigger.extended.cc index 5f5d267f46..d69552b226 100644 --- a/tests/unit/timetrigger/test_time_trigger.extended.cc +++ b/tests/unit/timetrigger/test_time_trigger.extended.cc @@ -118,7 +118,7 @@ TEST_F(TestTimeTrigger, test_time_trigger_2) { for (int i = 0; i < 3; i++) { testTime->addTrigger( - trigger_period[i], [&triggered,i,&cur_time]{ + trigger_period[i], [&triggered,i]{ triggered[i]++; }, true From 9664b71908a5c5912842c17ae75d0f7870a47b03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cezary=20Skrzy=C5=84ski?= Date: Fri, 7 Aug 2020 09:19:40 +0200 Subject: [PATCH 3/7] #955 use runInEpochCollective instead of addAction --- tests/unit/group/test_group.cc | 137 +++++++++------------ tests/unit/location/test_hops.extended.cc | 21 +--- tests/unit/location/test_location_common.h | 33 ++--- 3 files changed, 72 insertions(+), 119 deletions(-) diff --git a/tests/unit/group/test_group.cc b/tests/unit/group/test_group.cc index d0614f4823..0052097d01 100644 --- a/tests/unit/group/test_group.cc +++ b/tests/unit/group/test_group.cc @@ -74,25 +74,27 @@ TEST_F(TestGroup, test_group_range_construct_1) { auto const& num_nodes = theContext()->getNumNodes(); NodeType const lo = 0; NodeType const hi = num_nodes / 2; - if (this_node == 0) { - auto list = std::make_unique(lo,hi); - theGroup()->newGroup( - std::move(list), [](GroupType group){ - fmt::print("Group is created: group={:x}\n", group); - auto msg = makeMessage(); - envelopeSetGroup(msg->env, group); - theMsg()->broadcastMsg(msg.get()); - } - ); - } - theTerm()->addAction([=]{ - if (this_node >= lo && this_node < hi) { - EXPECT_EQ(num_recv, 1); - } else { - EXPECT_EQ(num_recv, 0); + + runInEpochCollective([&]{ + if (this_node == 0) { + auto list = std::make_unique(lo,hi); + theGroup()->newGroup( + std::move(list), [](GroupType group){ + fmt::print("Group is created: group={:x}\n", group); + auto msg = makeMessage(); + envelopeSetGroup(msg->env, group); + theMsg()->broadcastMsg(msg.get()); + } + ); } - num_recv = 0; }); + + if (this_node >= lo && this_node < hi) { + EXPECT_EQ(num_recv, 1); + } else { + EXPECT_EQ(num_recv, 0); + } + num_recv = 0; } TEST_F(TestGroup, test_group_range_construct_2) { @@ -101,76 +103,55 @@ TEST_F(TestGroup, test_group_range_construct_2) { NodeType const lo = 1; NodeType const max_val = 5; NodeType const hi = std::min(num_nodes,max_val); - if (this_node == 0) { - auto list = std::make_unique(lo,hi); - theGroup()->newGroup( - std::move(list), [](GroupType group){ - fmt::print("Group is created: group={:x}\n", group); - auto msg = makeMessage(); - envelopeSetGroup(msg->env, group); - theMsg()->broadcastMsg(msg.get()); - } - ); - } - theTerm()->addAction([=]{ - if (this_node >= lo && this_node < hi) { - EXPECT_EQ(num_recv, 1); - } else { - EXPECT_EQ(num_recv, 0); + + + runInEpochCollective([&]{ + if (this_node == 0) { + auto list = std::make_unique(lo,hi); + theGroup()->newGroup( + std::move(list), [](GroupType group){ + fmt::print("Group is created: group={:x}\n", group); + auto msg = makeMessage(); + envelopeSetGroup(msg->env, group); + theMsg()->broadcastMsg(msg.get()); + } + ); } - num_recv = 0; }); + + if (this_node >= lo && this_node < hi) { + EXPECT_EQ(num_recv, 1); + } else { + EXPECT_EQ(num_recv, 0); + } + num_recv = 0; } TEST_F(TestGroup, test_group_collective_construct_1) { auto const& this_node = theContext()->getNode(); auto const& num_nodes = theContext()->getNumNodes(); bool const node_filter = this_node % 2 == 0; - theGroup()->newGroupCollective( - node_filter, [=](GroupType group) { - auto const& in_group = theGroup()->inGroup(group); - auto const& is_default_group = theGroup()->groupDefault(group); - EXPECT_EQ(in_group, node_filter); - EXPECT_EQ(is_default_group, false); - auto msg = makeMessage(); - envelopeSetGroup(msg->env, group); - theMsg()->broadcastMsg(msg.get()); - } - ); - theTerm()->addAction([=]{ - if (node_filter) { - EXPECT_EQ(num_recv, num_nodes); - } else { - EXPECT_EQ(num_recv, 0); - } - num_recv = 0; + + runInEpochCollective([&]{ + theGroup()->newGroupCollective( + node_filter, [=](GroupType group) { + auto const& in_group = theGroup()->inGroup(group); + auto const& is_default_group = theGroup()->groupDefault(group); + EXPECT_EQ(in_group, node_filter); + EXPECT_EQ(is_default_group, false); + auto msg = makeMessage(); + envelopeSetGroup(msg->env, group); + theMsg()->broadcastMsg(msg.get()); + } + ); }); -} -// TEST_F(TestGroup, test_group_collective_construct_2) { -// auto const& this_node = theContext()->getNode(); -// auto const& num_nodes = theContext()->getNumNodes(); -// auto const& node_filter = this_node % 2 == 1; -// theGroup()->newGroupCollective( -// node_filter, [=](GroupType group) { -// auto const& in_group = theGroup()->inGroup(group); -// auto const& is_default_group = theGroup()->groupDefault(group); -// ::fmt::print("{}: new group collective lambda\n", this_node); -// EXPECT_EQ(in_group, node_filter); -// EXPECT_EQ(is_default_group, false); -// auto msg = makeMessage(); -// envelopeSetGroup(msg->env, group); -// theMsg()->broadcastMsg(msg.get()); -// } -// ); -// theTerm()->addAction([=]{ -// if (node_filter) { -// EXPECT_EQ(num_recv, num_nodes); -// } else { -// EXPECT_EQ(num_recv, 0); -// } -// num_recv = 0; -// }); -// } + if (node_filter) { + EXPECT_EQ(num_recv, num_nodes); + } else { + EXPECT_EQ(num_recv, 0); + } + num_recv = 0; +} }}} // end namespace vt::tests::unit diff --git a/tests/unit/location/test_hops.extended.cc b/tests/unit/location/test_hops.extended.cc index 94abe88381..b7c27be8b0 100644 --- a/tests/unit/location/test_hops.extended.cc +++ b/tests/unit/location/test_hops.extended.cc @@ -139,21 +139,6 @@ struct TestColl : Collection { std::vector vec; }; -template -void executeInEpoch(Callable&& fn) { - auto this_node = theContext()->getNode(); - auto ep = vt::theTerm()->makeEpochCollective(); - vt::theMsg()->pushEpoch(ep); - if (this_node == 0) { - fn(); - } - vt::theMsg()->popEpoch(ep); - vt::theTerm()->finishedEpoch(ep); - bool done = false; - vt::theTerm()->addAction(ep, [&done]{ done = true; }); - do vt::runScheduler(); while (!done); -} - TEST_F(TestHops, test_hops_1) { auto num_nodes = theContext()->getNumNodes(); auto this_node = theContext()->getNode(); @@ -169,7 +154,7 @@ TEST_F(TestHops, test_hops_1) { if (this_node == 0) { vt_print(gen, "Doing work stage 1 for iter={}\n", i); } - executeInEpoch([=]{ + runInEpochCollective([&]{ if (this_node == 0) { proxy.broadcast(false); } @@ -177,7 +162,7 @@ TEST_F(TestHops, test_hops_1) { if (this_node == 0) { vt_print(gen, "Doing work stage 2 for iter={}\n", i); } - executeInEpoch([=]{ + runInEpochCollective([&]{ if (this_node == 0) { proxy.broadcast(true); } @@ -185,7 +170,7 @@ TEST_F(TestHops, test_hops_1) { if (this_node == 0) { vt_print(gen, "Running LB for iter={}\n", i); } - executeInEpoch([=]{ + runInEpochCollective([&]{ if (this_node == 0) { proxy.broadcast(); } diff --git a/tests/unit/location/test_location_common.h b/tests/unit/location/test_location_common.h index 2305f03b48..f99ff13804 100644 --- a/tests/unit/location/test_location_common.h +++ b/tests/unit/location/test_location_common.h @@ -151,18 +151,13 @@ void verifyCacheConsistency( // perform the checks only at the end of the epoch // to ensure that all entity messages have been // correctly delivered before. - auto epoch = vt::theTerm()->makeEpochCollective(); + runInEpochCollective([&]{ + // create an entity message to route + auto msg = vt::makeMessage(entity, my_node); + // check if should be serialized or not + bool serialize = msg->getSerialize(); - // create an entity message to route - auto msg = vt::makeMessage(entity, my_node); - // check if should be serialized or not - bool serialize = msg->getSerialize(); - - bool finished = false; - - vt::theTerm()->addAction(epoch, [=,&finished]{ if (my_node not_eq home) { - // check the routing protocol to be used by the manager. bool is_eager = theLocMan()->virtual_loc->useEagerProtocol(msg); @@ -193,20 +188,12 @@ void verifyCacheConsistency( // regardless of the protocol (eager or not) EXPECT_TRUE(isCached(entity)); } - finished = true; - }); - if (my_node not_eq home) { - // route entity message - vt::theLocMan()->virtual_loc->routeMsg(entity, home, msg, serialize); - } - // wait for all ranks and finish the epoch - vt::theCollective()->barrier(); - vt::theTerm()->finishedEpoch(epoch); - - while (not finished) { - vt::runScheduler(); - } + if (my_node not_eq home) { + // route entity message + vt::theLocMan()->virtual_loc->routeMsg(entity, home, msg, serialize); + } + }); } } From 07a8158497abb5e09a76a1c419d6b1e1da862d34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cezary=20Skrzy=C5=84ski?= Date: Tue, 11 Aug 2020 16:21:35 +0200 Subject: [PATCH 4/7] #955 remove addAction usage from test_callback_bcast.cc --- tests/unit/pipe/test_callback_bcast.cc | 76 +++++++++++++------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/tests/unit/pipe/test_callback_bcast.cc b/tests/unit/pipe/test_callback_bcast.cc index 30db44b4c8..8c10d6c0e8 100644 --- a/tests/unit/pipe/test_callback_bcast.cc +++ b/tests/unit/pipe/test_callback_bcast.cc @@ -115,13 +115,13 @@ TEST_F(TestCallbackBcast, test_callback_bcast_1) { theCollective()->barrier(); - auto cb = theCB()->makeBcast(); - auto nmsg = makeMessage(1,2,3); - cb.send(nmsg.get()); - - theTerm()->addAction([=]{ - EXPECT_EQ(called, 100 * theContext()->getNumNodes()); + runInEpochCollective([&]{ + auto cb = theCB()->makeBcast(); + auto nmsg = makeMessage(1,2,3); + cb.send(nmsg.get()); }); + + EXPECT_EQ(called, 100 * theContext()->getNumNodes()); } TEST_F(TestCallbackBcast, test_callback_bcast_2) { @@ -129,13 +129,13 @@ TEST_F(TestCallbackBcast, test_callback_bcast_2) { theCollective()->barrier(); - auto cb = theCB()->makeBcast(); - auto nmsg = makeMessage(1,2,3); - cb.send(nmsg.get()); - - theTerm()->addAction([=]{ - EXPECT_EQ(called, 200 * theContext()->getNumNodes()); + runInEpochCollective([&]{ + auto cb = theCB()->makeBcast(); + auto nmsg = makeMessage(1,2,3); + cb.send(nmsg.get()); }); + + EXPECT_EQ(called, 200 * theContext()->getNumNodes()); } TEST_F(TestCallbackBcast, test_callback_bcast_3) { @@ -143,12 +143,12 @@ TEST_F(TestCallbackBcast, test_callback_bcast_3) { theCollective()->barrier(); - auto cb = theCB()->makeBcast(); - cb.send(); - - theTerm()->addAction([=]{ - EXPECT_EQ(called, 300 * theContext()->getNumNodes()); + runInEpochCollective([&]{ + auto cb = theCB()->makeBcast(); + cb.send(); }); + + EXPECT_EQ(called, 300 * theContext()->getNumNodes()); } TEST_F(TestCallbackBcast, test_callback_bcast_remote_1) { @@ -159,14 +159,14 @@ TEST_F(TestCallbackBcast, test_callback_bcast_remote_1) { theCollective()->barrier(); - auto next = this_node + 1 < num_nodes ? this_node + 1 : 0; - auto cb = theCB()->makeBcast(); - auto msg = makeMessage(cb); - theMsg()->sendMsg(next, msg.get()); - - theTerm()->addAction([=]{ - EXPECT_EQ(called, 100 * theContext()->getNumNodes()); + runInEpochCollective([&]{ + auto next = this_node + 1 < num_nodes ? this_node + 1 : 0; + auto cb = theCB()->makeBcast(); + auto msg = makeMessage(cb); + theMsg()->sendMsg(next, msg.get()); }); + + EXPECT_EQ(called, 100 * theContext()->getNumNodes()); } TEST_F(TestCallbackBcast, test_callback_bcast_remote_2) { @@ -177,14 +177,14 @@ TEST_F(TestCallbackBcast, test_callback_bcast_remote_2) { theCollective()->barrier(); - auto next = this_node + 1 < num_nodes ? this_node + 1 : 0; - auto cb = theCB()->makeBcast(); - auto msg = makeMessage(cb); - theMsg()->sendMsg(next, msg.get()); - - theTerm()->addAction([=]{ - EXPECT_EQ(called, 200 * theContext()->getNumNodes()); + runInEpochCollective([&]{ + auto next = this_node + 1 < num_nodes ? this_node + 1 : 0; + auto cb = theCB()->makeBcast(); + auto msg = makeMessage(cb); + theMsg()->sendMsg(next, msg.get()); }); + + EXPECT_EQ(called, 200 * theContext()->getNumNodes()); } TEST_F(TestCallbackBcast, test_callback_bcast_remote_3) { @@ -195,14 +195,14 @@ TEST_F(TestCallbackBcast, test_callback_bcast_remote_3) { theCollective()->barrier(); - auto next = this_node + 1 < num_nodes ? this_node + 1 : 0; - auto cb = theCB()->makeBcast(); - auto msg = makeMessage(cb); - theMsg()->sendMsg(next, msg.get()); - - theTerm()->addAction([=]{ - EXPECT_EQ(called, 300 * theContext()->getNumNodes()); + runInEpochCollective([&]{ + auto next = this_node + 1 < num_nodes ? this_node + 1 : 0; + auto cb = theCB()->makeBcast(); + auto msg = makeMessage(cb); + theMsg()->sendMsg(next, msg.get()); }); + + EXPECT_EQ(called, 300 * theContext()->getNumNodes()); } From ed4e3eba96a0bd9569d7872688a4c1c94d7fd787 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cezary=20Skrzy=C5=84ski?= Date: Tue, 11 Aug 2020 16:47:12 +0200 Subject: [PATCH 5/7] #955 remove addAction usage from test_memory_lifetime.cc --- tests/unit/memory/test_memory_lifetime.cc | 50 +++++++++++------------ 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/tests/unit/memory/test_memory_lifetime.cc b/tests/unit/memory/test_memory_lifetime.cc index baaddcd1d8..9d2c3a98d1 100644 --- a/tests/unit/memory/test_memory_lifetime.cc +++ b/tests/unit/memory/test_memory_lifetime.cc @@ -121,20 +121,20 @@ TEST_F(TestMemoryLifetime, test_active_send_serial_lifetime_1) { auto const& num_nodes = theContext()->getNumNodes(); if (num_nodes > 1) { - auto const next_node = this_node + 1 < num_nodes ? this_node + 1 : 0; - for (int i = 0; i < num_msgs_sent; i++) { - auto msg = makeMessage(); - theMsg()->sendMsg(next_node, msg.get()); - } - for (int i = 0; i < num_msgs_sent; i++) { - auto msg = makeMessage(); - theMsg()->sendMsg(next_node, msg.get()); - } - - theTerm()->addAction([=]{ - EXPECT_EQ(SerialTrackMsg::alloc_count, 0); - EXPECT_EQ(local_count, num_msgs_sent*2); + runInEpochCollective([&]{ + auto const next_node = this_node + 1 < num_nodes ? this_node + 1 : 0; + for (int i = 0; i < num_msgs_sent; i++) { + auto msg = makeMessage(); + theMsg()->sendMsg(next_node, msg.get()); + } + for (int i = 0; i < num_msgs_sent; i++) { + auto msg = makeMessage(); + theMsg()->sendMsg(next_node, msg.get()); + } }); + + EXPECT_EQ(SerialTrackMsg::alloc_count, 0); + EXPECT_EQ(local_count, num_msgs_sent*2); } } @@ -145,19 +145,19 @@ TEST_F(TestMemoryLifetime, test_active_bcast_serial_lifetime_1) { auto const& num_nodes = theContext()->getNumNodes(); if (num_nodes > 1) { - for (int i = 0; i < num_msgs_sent; i++) { - auto msg = makeMessage(); - theMsg()->broadcastMsg(msg.get()); - } - for (int i = 0; i < num_msgs_sent; i++) { - auto msg = makeMessage(); - theMsg()->broadcastMsg(msg.get()); - } - - theTerm()->addAction([=]{ - EXPECT_EQ(SerialTrackMsg::alloc_count, 0); - EXPECT_EQ(local_count, num_msgs_sent*(num_nodes-1)*2); + runInEpochCollective([&]{ + for (int i = 0; i < num_msgs_sent; i++) { + auto msg = makeMessage(); + theMsg()->broadcastMsg(msg.get()); + } + for (int i = 0; i < num_msgs_sent; i++) { + auto msg = makeMessage(); + theMsg()->broadcastMsg(msg.get()); + } }); + + EXPECT_EQ(SerialTrackMsg::alloc_count, 0); + EXPECT_EQ(local_count, num_msgs_sent*(num_nodes-1)*2); } } From 73d20fdbd45f7a6e38a51f4cdd9297b0eb21da97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cezary=20Skrzy=C5=84ski?= Date: Tue, 11 Aug 2020 18:06:33 +0200 Subject: [PATCH 6/7] #955 improve Scheduler documentation --- docs/md/scheduler.md | 10 ++++++---- src/vt/scheduler/scheduler.h | 3 +++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/docs/md/scheduler.md b/docs/md/scheduler.md index cd490b7aaf..d3a062125d 100644 --- a/docs/md/scheduler.md +++ b/docs/md/scheduler.md @@ -1,7 +1,7 @@ \page scheduler Scheduler \brief General scheduling of work -The scheduler component `vt::sched::Scheduler`, accessed via `vt::theSched()` +The scheduler component `vt::sched::Scheduler`, accessed via `vt::theSched()`, holds pieces of work to execute later that may be prioritized. The scheduler polls the \vt components to make progress and collect new pieces of work. The scheduler allows registration of callbacks when the system is idle. @@ -17,12 +17,14 @@ vt::theSched()->scheduler(); This polls every component that might generate or complete work, and potentially runs one piece of available work. -\copydoc vt::sched::Scheduler::scheduler(bool) - However, if the scheduler needs to be run until a condition (or set of conditions) is met, it is recommended that `runSchedulerWhile` be invoked: -\copydoc vt::sched::Scheduler::runSchedulerWhile(std::function) +\code{.cpp} +vt::theSched()->runSchedulerWhile(/*std::function cond*/); +\endcode + +\copydetails vt::sched::Scheduler::runSchedulerWhile(std::function) \section higher-level-calls Higher-level Calls to Wait for Completion diff --git a/src/vt/scheduler/scheduler.h b/src/vt/scheduler/scheduler.h index ae11b5e270..7f47523f3e 100644 --- a/src/vt/scheduler/scheduler.h +++ b/src/vt/scheduler/scheduler.h @@ -129,6 +129,9 @@ struct Scheduler : runtime::component::Component { /** * \brief Turn the scheduler * + * Polls every component that might generate or complete work, and + * potentially runs one piece of available work. + * * \param[in] msg_only whether to only make progress on the core active * messenger */ From 4c45a7cb6ab4dd36121f8fd5a0938e4be9b6d81e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cezary=20Skrzy=C5=84ski?= Date: Wed, 12 Aug 2020 18:12:35 +0200 Subject: [PATCH 7/7] #955 remove addAction usage from test_objgroup.cc --- tests/unit/objgroup/test_objgroup.cc | 143 ++++++++++++++------------- 1 file changed, 72 insertions(+), 71 deletions(-) diff --git a/tests/unit/objgroup/test_objgroup.cc b/tests/unit/objgroup/test_objgroup.cc index 4b06ac5ef5..330203ce04 100644 --- a/tests/unit/objgroup/test_objgroup.cc +++ b/tests/unit/objgroup/test_objgroup.cc @@ -172,65 +172,66 @@ TEST_F(TestObjGroup, test_proxy_update) { } TEST_F(TestObjGroup, test_proxy_schedule) { - - // create a proxy to a object group - auto proxy = vt::theObjGroup()->makeCollective(); - - // self-send a message and then broadcast auto my_node = vt::theContext()->getNode(); auto num_nodes = vt::theContext()->getNumNodes(); - proxy[my_node].send(); - proxy.broadcast(); + // create a proxy to a object group + auto proxy = vt::theObjGroup()->makeCollective(); + MyObjA *obj = nullptr; - auto obj = proxy.get(); - vt_debug_print( - objgroup, node, - "obj->recv:{} before term\n", obj->recv_ - ); + runInEpochCollective([&]{ + // self-send a message and then broadcast + proxy[my_node].send(); + proxy.broadcast(); - // wait for term and check state to ensure all expected events executed - theTerm()->addAction([=]{ + obj = proxy.get(); vt_debug_print( objgroup, node, - "obj->recv:{} after term\n", obj->recv_ + "obj->recv:{} before term\n", obj->recv_ ); - EXPECT_EQ(obj->recv_, num_nodes + 1); }); + + // check state to ensure all expected events executed + vt_debug_print( + objgroup, node, + "obj->recv:{} after term\n", obj->recv_ + ); + EXPECT_EQ(obj->recv_, num_nodes + 1); } TEST_F(TestObjGroup, test_proxy_callbacks) { - auto const my_node = vt::theContext()->getNode(); + MyObjA* obj1 = nullptr; + MyObjB* obj2 = nullptr; + MyObjA* obj3 = nullptr; + + runInEpochCollective([&]{ + // create object groups and retrieve proxies + auto proxy1 = vt::theObjGroup()->makeCollective(); + auto proxy2 = vt::theObjGroup()->makeCollective(1); + auto proxy3 = vt::theObjGroup()->makeCollective(); + + if (my_node == 0) { + proxy1[0].send(); + proxy1[0].send(); + proxy1[1].send(); + } else if (my_node == 1) { + proxy2.broadcast(); + proxy3[0].send(); + } - // create object groups and retrieve proxies - auto proxy1 = vt::theObjGroup()->makeCollective(); - auto proxy2 = vt::theObjGroup()->makeCollective(1); - auto proxy3 = vt::theObjGroup()->makeCollective(); - - if (my_node == 0) { - proxy1[0].send(); - proxy1[0].send(); - proxy1[1].send(); - } else if (my_node == 1) { - proxy2.broadcast(); - proxy3[0].send(); - } - - // wait for term and check state to ensure all expected events executed - theTerm()->addAction([=]{ // check received messages for each group - auto obj1 = proxy1.get(); - auto obj2 = proxy2.get(); - auto obj3 = proxy3.get(); - - switch (my_node) { - case 0: EXPECT_EQ(obj1->recv_, 2); break; - case 1: EXPECT_EQ(obj1->recv_, 1); break; - default: EXPECT_EQ(obj1->recv_, 0); break; - } - EXPECT_EQ(obj2->recv_, 1); - EXPECT_EQ(obj3->recv_, my_node == 0 ? 1 : 0); + obj1 = proxy1.get(); + obj2 = proxy2.get(); + obj3 = proxy3.get(); }); + + switch (my_node) { + case 0: EXPECT_EQ(obj1->recv_, 2); break; + case 1: EXPECT_EQ(obj1->recv_, 1); break; + default: EXPECT_EQ(obj1->recv_, 0); break; + } + EXPECT_EQ(obj2->recv_, 1); + EXPECT_EQ(obj3->recv_, my_node == 0 ? 1 : 0); } TEST_F(TestObjGroup, test_proxy_reduce) { @@ -240,34 +241,34 @@ TEST_F(TestObjGroup, test_proxy_reduce) { vt::theCollective()->barrier(); - // create four proxy instances of a same object group type - auto proxy1 = vt::theObjGroup()->makeCollective(); - auto proxy2 = vt::theObjGroup()->makeCollective(); - auto proxy3 = vt::theObjGroup()->makeCollective(); - auto proxy4 = vt::theObjGroup()->makeCollective(); - - auto msg1 = vt::makeMessage(my_node); - auto msg2 = vt::makeMessage(4); - auto msg3 = vt::makeMessage(my_node); - auto msg4 = vt::makeMessage(my_node); - - // Multiple reductions should not interfere each other, even if - // performed by the same subset of nodes within the same epoch. - // Proxies should be able to do perform reduction - // on any valid operator and data type. - using namespace vt::collective; - - proxy1.reduce, Verify<1>>(msg1); - proxy2.reduce, Verify<2>>(msg2); - proxy3.reduce< MaxOp, Verify<3>>(msg3); - proxy4.reduce, Verify<4>>(msg4); - - theTerm()->addAction([=]{ - auto const root_node = 0; - if (my_node == root_node) { - EXPECT_EQ(TestObjGroup::total_verify_expected_, 4); - } + runInEpochCollective([&]{ + // create four proxy instances of a same object group type + auto proxy1 = vt::theObjGroup()->makeCollective(); + auto proxy2 = vt::theObjGroup()->makeCollective(); + auto proxy3 = vt::theObjGroup()->makeCollective(); + auto proxy4 = vt::theObjGroup()->makeCollective(); + + auto msg1 = vt::makeMessage(my_node); + auto msg2 = vt::makeMessage(4); + auto msg3 = vt::makeMessage(my_node); + auto msg4 = vt::makeMessage(my_node); + + // Multiple reductions should not interfere each other, even if + // performed by the same subset of nodes within the same epoch. + // Proxies should be able to do perform reduction + // on any valid operator and data type. + using namespace vt::collective; + + proxy1.reduce, Verify<1>>(msg1); + proxy2.reduce, Verify<2>>(msg2); + proxy3.reduce< MaxOp, Verify<3>>(msg3); + proxy4.reduce, Verify<4>>(msg4); }); + + auto const root_node = 0; + if (my_node == root_node) { + EXPECT_EQ(TestObjGroup::total_verify_expected_, 4); + } } }}} // end namespace vt::tests::unit