Skip to content

Commit

Permalink
Merge branch 'develop' into 884-rename-getNumTotalChildren
Browse files Browse the repository at this point in the history
  • Loading branch information
cz4rs authored Aug 14, 2020
2 parents 78fffb3 + 56c37d7 commit 448e6ff
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 262 deletions.
10 changes: 6 additions & 4 deletions docs/md/scheduler.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
\page scheduler Scheduler
\brief General scheduling of work

The scheduler component `vt::sched::Scheduler`, accessed via `vt::theSched()`
The scheduler component `vt::sched::Scheduler`, accessed via `vt::theSched()`,
holds pieces of work to execute later that may be prioritized. The scheduler
polls the \vt components to make progress and collect new pieces of work. The
scheduler allows registration of callbacks when the system is idle.
Expand All @@ -17,12 +17,14 @@ vt::theSched()->scheduler();
This polls every component that might generate or complete work, and potentially
runs one piece of available work.

\copydoc vt::sched::Scheduler::scheduler(bool)

However, if the scheduler needs to be run until a condition (or set of
conditions) is met, it is recommended that `runSchedulerWhile` be invoked:

\copydoc vt::sched::Scheduler::runSchedulerWhile(std::function<bool()>)
\code{.cpp}
vt::theSched()->runSchedulerWhile(/*std::function<bool()> cond*/);
\endcode

\copydetails vt::sched::Scheduler::runSchedulerWhile(std::function<bool()>)

\section higher-level-calls Higher-level Calls to Wait for Completion

