Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple mechanism for collecting statistics during execution #47

Merged
merged 2 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS "Debug" "Release" "MinSizeRel" "RelWithDebInfo")
endif()

option(REACTOR_CPP_PRINT_STATISTICS "Print statistics after execution" OFF)
option(REACTOR_CPP_TRACE "Enable tracing" OFF)
option(REACTOR_CPP_VALIDATE "Enable runtime validation" ON)
if (NOT DEFINED REACTOR_CPP_LOG_LEVEL)
Expand Down
2 changes: 2 additions & 0 deletions include/reactor-cpp/config.hh.in
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef REACTOR_CPP_CONFIG_HH
#define REACTOR_CPP_CONFIG_HH

// NOLINTNEXTLINE
#cmakedefine REACTOR_CPP_PRINT_STATISTICS
// NOLINTNEXTLINE
#cmakedefine REACTOR_CPP_TRACE
// NOLINTNEXTLINE
Expand Down
2 changes: 1 addition & 1 deletion include/reactor-cpp/logging.hh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#ifndef REACTOR_CPP_LOGGING_HH
#define REACTOR_CPP_LOGGING_HH

#include "reactor-cpp/config.hh" //NOLINT
#include "reactor-cpp/config.hh"
#include "reactor-cpp/time.hh"
#include <chrono>
#include <iostream>
Expand Down
2 changes: 2 additions & 0 deletions include/reactor-cpp/scheduler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ private:
void terminate_all_workers();
void set_port_helper(BasePort* port);

void advance_logical_time_to(const Tag& tag);

public:
explicit Scheduler(Environment* env);
~Scheduler();
Expand Down
99 changes: 99 additions & 0 deletions include/reactor-cpp/statistics.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (C) 2023 TU Dresden
* All rights reserved.
*
* Authors:
* Christian Menard
*/

#ifndef REACTOR_CPP_STATISTICS_HH
#define REACTOR_CPP_STATISTICS_HH

#include <atomic>

#include "reactor-cpp/config.hh"
#include "reactor-cpp/logging.hh"

namespace reactor {

class Statistics {
private:
#ifdef REACTOR_CPP_PRINT_STATISTICS
constexpr static bool enabled_{true};
#else
constexpr static bool enabled_{false};
#endif

// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static std::atomic_size_t reactor_instances_{0};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static std::atomic_size_t connections_{0};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static std::atomic_size_t reactions_{0};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static std::atomic_size_t actions_{0};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static std::atomic_size_t ports_{0};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static std::atomic_size_t processed_events_{0};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static std::atomic_size_t processed_reactions_{0};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static std::atomic_size_t triggered_actions_{0};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static std::atomic_size_t set_ports_{0};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static std::atomic_size_t scheduled_actions_{0};

inline static void increment(std::atomic_size_t& counter) {
if constexpr (enabled_) {
counter.fetch_add(1, std::memory_order_release);
}
}

public:
inline static void increment_reactor_instances() { increment(reactor_instances_); }
inline static void increment_connections() { increment(connections_); }
inline static void increment_reactions() { increment(reactions_); }
inline static void increment_actions() { increment(actions_); }
inline static void increment_ports() { increment(ports_); }
inline static void increment_processed_events() { increment(processed_events_); }
inline static void increment_processed_reactions() { increment(processed_reactions_); }
inline static void increment_triggered_actions() { increment(triggered_actions_); }
inline static void increment_set_ports() { increment(set_ports_); }
inline static void increment_scheduled_actions() { increment(scheduled_actions_); }

inline static auto reactor_instances() { return reactor_instances_.load(std::memory_order_acquire); }
inline static auto connections() { return connections_.load(std::memory_order_acquire); }
inline static auto reactions() { return reactions_.load(std::memory_order_acquire); }
inline static auto actions() { return actions_.load(std::memory_order_acquire); }
inline static auto ports() { return ports_.load(std::memory_order_acquire); }
inline static auto processed_events() { return processed_events_.load(std::memory_order_acquire); }
inline static auto processed_reactions() { return processed_reactions_.load(std::memory_order_acquire); }
inline static auto triggered_actions() { return triggered_actions_.load(std::memory_order_acquire); }
inline static auto set_ports() { return set_ports_.load(std::memory_order_acquire); }
inline static auto scheduled_actions() { return scheduled_actions_.load(std::memory_order_acquire); }

inline static void print() {
if constexpr (enabled_) {
reactor::log::Info() << "-----------------------------------------------------------";
reactor::log::Info() << "Program statistics:";
reactor::log::Info() << " - number of reactors: " << reactor_instances();
reactor::log::Info() << " - number of connections: " << connections();
reactor::log::Info() << " - number of reactions " << reactions();
reactor::log::Info() << " - number of actions: " << actions();
reactor::log::Info() << " - number of ports: " << ports();
reactor::log::Info() << "Execution statistics:";
reactor::log::Info() << " - processed events: " << processed_events();
reactor::log::Info() << " - triggered actions: " << triggered_actions();
reactor::log::Info() << " - processed reactions: " << processed_reactions();
reactor::log::Info() << " - set ports set: " << set_ports();
reactor::log::Info() << " - scheduled actions: " << scheduled_actions();
reactor::log::Info() << "-----------------------------------------------------------";
}
}
};

} // namespace reactor

