Skip to content

Commit

Permalink
Add spin_some support to the StaticSingleThreadedExecutor (#1338)
Browse files Browse the repository at this point in the history
* spin_some/spin_all/spin_once support: static executor

Signed-off-by: Mauro <[email protected]>

* Use spin_once instead of spin_once_impl

Signed-off-by: Mauro Passerino <[email protected]>

* Fix memory leak

Signed-off-by: Mauro Passerino <[email protected]>

* revert spinSome change

Signed-off-by: Mauro Passerino <[email protected]>

* Override spin_once_impl only

This way the StaticSingleThreadedExecutor uses spin_once and
spin_until_future_complete APIs from the base executor class,
but uses its own overrided version of spin_once_impl.

Signed-off-by: Mauro Passerino <[email protected]>

Co-authored-by: Mauro <[email protected]>
  • Loading branch information
mauropasse and Mauro authored Apr 5, 2021
1 parent 7b94f28 commit 06a4ee0
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 165 deletions.
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ class Executor
RCLCPP_DISABLE_COPY(Executor)

RCLCPP_PUBLIC
void
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class StaticExecutorEntitiesCollector final
rcl_guard_condition_t * executor_guard_condition);

/// Finalize StaticExecutorEntitiesCollector to clear resources
RCLCPP_PUBLIC
bool
is_init() {return initialized_;}

RCLCPP_PUBLIC
void
fini();
Expand Down Expand Up @@ -339,6 +343,9 @@ class StaticExecutorEntitiesCollector final

/// Executable list: timers, subscribers, clients, services and waitables
rclcpp::experimental::ExecutableList exec_list_;

/// Bool to check if the entities collector has been initialized
bool initialized_ = false;
};

} // namespace executors
Expand Down
143 changes: 45 additions & 98 deletions rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_
#define RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_

#include <chrono>
#include <cassert>
#include <cstdlib>
#include <memory>
Expand Down Expand Up @@ -78,6 +79,42 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

/// Static executor implementation of spin some
/**
* This non-blocking function will execute entities that
* were ready when this API was called, until timeout or no
* more work available. Entities that got ready while
* executing work, won't be taken into account here.
*
* Example:
* while(condition) {
* spin_some();
* sleep(); // User should have some sync work or
* // sleep to avoid a 100% CPU usage
* }
*/
RCLCPP_PUBLIC
void
spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override;

/// Static executor implementation of spin all
/**
* This non-blocking function will execute entities until
* timeout or no more work available. If new entities get ready
* while executing work available, they will be executed
* as long as the timeout hasn't expired.
*
* Example:
* while(condition) {
* spin_all();
* sleep(); // User should have some sync work or
* // sleep to avoid a 100% CPU usage
* }
*/
RCLCPP_PUBLIC
void
spin_all(std::chrono::nanoseconds max_duration) override;

/// Add a callback group to an executor.
/**
* \sa rclcpp::Executor::add_callback_group
Expand Down Expand Up @@ -155,113 +192,23 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_automatically_added_callback_groups_from_nodes() override;

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
protected:
/**
* \param[in] future The future to wait on. If this function returns SUCCESS, the future can be
* accessed without blocking (though it may still throw an exception).
* \param[in] timeout Optional timeout parameter, which gets passed to
* Executor::execute_ready_executables.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return
* code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*
* Example usage:
* rclcpp::executors::StaticSingleThreadedExecutor exec;
* // ... other part of code like creating node
* // define future
* exec.add_node(node);
* exec.spin_until_future_complete(future);
* @brief Executes ready executables from wait set.
* @param spin_once if true executes only the first ready executable.
* @return true if any executable was ready.
*/
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_until_future_complete(
FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return rclcpp::FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_);
RCLCPP_SCOPE_EXIT(entities_collector_->fini());

while (rclcpp::ok(this->context_)) {
// Do one set of work.
entities_collector_->refresh_wait_set(timeout_left);
execute_ready_executables();
// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return rclcpp::FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return rclcpp::FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return rclcpp::FutureReturnCode::INTERRUPTED;
}

/// Not yet implemented, see https://github.com/ros2/rclcpp/issues/1219 for tracking
RCLCPP_PUBLIC
void
spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override
{
(void)max_duration;
throw rclcpp::exceptions::UnimplementedError(
"spin_some is not implemented for StaticSingleThreadedExecutor, use spin or "
"spin_until_future_complete");
}
bool
execute_ready_executables(bool spin_once = false);

/// Not yet implemented, see https://github.com/ros2/rclcpp/issues/1219 for tracking
RCLCPP_PUBLIC
void
spin_all(std::chrono::nanoseconds) override
{
throw rclcpp::exceptions::UnimplementedError(
"spin_all is not implemented for StaticSingleThreadedExecutor, use spin or "
"spin_until_future_complete");
}
spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);

/// Not yet implemented, see https://github.com/ros2/rclcpp/issues/1219 for tracking
RCLCPP_PUBLIC
void
spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)) override
{
(void)timeout;
throw rclcpp::exceptions::UnimplementedError(
"spin_once is not implemented for StaticSingleThreadedExecutor, use spin or "
"spin_until_future_complete");
}