Expand Down
3 changes: 3 additions & 0 deletions src/vt/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ struct Scheduler : runtime::component::Component<Scheduler> {
/**
* \brief Turn the scheduler
*
* Polls every component that might generate or complete work, and
* potentially runs one piece of available work.
*
* \param[in] msg_only whether to only make progress on the core active
* messenger
*/
Expand Down
137 changes: 59 additions & 78 deletions tests/unit/group/test_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,27 @@ TEST_F(TestGroup, test_group_range_construct_1) {
auto const& num_nodes = theContext()->getNumNodes();
NodeType const lo = 0;
NodeType const hi = num_nodes / 2;
if (this_node == 0) {
auto list = std::make_unique<region::Range>(lo,hi);
theGroup()->newGroup(
std::move(list), [](GroupType group){
fmt::print("Group is created: group={:x}\n", group);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
}
theTerm()->addAction([=]{
if (this_node >= lo && this_node < hi) {
EXPECT_EQ(num_recv, 1);
} else {
EXPECT_EQ(num_recv, 0);

runInEpochCollective([&]{
if (this_node == 0) {
auto list = std::make_unique<region::Range>(lo,hi);
theGroup()->newGroup(
std::move(list), [](GroupType group){
fmt::print("Group is created: group={:x}\n", group);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
}
num_recv = 0;
});

if (this_node >= lo && this_node < hi) {
EXPECT_EQ(num_recv, 1);
} else {
EXPECT_EQ(num_recv, 0);
}
num_recv = 0;
}

TEST_F(TestGroup, test_group_range_construct_2) {
Expand All @@ -101,76 +103,55 @@ TEST_F(TestGroup, test_group_range_construct_2) {
NodeType const lo = 1;
NodeType const max_val = 5;
NodeType const hi = std::min<NodeType>(num_nodes,max_val);
if (this_node == 0) {
auto list = std::make_unique<region::Range>(lo,hi);
theGroup()->newGroup(
std::move(list), [](GroupType group){
fmt::print("Group is created: group={:x}\n", group);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
}
theTerm()->addAction([=]{
if (this_node >= lo && this_node < hi) {
EXPECT_EQ(num_recv, 1);
} else {
EXPECT_EQ(num_recv, 0);


runInEpochCollective([&]{
if (this_node == 0) {
auto list = std::make_unique<region::Range>(lo,hi);
theGroup()->newGroup(
std::move(list), [](GroupType group){
fmt::print("Group is created: group={:x}\n", group);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
}
num_recv = 0;
});

if (this_node >= lo && this_node < hi) {
EXPECT_EQ(num_recv, 1);
} else {
EXPECT_EQ(num_recv, 0);
}
num_recv = 0;
}

TEST_F(TestGroup, test_group_collective_construct_1) {
auto const& this_node = theContext()->getNode();
auto const& num_nodes = theContext()->getNumNodes();
bool const node_filter = this_node % 2 == 0;
theGroup()->newGroupCollective(
node_filter, [=](GroupType group) {
auto const& in_group = theGroup()->inGroup(group);
auto const& is_default_group = theGroup()->groupDefault(group);
EXPECT_EQ(in_group, node_filter);
EXPECT_EQ(is_default_group, false);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
theTerm()->addAction([=]{
if (node_filter) {
EXPECT_EQ(num_recv, num_nodes);
} else {
EXPECT_EQ(num_recv, 0);
}
num_recv = 0;

runInEpochCollective([&]{
theGroup()->newGroupCollective(
node_filter, [=](GroupType group) {
auto const& in_group = theGroup()->inGroup(group);
auto const& is_default_group = theGroup()->groupDefault(group);
EXPECT_EQ(in_group, node_filter);
EXPECT_EQ(is_default_group, false);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
});
}

// TEST_F(TestGroup, test_group_collective_construct_2) {
// auto const& this_node = theContext()->getNode();
// auto const& num_nodes = theContext()->getNumNodes();
// auto const& node_filter = this_node % 2 == 1;
// theGroup()->newGroupCollective(
// node_filter, [=](GroupType group) {
// auto const& in_group = theGroup()->inGroup(group);
// auto const& is_default_group = theGroup()->groupDefault(group);
// ::fmt::print("{}: new group collective lambda\n", this_node);
// EXPECT_EQ(in_group, node_filter);
// EXPECT_EQ(is_default_group, false);
// auto msg = makeMessage<TestMsg>();
// envelopeSetGroup(msg->env, group);
// theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
// }
// );
// theTerm()->addAction([=]{
// if (node_filter) {
// EXPECT_EQ(num_recv, num_nodes);
// } else {
// EXPECT_EQ(num_recv, 0);
// }
// num_recv = 0;
// });
// }
if (node_filter) {
EXPECT_EQ(num_recv, num_nodes);
} else {
EXPECT_EQ(num_recv, 0);
}
num_recv = 0;
}

}}} // end namespace vt::tests::unit
21 changes: 3 additions & 18 deletions tests/unit/location/test_hops.extended.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,6 @@ struct TestColl : Collection<TestColl,vt::Index2D> {
std::vector<double> vec;
};

template <typename Callable>
void executeInEpoch(Callable&& fn) {
auto this_node = theContext()->getNode();
auto ep = vt::theTerm()->makeEpochCollective();
vt::theMsg()->pushEpoch(ep);
if (this_node == 0) {
fn();
}
vt::theMsg()->popEpoch(ep);
vt::theTerm()->finishedEpoch(ep);
bool done = false;
vt::theTerm()->addAction(ep, [&done]{ done = true; });
do vt::runScheduler(); while (!done);
}

TEST_F(TestHops, test_hops_1) {
auto num_nodes = theContext()->getNumNodes();
auto this_node = theContext()->getNode();
Expand All @@ -169,23 +154,23 @@ TEST_F(TestHops, test_hops_1) {
if (this_node == 0) {
vt_print(gen, "Doing work stage 1 for iter={}\n", i);
}
executeInEpoch([=]{
runInEpochCollective([&]{
if (this_node == 0) {
proxy.broadcast<TestColl::TestMsg,&TestColl::doWork>(false);
}
});
if (this_node == 0) {
vt_print(gen, "Doing work stage 2 for iter={}\n", i);
}
executeInEpoch([=]{
runInEpochCollective([&]{
if (this_node == 0) {
proxy.broadcast<TestColl::TestMsg,&TestColl::doWork>(true);
}
});
if (this_node == 0) {
vt_print(gen, "Running LB for iter={}\n", i);
}
executeInEpoch([=]{
runInEpochCollective([&]{
if (this_node == 0) {
proxy.broadcast<TestColl::TestMsg,&TestColl::dolb>();
}
Expand Down
33 changes: 10 additions & 23 deletions tests/unit/location/test_location_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,13 @@ void verifyCacheConsistency(
// perform the checks only at the end of the epoch
// to ensure that all entity messages have been
// correctly delivered before.
auto epoch = vt::theTerm()->makeEpochCollective();
runInEpochCollective([&]{
// create an entity message to route
auto msg = vt::makeMessage<MsgT>(entity, my_node);
// check if should be serialized or not
bool serialize = msg->getSerialize();

// create an entity message to route
auto msg = vt::makeMessage<MsgT>(entity, my_node);
// check if should be serialized or not
bool serialize = msg->getSerialize();

bool finished = false;

vt::theTerm()->addAction(epoch, [=,&finished]{
if (my_node not_eq home) {

// check the routing protocol to be used by the manager.
bool is_eager = theLocMan()->virtual_loc->useEagerProtocol(msg);

Expand Down Expand Up @@ -193,20 +188,12 @@ void verifyCacheConsistency(
// regardless of the protocol (eager or not)
EXPECT_TRUE(isCached(entity));
}
finished = true;
});

if (my_node not_eq home) {
// route entity message
vt::theLocMan()->virtual_loc->routeMsg<MsgT>(entity, home, msg, serialize);
}
// wait for all ranks and finish the epoch
vt::theCollective()->barrier();
vt::theTerm()->finishedEpoch(epoch);

while (not finished) {
vt::runScheduler();
}
if (my_node not_eq home) {
// route entity message
vt::theLocMan()->virtual_loc->routeMsg<MsgT>(entity, home, msg, serialize);
}
});
}
}

Expand Down
50 changes: 25 additions & 25 deletions tests/unit/memory/test_memory_lifetime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,20 @@ TEST_F(TestMemoryLifetime, test_active_send_serial_lifetime_1) {
auto const& num_nodes = theContext()->getNumNodes();

if (num_nodes > 1) {
auto const next_node = this_node + 1 < num_nodes ? this_node + 1 : 0;
for (int i = 0; i < num_msgs_sent; i++) {
auto msg = makeMessage<SerialTestMsg>();
theMsg()->sendMsg<SerialTestMsg, serialHan>(next_node, msg.get());
}
for (int i = 0; i < num_msgs_sent; i++) {
auto msg = makeMessage<SerialTestMsg>();
theMsg()->sendMsg<SerialTestMsg, serialHan>(next_node, msg.get());
}

theTerm()->addAction([=]{
EXPECT_EQ(SerialTrackMsg::alloc_count, 0);
EXPECT_EQ(local_count, num_msgs_sent*2);
runInEpochCollective([&]{
auto const next_node = this_node + 1 < num_nodes ? this_node + 1 : 0;
for (int i = 0; i < num_msgs_sent; i++) {
auto msg = makeMessage<SerialTestMsg>();
theMsg()->sendMsg<SerialTestMsg, serialHan>(next_node, msg.get());
}
for (int i = 0; i < num_msgs_sent; i++) {
auto msg = makeMessage<SerialTestMsg>();
theMsg()->sendMsg<SerialTestMsg, serialHan>(next_node, msg.get());
}
});

EXPECT_EQ(SerialTrackMsg::alloc_count, 0);
EXPECT_EQ(local_count, num_msgs_sent*2);
}
}

Expand All @@ -145,19 +145,19 @@ TEST_F(TestMemoryLifetime, test_active_bcast_serial_lifetime_1) {
auto const& num_nodes = theContext()->getNumNodes();

if (num_nodes > 1) {
for (int i = 0; i < num_msgs_sent; i++) {
auto msg = makeMessage<SerialTestMsg>();
theMsg()->broadcastMsg<SerialTestMsg, serialHan>(msg.get());
}
for (int i = 0; i < num_msgs_sent; i++) {
auto msg = makeMessage<SerialTestMsg>();
theMsg()->broadcastMsg<SerialTestMsg, serialHan>(msg.get());
}

theTerm()->addAction([=]{
EXPECT_EQ(SerialTrackMsg::alloc_count, 0);
EXPECT_EQ(local_count, num_msgs_sent*(num_nodes-1)*2);
runInEpochCollective([&]{
for (int i = 0; i < num_msgs_sent; i++) {
auto msg = makeMessage<SerialTestMsg>();
theMsg()->broadcastMsg<SerialTestMsg, serialHan>(msg.get());
}
for (int i = 0; i < num_msgs_sent; i++) {
auto msg = makeMessage<SerialTestMsg>();
theMsg()->broadcastMsg<SerialTestMsg, serialHan>(msg.get());
}
});

EXPECT_EQ(SerialTrackMsg::alloc_count, 0);
EXPECT_EQ(local_count, num_msgs_sent*(num_nodes-1)*2);
}
}

Expand Down
Loading

0 comments on commit 448e6ff

Please sign in to comment.