Skip to content

Commit

Permalink
#703 add 'nested scheduler' infrastructure + trace support
Browse files Browse the repository at this point in the history
- Expose the notion of begin/end scheduler LOOP.

  The trace works with this to ensure begin-end event
  lifetimes are properly managed. It does this by synthetically
  popping and pushing the appropriate event markers.

  It is possible that code can leave and enter idle
  without logging any events; or that a runSchedulerWhile
  can begin and end without entering idle.
  (Although it would be trivial to 'blip' an idle event.)

- Also fixes some minor issues, such as idle event time
  ordering correctness, removes fictitiously used ctor, etc.
  • Loading branch information
pnstickne committed Mar 16, 2020
1 parent dea265e commit 83796f4
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 75 deletions.
1 change: 1 addition & 0 deletions cmake/turn_on_warnings.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ if(NOT hasParent)
enable_cxx_compiler_flag_if_supported("-pedantic")
enable_cxx_compiler_flag_if_supported("-Wshadow")
enable_cxx_compiler_flag_if_supported("-Wno-unknown-pragmas")
enable_cxx_compiler_flag_if_supported("-Wsign-compare")
endif()
31 changes: 29 additions & 2 deletions src/vt/scheduler/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ namespace vt { namespace sched {
}

Scheduler::Scheduler() {
event_triggers.resize(SchedulerEventType::SchedulerEventSize + 1);
event_triggers_once.resize(SchedulerEventType::SchedulerEventSize + 1);
auto event_count = SchedulerEventType::LastSchedulerEvent + 1;
event_triggers.resize(event_count);
event_triggers_once.resize(event_count);

progress_time_enabled_ = arguments::ArgConfig::vt_sched_progress_sec != 0.0;
}

Expand Down Expand Up @@ -222,6 +224,31 @@ void Scheduler::scheduler(bool msg_only) {
}
}

// Drives a (possibly nested) scheduler loop: runScheduler() is invoked
// repeatedly for as long as `cond` keeps returning true. The loop is
// bracketed by the BeginSchedulerLoop / EndSchedulerLoop scheduler events so
// that registered triggers can observe where it starts and stops.
void Scheduler::runSchedulerWhile(std::function<bool()> cond) {
  vtAssert(
    action_depth_ == 0 or not is_idle,
    "Nested schedulers never expected from idle context"
  );

  triggerEvent(SchedulerEventType::BeginSchedulerLoop);

  // `cond` is re-evaluated before every scheduler turn.
  for (;;) {
    if (not cond()) {
      break;
    }
    runScheduler();
  }

  // Idle is only allowed to last for the duration of a nested loop. When the
  // loop finishes inside an action (depth > 0) the outer work is about to
  // resume, so leave idle explicitly here -- even if the work queue happens
  // to be empty. The converse is not implied: idle is still only entered
  // when there is no work to do.
  if (action_depth_ > 0) {
    if (is_idle) {
      is_idle = false;
      triggerEvent(SchedulerEventType::EndIdle);
    }
  }

  triggerEvent(SchedulerEventType::EndSchedulerLoop);
}

