Skip to content

Commit

Permalink
#703 update core to use theSched()->runSchedulerWhile
Browse files Browse the repository at this point in the history
- Using this while loop form ensures that scheduler loops
  (as an entire collective) can be nested without interfering
  with events from other scheduler loops.
  • Loading branch information
pnstickne committed Mar 16, 2020
1 parent 83796f4 commit 76318a8
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 37 deletions.
4 changes: 1 addition & 3 deletions examples/broadcast_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ int main(int argc, char** argv) {
}
}

while (!rt->isTerminated()) {
runScheduler();
}
theSched()->runSchedulerWhile([]{ return !rt->isTerminated(); });

CollectiveOps::finalize();

Expand Down
9 changes: 5 additions & 4 deletions src/vt/collective/barrier/barrier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,13 @@ void Barrier::waitBarrier(

barrierUp(is_named, is_wait, barrier, skip_term);

while (not barrier_state.released) {
vt::runScheduler();
if (poll_action) {
theSched()->runSchedulerWhile([&poll_action, &barrier_state]{
bool cond = not barrier_state.released;
if (cond and poll_action) {
poll_action();
}
}
return cond;
});

debug_print(
barrier, node,
Expand Down
4 changes: 1 addition & 3 deletions src/vt/collective/collective_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ void CollectiveAnyOps<instance>::scheduleThenFinalize(
auto rt_use = has_rt ? in_rt.unsafe() : curRT;

auto sched_fn = [=]{
while (not rt_use->isTerminated()) {
runScheduler();
}
theSched()->runSchedulerWhile([rt_use]{ return not rt_use->isTerminated(); });
};

if (workers == no_workers) {
Expand Down
6 changes: 3 additions & 3 deletions src/vt/runtime/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,9 @@ void Runtime::setupTerminateHandler() {
}

/*virtual*/ Runtime::~Runtime() {
while (runtime_active_ && !aborted_) {
runScheduler();
}
theSched->runSchedulerWhile([this]{
return runtime_active_ && !aborted_;
});
if (!aborted_) {
finalize();
}
Expand Down
12 changes: 6 additions & 6 deletions src/vt/termination/term_scope.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ namespace vt { namespace term {
closure();
theMsg()->popEpoch();
theTerm()->finishedEpoch(epoch);
while (!term_finished) {
runScheduler();
}

theSched()->runSchedulerWhile([&term_finished]{ return !term_finished; });

return epoch;
}

Expand Down Expand Up @@ -97,9 +97,9 @@ namespace vt { namespace term {
closure();
theMsg()->popEpoch();
theTerm()->finishedEpoch(epoch);
while (!term_finished) {
runScheduler();
}

theSched()->runSchedulerWhile([&term_finished]{ return !term_finished; });

return epoch;
}

Expand Down
6 changes: 3 additions & 3 deletions src/vt/termination/termination.cc
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,9 @@ bool TerminationDetector::propagateEpoch(TermStateType& state) {
startEpochGraphBuild();
// After spawning the build, spin until the file gets written out so
// vtAbort does not exit too early
while (not has_printed_epoch_graph or not theSched()->isIdle()) {
vt::runScheduler();
}
theSched()->runSchedulerWhile([this]{
return not has_printed_epoch_graph or not theSched()->isIdle();
});
}
vtAbort("Detected hang indicating no further progress is possible");
}
Expand Down
8 changes: 5 additions & 3 deletions src/vt/trace/trace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,18 @@ void Trace::initialize() {
void Trace::loadAndBroadcastSpec() {
if (ArgType::vt_trace_spec) {
auto spec_proxy = file_spec::TraceSpec::construct();

theTerm()->produce();
if (theContext()->getNode() == 0) {
auto spec_ptr = spec_proxy.get();
spec_ptr->parse();
spec_ptr->broadcastSpec();
}
while (not spec_proxy.get()->specReceived()) {
vt::runScheduler();
}
theSched()->runSchedulerWhile([&spec_proxy]{
return not spec_proxy.get()->specReceived();
});
theTerm()->consume();

spec_proxy_ = spec_proxy.getProxy();

// Set enabled for the initial phase
Expand Down
8 changes: 2 additions & 6 deletions src/vt/vrt/collection/balance/gossiplb/gossiplb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ void GossipLB::inform() {

theTerm()->finishedEpoch(propagate_epoch);

while (not inform_done) {
vt::runScheduler();
}
theSched()->runSchedulerWhile([&inform_done]{ return not inform_done; });

debug_print(
gossiplb, node,
Expand Down Expand Up @@ -457,9 +455,7 @@ void GossipLB::decide() {

theTerm()->finishedEpoch(lazy_epoch);

while (not decide_done) {
vt::runScheduler();
}
theSched()->runSchedulerWhile([&decide_done]{ return not decide_done; });
}

void GossipLB::thunkMigrations() {
Expand Down
4 changes: 1 addition & 3 deletions src/vt/vrt/collection/balance/lb_invoke/invoke.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,7 @@ void LBManager::waitLBCollective() {
// The invocation should only happen collectively across the whole all nodes.
//
theTerm()->produce();
while (synced_in_lb_) {
vt::runScheduler();
}
theSched()->runSchedulerWhile([this]{ return synced_in_lb_; });
synced_in_lb_ = true;
theTerm()->consume();

Expand Down
6 changes: 3 additions & 3 deletions src/vt/vrt/collection/manager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1826,9 +1826,9 @@ CollectionManager::constructCollectiveMap(
// Wait for construction to finish before we release control to the user; this
// ensures that other parts of the system do not migrate elements until the
// group construction is complete
while (constructed_.find(proxy) == constructed_.end()) {
vt::runScheduler();
}
theSched()->runSchedulerWhile([this, &proxy]{
return constructed_.find(proxy) == constructed_.end();
});

debug_print(
vrt_coll, node,
Expand Down

0 comments on commit 76318a8

Please sign in to comment.