protected:
/// Check which executables in ExecutableList struct are ready from wait_set and execute them.
/**
* \param[in] exec_list Structure that can hold subscriptionbases, timerbases, etc
* \param[in] timeout Optional timeout parameter.
*/
RCLCPP_PUBLIC
void
execute_ready_executables();
spin_once_impl(std::chrono::nanoseconds timeout) override;

private:
RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,13 @@ StaticExecutorEntitiesCollector::init(

// Add executor's guard condition
memory_strategy_->add_guard_condition(executor_guard_condition);

// Get memory strategy and executable list. Prepare wait_set_
std::shared_ptr<void> shared_ptr;
execute(shared_ptr);

// The entities collector is now initialized
initialized_ = true;
}

void
Expand Down
109 changes: 105 additions & 4 deletions rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "rclcpp/executors/static_single_threaded_executor.hpp"

#include <chrono>
#include <memory>
#include <vector>

Expand All @@ -29,7 +30,12 @@ StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(
entities_collector_ = std::make_shared<StaticExecutorEntitiesCollector>();
}

StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {}
StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor()
{
if (entities_collector_->is_init()) {
entities_collector_->fini();
}
}

void
StaticSingleThreadedExecutor::spin()
Expand All @@ -42,7 +48,6 @@ StaticSingleThreadedExecutor::spin()
// Set memory_strategy_ and exec_list_ based on weak_nodes_
// Prepare wait_set_ based on memory_strategy_
entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_);
RCLCPP_SCOPE_EXIT(entities_collector_->fini());

while (rclcpp::ok(this->context_) && spinning.load()) {
// Refresh wait set and wait for work
Expand All @@ -51,6 +56,79 @@ StaticSingleThreadedExecutor::spin()
}
}

void
StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration)
{
// In this context a 0 input max_duration means no duration limit
if (std::chrono::nanoseconds(0) == max_duration) {
max_duration = std::chrono::nanoseconds::max();
}

return this->spin_some_impl(max_duration, false);
}

void
StaticSingleThreadedExecutor::spin_all(std::chrono::nanoseconds max_duration)
{
if (max_duration <= std::chrono::nanoseconds(0)) {
throw std::invalid_argument("max_duration must be positive");
}
return this->spin_some_impl(max_duration, true);
}

void
StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
{
// Make sure the entities collector has been initialized
if (!entities_collector_->is_init()) {
entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_);
}

auto start = std::chrono::steady_clock::now();
auto max_duration_not_elapsed = [max_duration, start]() {
if (std::chrono::nanoseconds(0) == max_duration) {
// told to spin forever if need be
return true;
} else if (std::chrono::steady_clock::now() - start < max_duration) {
// told to spin only for some maximum amount of time
return true;
}
// spun too long
return false;
};

if (spinning.exchange(true)) {
throw std::runtime_error("spin_some() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );

while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
// Get executables that are ready now
entities_collector_->refresh_wait_set(std::chrono::milliseconds::zero());
// Execute ready executables
bool work_available = execute_ready_executables();
if (!work_available || !exhaustive) {
break;
}
}
}

void
StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
{
// Make sure the entities collector has been initialized
if (!entities_collector_->is_init()) {
entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_);
}

if (rclcpp::ok(context_) && spinning.load()) {
// Wait until we have a ready entity or timeout expired
entities_collector_->refresh_wait_set(timeout);
// Execute ready executables
execute_ready_executables(true);
}
}

void
StaticSingleThreadedExecutor::add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
Expand Down Expand Up @@ -138,14 +216,20 @@ StaticSingleThreadedExecutor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr
this->remove_node(node_ptr->get_node_base_interface(), notify);
}

void
StaticSingleThreadedExecutor::execute_ready_executables()
bool
StaticSingleThreadedExecutor::execute_ready_executables(bool spin_once)
{
bool any_ready_executable = false;

// Execute all the ready subscriptions
for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) {
if (i < entities_collector_->get_number_of_subscriptions()) {
if (wait_set_.subscriptions[i]) {
execute_subscription(entities_collector_->get_subscription(i));
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
}
Expand All @@ -154,6 +238,10 @@ StaticSingleThreadedExecutor::execute_ready_executables()
if (i < entities_collector_->get_number_of_timers()) {
if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) {
execute_timer(entities_collector_->get_timer(i));
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
}
Expand All @@ -162,6 +250,10 @@ StaticSingleThreadedExecutor::execute_ready_executables()
if (i < entities_collector_->get_number_of_services()) {
if (wait_set_.services[i]) {
execute_service(entities_collector_->get_service(i));
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
}
Expand All @@ -170,6 +262,10 @@ StaticSingleThreadedExecutor::execute_ready_executables()
if (i < entities_collector_->get_number_of_clients()) {
if (wait_set_.clients[i]) {
execute_client(entities_collector_->get_client(i));
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
}
Expand All @@ -179,6 +275,11 @@ StaticSingleThreadedExecutor::execute_ready_executables()
if (waitable->is_ready(&wait_set_)) {
auto data = waitable->take_data();
waitable->execute(data);
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
return any_ready_executable;
}
Loading

0 comments on commit 06a4ee0

Please sign in to comment.