Skip to content

Commit

Permalink
Utilize rclcpp::WaitSet as part of the executors
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carroll <[email protected]>
  • Loading branch information
mjcarroll committed Mar 29, 2023
1 parent 9099635 commit ad3f3e5
Show file tree
Hide file tree
Showing 18 changed files with 337 additions and 3,047 deletions.
4 changes: 1 addition & 3 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executor.cpp
src/rclcpp/executors.cpp
src/rclcpp/executors/executor_notify_waitable.cpp
src/rclcpp/executors/executor_entities_collector.cpp
src/rclcpp/executors/executor_entities_collection.cpp
src/rclcpp/executors/executor_entities_collector.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/executors/static_executor_entities_collector.cpp
src/rclcpp/executors/static_single_threaded_executor.cpp
src/rclcpp/expand_topic_or_service_name.cpp
src/rclcpp/future_return_code.cpp
src/rclcpp/generic_publisher.cpp
Expand Down
4 changes: 4 additions & 0 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ class CallbackGroup
bool
has_valid_node();

RCLCPP_PUBLIC
bool
has_valid_node();

RCLCPP_PUBLIC
std::atomic_bool &
can_be_taken_from();
Expand Down
163 changes: 8 additions & 155 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cassert>
#include <chrono>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <list>
#include <map>
Expand All @@ -35,20 +36,17 @@
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/executor_options.hpp"
#include "rclcpp/executors/executor_entities_collection.hpp"
#include "rclcpp/executors/executor_entities_collector.hpp"
#include "rclcpp/future_return_code.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/wait_set.hpp"

namespace rclcpp
{

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;

// Forward declaration is used in convenience method signature.
class Node;

Expand Down Expand Up @@ -402,17 +400,6 @@ class Executor
void
cancel();

/// Support dynamic switching of the memory strategy.
/**
* Switching the memory strategy while the executor is spinning in another threading could have
* unintended consequences.
* \param[in] memory_strategy Shared pointer to the memory strategy to set.
* \throws std::runtime_error if memory_strategy is null
*/
RCLCPP_PUBLIC
void
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);

/// Returns true if the executor is currently spinning.
/**
* This function can be called asynchronously from any thread.
Expand Down Expand Up @@ -508,62 +495,6 @@ class Executor
void
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Find node associated with a callback group
/**
* \param[in] weak_groups_to_nodes map of callback groups to nodes
* \param[in] group callback group to find assocatiated node
* \return Pointer to associated node if found, else nullptr
*/
RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group(
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group);

/// Return true if the node has been added to this executor.
/**
* \param[in] node_ptr a shared pointer that points to a node base interface
* \param[in] weak_groups_to_nodes map to nodes to lookup
* \return true if the node is associated with the executor, otherwise false
*/
RCLCPP_PUBLIC
bool
has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;

/// Find the callback group associated with a timer
/**
* \param[in] timer Timer to find associated callback group
* \return Pointer to callback group node if found, else nullptr
*/
RCLCPP_PUBLIC
rclcpp::CallbackGroup::SharedPtr
get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);

/// Add a callback group to an executor
/**
* \see rclcpp::Executor::add_callback_group
*/
RCLCPP_PUBLIC
virtual void
add_callback_group_to_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

/// Remove a callback group from the executor.
/**
* \see rclcpp::Executor::remove_callback_group
*/
RCLCPP_PUBLIC
virtual void
remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

/// Check for executable in ready state and populate union structure.
/**
* \param[out] any_executable populated union structure of ready executable
Expand All @@ -574,33 +505,6 @@ class Executor
bool
get_next_ready_executable(AnyExecutable & any_executable);

/// Check for executable in ready state and populate union structure.
/**
* This is the implementation of get_next_ready_executable that takes into
* account the current state of callback groups' association with nodes and
* executors.
*
* This checks in a particular order for available work:
* * Timers
* * Subscriptions
* * Services
* * Clients
* * Waitable
*
* If the next executable is not associated with this executor/node pair,
* then this method will return false.
*
* \param[out] any_executable populated union structure of ready executable
* \param[in] weak_groups_to_nodes mapping of callback groups to nodes
* \return true if an executable was ready and any_executable was populated,
* otherwise false
*/
RCLCPP_PUBLIC
bool
get_next_ready_executable_from_map(
AnyExecutable & any_executable,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);

/// Wait for executable in ready state and populate union structure.
/**
* If an executable is ready, it will return immediately, otherwise
Expand All @@ -618,21 +522,6 @@ class Executor
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Add all callback groups that can be automatically added from associated nodes.
/**
* The executor, before collecting entities, verifies if any callback group from
* nodes associated with the executor, which is not already associated to an executor,
* can be automatically added to this executor.
* This takes care of any callback group that has been added to a node but not explicitly added
* to the executor.
* It is important to note that in order for the callback groups to be automatically added to an
* executor through this function, the node of the callback groups needs to have been added
* through the `add_node` method.
*/
RCLCPP_PUBLIC
virtual void
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;

Expand All @@ -641,16 +530,8 @@ class Executor

std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();

// Mutex to protect the subsequent memory_strategy_.
mutable std::mutex mutex_;

/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);

/// The context associated with this executor.
std::shared_ptr<rclcpp::Context> context_;

Expand All @@ -660,39 +541,11 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
WeakCallbackGroupsToGuardConditionsMap;

/// maps nodes to guard conditions
WeakNodesToGuardConditionsMap
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups to guard conditions
WeakCallbackGroupsToGuardConditionsMap
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups associated to nodes
WeakCallbackGroupsToNodesMap
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups to nodes associated with executor
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps all callback groups to nodes
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
rclcpp::executors::ExecutorEntitiesCollector collector_;
rclcpp::executors::ExecutorEntitiesCollection current_collection_;

/// nodes that are associated with the executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::shared_ptr<rclcpp::WaitSet> wait_set_;
std::deque<rclcpp::AnyExecutable> ready_executables_;

/// shutdown callback handle registered to Context
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;
Expand Down
1 change: 0 additions & 1 deletion rclcpp/include/rclcpp/executors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

#include "rclcpp/executors/multi_threaded_executor.hpp"
#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,5 @@ class ExecutorEntitiesCollector
};
} // namespace executors
} // namespace rclcpp

//
#endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_
Loading

0 comments on commit ad3f3e5

Please sign in to comment.