#endif // REACTOR_CPP_STATISTICS_HH
6 changes: 6 additions & 0 deletions lib/environment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "reactor-cpp/logging.hh"
#include "reactor-cpp/port.hh"
#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/statistics.hh"
#include "reactor-cpp/time.hh"

namespace reactor {
Expand Down Expand Up @@ -288,6 +289,11 @@ auto Environment::startup(const TimePoint& start_time) -> std::thread {
for (auto& thread : threads) {
thread.join();
}

// If this is the top level environment, then print some execution statistics
if (this->containing_environment_ == nullptr) {
Statistics::print();
}
});
}

Expand Down
3 changes: 3 additions & 0 deletions lib/port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "reactor-cpp/assert.hh"
#include "reactor-cpp/environment.hh"
#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/statistics.hh"

namespace reactor {

Expand Down Expand Up @@ -43,6 +44,8 @@ void BasePort::base_bind_to(BasePort* port) {
port->inward_binding_ = this;
[[maybe_unused]] bool result = this->outward_bindings_.insert(port).second;
reactor_assert(result);

Statistics::increment_connections();
}

void BasePort::register_dependency(Reaction* reaction, bool is_trigger) noexcept {
Expand Down
17 changes: 17 additions & 0 deletions lib/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "reactor-cpp/logging.hh"
#include "reactor-cpp/port.hh"
#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/statistics.hh"

namespace reactor {

Expand Down Expand Up @@ -66,6 +67,17 @@ ReactorElement::ReactorElement(const std::string& name, ReactorElement::Type typ
validate(type == Type::Reactor || type == Type::Action, "Only reactors and actions can be owned by the environment!");
validate(this->environment_->phase() == Environment::Phase::Construction,
"Reactor elements can only be created during construction phase!");

switch (type) {
case Type::Action:
Statistics::increment_actions();
break;
case Type::Reactor:
Statistics::increment_reactor_instances();
break;
default:
break;
}
}

Reactor::Reactor(const std::string& name, Reactor* container)
Expand All @@ -82,6 +94,7 @@ void Reactor::register_action([[maybe_unused]] BaseAction* action) {
"Actions can only be registered during construction phase!");
[[maybe_unused]] bool result = actions_.insert(action).second;
reactor_assert(result);
Statistics::increment_actions();
}

void Reactor::register_input(BasePort* port) {
Expand All @@ -90,6 +103,7 @@ void Reactor::register_input(BasePort* port) {
"Ports can only be registered during construction phase!");
[[maybe_unused]] bool result = inputs_.insert(port).second;
reactor_assert(result);
Statistics::increment_ports();
}

void Reactor::register_output(BasePort* port) {
Expand All @@ -98,6 +112,7 @@ void Reactor::register_output(BasePort* port) {
"Ports can only be registered during construction phase!");
[[maybe_unused]] bool result = inputs_.insert(port).second;
reactor_assert(result);
Statistics::increment_ports();
}

void Reactor::register_reaction([[maybe_unused]] Reaction* reaction) {
Expand All @@ -107,6 +122,7 @@ void Reactor::register_reaction([[maybe_unused]] Reaction* reaction) {
"Reactions can only be registered during construction phase!");
[[maybe_unused]] bool result = reactions_.insert(reaction).second;
reactor_assert(result);
Statistics::increment_reactions();
}

void Reactor::register_reactor([[maybe_unused]] Reactor* reactor) {
Expand All @@ -115,6 +131,7 @@ void Reactor::register_reactor([[maybe_unused]] Reactor* reactor) {
"Reactions can only be registered during construction phase!");
[[maybe_unused]] bool result = reactors_.insert(reactor).second;
reactor_assert(result);
Statistics::increment_reactor_instances();
}

void Reactor::startup() {
Expand Down
19 changes: 14 additions & 5 deletions lib/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "reactor-cpp/logging.hh"
#include "reactor-cpp/port.hh"
#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/statistics.hh"
#include "reactor-cpp/time_barrier.hh"
#include "reactor-cpp/trace.hh"

Expand Down Expand Up @@ -82,6 +83,8 @@ void Worker::execute_reaction(Reaction* reaction) const {
tracepoint(reactor_cpp, reaction_execution_starts, identity_, reaction->fqn(), scheduler_.logical_time());
reaction->trigger();
tracepoint(reactor_cpp, reaction_execution_finishes, identity_, reaction->fqn(), scheduler_.logical_time());

Statistics::increment_processed_reactions();
}

void Scheduler::schedule() noexcept {
Expand Down Expand Up @@ -297,6 +300,12 @@ void Scheduler::start() {
}
}

void Scheduler::advance_logical_time_to(const Tag& tag) {
log_.debug() << "advance logical time to tag " << tag;
logical_time_.advance_to(tag);
Statistics::increment_processed_events();
}

void Scheduler::next() { // NOLINT
// Notify other environments and let them know that we finished processing the
// current tag
Expand Down Expand Up @@ -348,8 +357,7 @@ void Scheduler::next() { // NOLINT
log_.debug() << "Schedule the last round of reactions including all "
"termination reactions";
triggered_actions_ = event_queue_.extract_next_event();
log_.debug() << "advance logical time to tag " << t_next;
logical_time_.advance_to(t_next);
advance_logical_time_to(t_next);
} else {
return;
}
Expand Down Expand Up @@ -390,9 +398,7 @@ void Scheduler::next() { // NOLINT
// queue.
triggered_actions_ = event_queue_.extract_next_event();

// advance logical time
log_.debug() << "advance logical time to tag " << t_next;
logical_time_.advance_to(t_next);
advance_logical_time_to(t_next);

// If there are no triggered actions at the event, then release the
// current tag and go back to the start of the loop
Expand All @@ -407,6 +413,7 @@ void Scheduler::next() { // NOLINT
log_.debug() << "events: " << triggered_actions_->size();
for (auto* action : *triggered_actions_) {
log_.debug() << "Action " << action->fqn();
Statistics::increment_triggered_actions();
action->setup();
for (auto* reaction : action->triggers()) {
// There is no need to acquire the mutex. At this point the scheduler
Expand All @@ -431,6 +438,7 @@ void Scheduler::schedule_sync(BaseAction* action, const Tag& tag) {
<< " with tag " << tag;
reactor_assert(logical_time_ < tag);
tracepoint(reactor_cpp, schedule_action, action->container()->fqn(), action->name(), tag);
Statistics::increment_scheduled_actions();

const auto& action_list = event_queue_.insert_event_at(tag);
action_list->push_back(action);
Expand Down Expand Up @@ -477,6 +485,7 @@ auto Scheduler::schedule_empty_async_at(const Tag& tag) -> bool {

void Scheduler::set_port(BasePort* port) {
log_.debug() << "Set port " << port->fqn();
Statistics::increment_set_ports();

// We do not check here if port is already in the list. This means clean()
// could be called multiple times for a single port. However, calling
Expand Down