void Scheduler::triggerEvent(SchedulerEventType const& event) {
vtAssert(
event_triggers.size() >= event, "Must be large enough to hold this event"
Expand Down
15 changes: 14 additions & 1 deletion src/vt/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ enum SchedulerEvent {
EndIdle = 1,
BeginIdleMinusTerm = 2,
EndIdleMinusTerm = 3,
SchedulerEventSize = 4
BeginSchedulerLoop = 4,
EndSchedulerLoop = 5,

LastSchedulerEvent = 5,
};

struct Scheduler {
Expand All @@ -92,6 +95,16 @@ struct Scheduler {
void scheduler(bool msg_only = false);
void runProgress(bool msg_only = false);

/**
* \brief Runs the scheduler while a condition holds.
*
* Keeps running the scheduler for as long as \c cond returns true; the loop
* stops the first time \c cond returns false.
* This form SHOULD be used instead of "while (..) { runScheduler(..) }"
* in all cases of nested scheduler loops, such as during a barrier,
* in order to ensure proper event unwinding and idle time tracking.
*
* \param[in] cond predicate checked before each scheduler turn; return
*                 false to end the loop
*/
void runSchedulerWhile(std::function<bool()> cond);

void registerTrigger(SchedulerEventType const& event, TriggerType trigger);
void registerTriggerOnce(
SchedulerEventType const& event, TriggerType trigger
Expand Down
150 changes: 86 additions & 64 deletions src/vt/trace/trace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ struct TraceEventSeqCompare {
}
};

Trace::Trace(std::string const& in_prog_name, std::string const& in_trace_name)
: prog_name_(in_prog_name), trace_name_(in_trace_name),
start_time_(getCurrentTime()), log_file_(nullptr)
Trace::Trace()
{
/*
* Incremental flush mode for zlib. Several options are available:
Expand All @@ -116,36 +114,29 @@ Trace::Trace(std::string const& in_prog_name, std::string const& in_trace_name)
*/

incremental_flush_mode = Z_SYNC_FLUSH;
}

Trace::Trace() { }

/*static*/ void Trace::traceBeginIdleTrigger() {
#if backend_check_enabled(trace_enabled)
if (not theTrace()->inIdleEvent()) {
theTrace()->beginIdle();
}
#endif
}

/*static*/ void Trace::traceEndIdleTrigger() {
#if backend_check_enabled(trace_enabled)
if (theTrace()->inIdleEvent()) {
theTrace()->endIdle();
}
#endif
// The first (implied) scheduler always starts with an empty event stack.
event_holds_.push_back(0);
}

void Trace::initialize() {
#if backend_check_enabled(trace_enabled)
theSched()->registerTrigger(
sched::SchedulerEvent::BeginSchedulerLoop, [this]{ beginSchedulerLoop(); }
);
theSched()->registerTrigger(
sched::SchedulerEvent::BeginIdle, traceBeginIdleTrigger
sched::SchedulerEvent::EndSchedulerLoop, [this]{ endSchedulerLoop(); }
);
theSched()->registerTrigger(
sched::SchedulerEvent::EndIdle, traceEndIdleTrigger
sched::SchedulerEvent::BeginIdle, [this]{ beginIdle(); }
);
theSched()->registerTrigger(
sched::SchedulerEvent::EndIdle, [this]{ endIdle(); }
);

// Register a trace user event to demarcate flushes that occur
flush_event_ = vt::trace::registerEventCollective("trace_flush");
#endif
}

void Trace::loadAndBroadcastSpec() {
Expand Down Expand Up @@ -446,11 +437,17 @@ void Trace::addUserEventBracketedManual(
addUserEventBracketed(id, begin, end);
}

// Logs a MemoryUsageCurrent trace entry that carries `memory` and is stamped
// with `time`.
void Trace::addMemoryEvent(std::size_t memory, double time) {
  logEvent(
    LogType{time, TraceConstantsType::MemoryUsageCurrent, memory}
  );
}

TraceProcessingTag Trace::beginProcessing(
TraceEntryIDType const ep, TraceMsgLenType const len,
TraceEventIDType const event, NodeType const from_node, double const time,
uint64_t const idx1, uint64_t const idx2, uint64_t const idx3,
uint64_t const idx4
TraceEventIDType const event, NodeType const from_node,
uint64_t const idx1, uint64_t const idx2,
uint64_t const idx3, uint64_t const idx4,
double const time
) {
if (not checkDynamicRuntimeEnabled()) {
return TraceProcessingTag{};
Expand All @@ -464,6 +461,7 @@ TraceProcessingTag Trace::beginProcessing(

auto const type = TraceConstantsType::BeginProcessing;

emitTraceForTopProcessingEvent(time, TraceConstantsType::EndProcessing);
TraceEventIDType loggedEvent = logEvent(
LogType{
time, ep, type, event, len, from_node, idx1, idx2, idx3, idx4
Expand Down Expand Up @@ -494,27 +492,26 @@ void Trace::endProcessing(
return;
}

if (idle_begun_) {
// TODO: This should be a prohibited case - vt 1.1?
endIdle(time);
}

vtAssert(
not open_events_.empty()
// This is current contract expectations; however it precludes async closing.
and open_events_.top().ep == ep
and open_events_.top().event == event,
and open_events_.back().ep == ep
and open_events_.back().event == event,
"Event being closed must be on the top of the open event stack."
);

// Final event is same as original with a few .. tweaks.
// Always done PRIOR TO restarts.
traces_.push(
LogType{open_events_.top(), time, TraceConstantsType::EndProcessing}
LogType{open_events_.back(), time, TraceConstantsType::EndProcessing}
);
open_events_.pop();

if (not open_events_.empty()) {
// Emit a '[re]start' event for the reactivated stack item.
traces_.push(
LogType{open_events_.top(), time, TraceConstantsType::BeginProcessing}
);
}
open_events_.pop_back();
emitTraceForTopProcessingEvent(time, TraceConstantsType::BeginProcessing);

// Unlike logEvent there is currently no flush here.
}
Expand Down Expand Up @@ -549,12 +546,29 @@ void Trace::endProcessing(
);
}

void Trace::addMemoryEvent(std::size_t memory, double time) {
auto const type = TraceConstantsType::MemoryUsageCurrent;
logEvent(LogType{time, type, memory});
// Scheduler-loop entry hook: records how many events are open right now by
// pushing that count onto the event_holds_ stack.
void Trace::beginSchedulerLoop() {
  auto const open_depth = open_events_.size();
  event_holds_.push_back(open_depth);
}

// Scheduler-loop exit hook: discards the open-event depth that the matching
// beginSchedulerLoop() pushed onto event_holds_.
void Trace::endSchedulerLoop() {
// More than one entry must be present before popping, so that the stack is
// never emptied by an unbalanced end call.
vtAssert(
event_holds_.size() > 1,
"Too many endSchedulerLoop calls."
);

// The number of open events must be back to the depth captured when this
// loop began, i.e. nothing opened inside the loop is still open.
vtAssert(
event_holds_.back() == open_events_.size(),
"Processing events opened in a scheduler loop must be closed by loop end."
);

event_holds_.pop_back();
}

void Trace::beginIdle(double const time) {
if (idle_begun_) {
return;
}
if (not checkDynamicRuntimeEnabled()) {
return;
}
Expand All @@ -566,13 +580,17 @@ void Trace::beginIdle(double const time) {
auto const type = TraceConstantsType::BeginIdle;
NodeType const node = theContext()->getNode();

emitTraceForTopProcessingEvent(time, TraceConstantsType::EndProcessing);
logEvent(
LogType{time, type, node}
);
idle_begun_ = true; // must set AFTER logEvent
}

void Trace::endIdle(double const time) {
if (not idle_begun_) {
return;
}
if (not checkDynamicRuntimeEnabled()) {
return;
}
Expand All @@ -584,10 +602,11 @@ void Trace::endIdle(double const time) {
auto const type = TraceConstantsType::EndIdle;
NodeType const node = theContext()->getNode();

idle_begun_ = false; // must set BEFORE logEvent
logEvent(
LogType{time, type, node}
);
idle_begun_ = false; // must set AFTER logEvent
emitTraceForTopProcessingEvent(time, TraceConstantsType::BeginProcessing);
}

TraceEventIDType Trace::messageCreation(
Expand Down Expand Up @@ -664,11 +683,12 @@ void Trace::setTraceEnabledCurrentPhase(PhaseType cur_phase) {
if (trace_enabled_cur_phase_ != ret) {
auto time = getCurrentTime();
// Close and pop everything, we are disabling traces at this point
while (not open_events_.empty()) {
std::size_t hold_at = event_holds_.back();
while (open_events_.size() > hold_at) {
traces_.push(
LogType{open_events_.top(), time, TraceConstantsType::EndProcessing}
LogType{open_events_.back(), time, TraceConstantsType::EndProcessing}
);
open_events_.pop();
open_events_.pop_back();
}

// Go ahead and perform a trace flush when tracing is disabled (and was
Expand Down Expand Up @@ -700,27 +720,17 @@ TraceEventIDType Trace::logEvent(LogType&& log) {
"Event must exist that was logged"
);

// close any idle event as soon as we encounter any other type of event
if (idle_begun_ and
log.type != TraceConstantsType::BeginIdle and
log.type != TraceConstantsType::EndIdle) {
endIdle();
double time = log.time;

// Close any idle event as soon as we encounter any other type of event.
if (idle_begun_) {
// TODO: This should be a prohibited case - vt 1.1?
endIdle(time);
}

switch (log.type) {
case TraceConstantsType::BeginProcessing: {

if (not open_events_.empty()) {
// Emit a 'stop' event for the current stack item;
// another '[re]start' event will be emitted on group end.
double logTime = log.time;
traces_.push(
LogType{open_events_.top(), logTime, TraceConstantsType::EndProcessing}
);
}

open_events_.push(log /* copy, not forwarding rv-ref */);

open_events_.push_back(log /* copy, not forwarding rv-ref */);
break;
}
case TraceConstantsType::EndProcessing: {
Expand All @@ -729,17 +739,17 @@ TraceEventIDType Trace::logEvent(LogType&& log) {
// This case remains for compatibility.

vtAssert(
open_events_.top().ep == log.ep and
open_events_.top().type == TraceConstantsType::BeginProcessing,
open_events_.back().ep == log.ep and
open_events_.back().type == TraceConstantsType::BeginProcessing,
"Top event should be correct type and event"
);

// Steal top event information
LogType const& top = open_events_.top();
LogType const& top = open_events_.back();
TraceProcessingTag processing_tag{top.ep, top.event};
endProcessing(processing_tag);

return trace::no_trace_event;
return trace::no_trace_event; // n.b. pushed eagerly
}
case TraceConstantsType::Creation:
case TraceConstantsType::CreationBcast:
Expand Down Expand Up @@ -779,6 +789,16 @@ TraceEventIDType Trace::logEvent(LogType&& log) {
return event;
}

// Pushes onto traces_ a log entry built from the most recently opened event,
// re-stamped with `time` and `type`. Does nothing when no event is open.
void Trace::emitTraceForTopProcessingEvent(
  double const time, TraceConstantsType const type
) {
  if (open_events_.empty()) {
    return;
  }

  auto& top = open_events_.back();
  traces_.push(LogType{top, time, type});
}

/*static*/ bool Trace::traceWritingEnabled(NodeType node) {
return (ArgType::vt_trace
and (ArgType::vt_trace_mod == 0
Expand Down Expand Up @@ -806,6 +826,8 @@ void Trace::cleanupTracesFile() {
}

// No more events can be written.
// Close any idle for consistency.
endIdle();
disableTracing();

//--- Dump everything into an output file and close.
Expand Down
Loading

0 comments on commit 83796f4

Please sign in to comment.