Skip to content

Commit

Permalink
Utilize rclcpp::WaitSet as part of the executors (ros2#2142)
Browse files Browse the repository at this point in the history
* Deprecate callback_group call taking context

Signed-off-by: Michael Carroll <[email protected]>

* Add base executor objects that can be used by implementors

Signed-off-by: Michael Carroll <[email protected]>

* Template common operations

Signed-off-by: Michael Carroll <[email protected]>

* Address reviewer feedback:

* Add callback to EntitiesCollector constructor
* Make function to check automatically added callback groups take a list

Signed-off-by: Michael Carroll <[email protected]>

* Lint

Signed-off-by: Michael Carroll <[email protected]>

* Address reviewer feedback and fix templates

Signed-off-by: Michael Carroll <[email protected]>

* Lint and docs

Signed-off-by: Michael Carroll <[email protected]>

* Make executor own the notify waitable

Signed-off-by: Michael Carroll <[email protected]>

* Add pending queue to collector, remove from waitable

Also change node's get_guard_condition to return shared_ptr

Signed-off-by: Michael Carroll <[email protected]>

* Change interrupt guard condition to shared_ptr

Check if guard condition is valid before adding it to the waitable

Signed-off-by: Michael Carroll <[email protected]>

* Lint and docs

Signed-off-by: Michael Carroll <[email protected]>

* Utilize rclcpp::WaitSet as part of the executors

Signed-off-by: Michael Carroll <[email protected]>

* Don't exchange atomic twice

Signed-off-by: Michael Carroll <[email protected]>

* Fix add_node and add more tests

Signed-off-by: Michael Carroll <[email protected]>

* Make get_notify_guard_condition follow API tick-tock

Signed-off-by: Michael Carroll <[email protected]>

* Improve callback group tick-tocking

Signed-off-by: Michael Carroll <[email protected]>

* Don't lock twice

Signed-off-by: Michael Carroll <[email protected]>

* Address reviewer feedback

Signed-off-by: Michael Carroll <[email protected]>

* Add thread safety annotations and make locks consistent

Signed-off-by: Michael Carroll <[email protected]>

* @wip

Signed-off-by: Michael Carroll <[email protected]>

* Reset callback groups for multithreaded executor

Signed-off-by: Michael Carroll <[email protected]>

* Avoid many small function calls when building executables

Signed-off-by: Michael Carroll <[email protected]>

* Re-trigger guard condition if buffer has data

Signed-off-by: Michael Carroll <[email protected]>

* Address reviewer feedback

Signed-off-by: Michael Carroll <[email protected]>

* Trace points

Signed-off-by: Michael Carroll <[email protected]>

* Remove tracepoints

Signed-off-by: Michael Carroll <[email protected]>

* Reducing diff

Signed-off-by: Michael Carroll <[email protected]>

* Reduce diff

Signed-off-by: Michael Carroll <[email protected]>

* Uncrustify

Signed-off-by: Michael Carroll <[email protected]>

* Restore tests

Signed-off-by: Michael Carroll <[email protected]>

* Back to weak_ptr and reduce test time

Signed-off-by: Michael Carroll <[email protected]>

* reduce diff and lint

Signed-off-by: Michael Carroll <[email protected]>

* Restore static single threaded tests that weren't working before

Signed-off-by: Michael Carroll <[email protected]>

* Restore more tests

Signed-off-by: Michael Carroll <[email protected]>

* Fix multithreaded test

Signed-off-by: Michael Carroll <[email protected]>

* Fix assert

Signed-off-by: Michael Carroll <[email protected]>

* Fix constructor test

Signed-off-by: Michael Carroll <[email protected]>

* Change ready_executables signature back

Signed-off-by: Michael Carroll <[email protected]>

* Don't enforce removing callback groups before nodes

Signed-off-by: Michael Carroll <[email protected]>

* Remove the "add_valid_node" API

Signed-off-by: Michael Carroll <[email protected]>

* Only notify if the trigger condition is valid

Signed-off-by: Michael Carroll <[email protected]>

* Only trigger if valid and needed

Signed-off-by: Michael Carroll <[email protected]>

* Fix spin_some/spin_all implementation

Signed-off-by: Michael Carroll <[email protected]>

* Restore single threaded executor

Signed-off-by: Michael Carroll <[email protected]>

* Picking ABI-incompatible executor changes

Signed-off-by: Michael Carroll <[email protected]>

* Add PIMPL

Signed-off-by: Michael Carroll <[email protected]>

* Additional waitset prune

Signed-off-by: Michael Carroll <[email protected]>

* Fix bad merge

Signed-off-by: Michael Carroll <[email protected]>

* Expand test timeout

Signed-off-by: Michael Carroll <[email protected]>

* Introduce method to clear expired entities from a collection

Signed-off-by: Michael Carroll <[email protected]>

* Make sure to call remove_expired_entities().

Signed-off-by: Chris Lalancette <[email protected]>

* Prune queued work when callback group is removed

Signed-off-by: Michael Carroll <[email protected]>

* Prune subscriptions from dynamic storage

Signed-off-by: Michael Carroll <[email protected]>

* Styles fixes.

Signed-off-by: Chris Lalancette <[email protected]>

* Re-trigger guard conditions

Signed-off-by: Michael Carroll <[email protected]>

* Condense to just use watiable.take_data

Signed-off-by: Michael Carroll <[email protected]>

* Lint

Signed-off-by: Michael Carroll <[email protected]>

* Address reviewer comments (nits)

Signed-off-by: Michael Carroll <[email protected]>

* Lock mutex when copying

Signed-off-by: Michael Carroll <[email protected]>

* Refactors to static single threaded based on reviewers

Signed-off-by: Michael Carroll <[email protected]>

* More small refactoring

Signed-off-by: Michael Carroll <[email protected]>

* Lint

Signed-off-by: Michael Carroll <[email protected]>

* Lint

Signed-off-by: Michael Carroll <[email protected]>

* Add ready executable accessors to WaitResult

Signed-off-by: Michael Carroll <[email protected]>

* Make use of accessors from wait_set

Signed-off-by: Michael Carroll <[email protected]>

* Fix tests

Signed-off-by: Michael Carroll <[email protected]>

* Fix more tests

Signed-off-by: Michael Carroll <[email protected]>

* Tidy up single threaded executor implementation

Signed-off-by: Michael Carroll <[email protected]>

* Don't null out timer, rely on call

Signed-off-by: Michael Carroll <[email protected]>

* change how timers are checked from wait result in executors

Signed-off-by: William Woodall <[email protected]>

* peak -> peek

Signed-off-by: William Woodall <[email protected]>

* fix bug in next_waitable logic

Signed-off-by: William Woodall <[email protected]>

* fix bug in StaticSTE that broke the add callback groups to executor tests

Signed-off-by: William Woodall <[email protected]>

* style

Signed-off-by: William Woodall <[email protected]>

---------

Signed-off-by: Michael Carroll <[email protected]>
Signed-off-by: Michael Carroll <[email protected]>
Signed-off-by: Chris Lalancette <[email protected]>
Signed-off-by: William Woodall <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
Co-authored-by: William Woodall <[email protected]>
  • Loading branch information
3 people authored and apojomovsky committed Jun 17, 2024
1 parent 77a6992 commit 2228e75
Show file tree
Hide file tree
Showing 31 changed files with 976 additions and 1,537 deletions.
1 change: 0 additions & 1 deletion rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executors/executor_notify_waitable.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/executors/static_executor_entities_collector.cpp
src/rclcpp/executors/static_single_threaded_executor.cpp
src/rclcpp/expand_topic_or_service_name.cpp
src/rclcpp/experimental/executors/events_executor/events_executor.cpp
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/any_executable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ struct AnyExecutable
rclcpp::ClientBase::SharedPtr client;
rclcpp::Waitable::SharedPtr waitable;
// These are used to keep the scope on the containing items
rclcpp::CallbackGroup::SharedPtr callback_group;
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base;
std::shared_ptr<void> data;
rclcpp::CallbackGroup::SharedPtr callback_group {nullptr};
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base {nullptr};
std::shared_ptr<void> data {nullptr};
};

} // namespace rclcpp
Expand Down
27 changes: 25 additions & 2 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,41 @@ class CallbackGroup
* \return the number of entities in the callback group.
*/
RCLCPP_PUBLIC
size_t size() const;
size_t
size() const;

/// Return a reference to the 'can be taken' atomic boolean.
/**
* The resulting bool will be true in the case that no executor is currently
* using an executable entity from this group.
* The resulting bool will be false in the case that an executor is currently
* using an executable entity from this group, and the group policy doesn't
* allow a second take (eg mutual exclusion)
* \return a reference to the flag
*/
RCLCPP_PUBLIC
std::atomic_bool &
can_be_taken_from();

/// Get the group type.
/**
* \return the group type
*/
RCLCPP_PUBLIC
const CallbackGroupType &
type() const;

/// Collect all of the entity pointers contained in this callback group.
/**
* \param[in] sub_func Function to execute for each subscription
* \param[in] service_func Function to execute for each service
* \param[in] client_func Function to execute for each client
* \param[in] timer_func Function to execute for each timer
* \param[in] waitable_fuinc Function to execute for each waitable
*/
RCLCPP_PUBLIC
void collect_all_ptrs(
void
collect_all_ptrs(
std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
std::function<void(const rclcpp::ServiceBase::SharedPtr &)> service_func,
std::function<void(const rclcpp::ClientBase::SharedPtr &)> client_func,
Expand Down
180 changes: 29 additions & 151 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,24 @@

#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/executors/executor_notify_waitable.hpp"
#include "rcpputils/scope_exit.hpp"

#include "rclcpp/context.hpp"
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/executor_options.hpp"
#include "rclcpp/executors/executor_entities_collection.hpp"
#include "rclcpp/executors/executor_entities_collector.hpp"
#include "rclcpp/future_return_code.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/wait_set.hpp"

namespace rclcpp
{

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;

// Forward declaration is used in convenience method signature.
class Node;
class ExecutorImplementation;
Expand Down Expand Up @@ -403,17 +401,6 @@ class Executor
void
cancel();

/// Support dynamic switching of the memory strategy.
/**
* Switching the memory strategy while the executor is spinning in another threading could have
* unintended consequences.
* \param[in] memory_strategy Shared pointer to the memory strategy to set.
* \throws std::runtime_error if memory_strategy is null
*/
RCLCPP_PUBLIC
void
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);

/// Returns true if the executor is currently spinning.
/**
* This function can be called asynchronously from any thread.
Expand Down Expand Up @@ -498,6 +485,11 @@ class Executor
static void
execute_client(rclcpp::ClientBase::SharedPtr client);

/// Gather all of the waitable entities from associated nodes and callback groups.
RCLCPP_PUBLIC
void
collect_entities();

/// Block until more work becomes avilable or timeout is reached.
/**
* Builds a set of waitable entities, which are passed to the middleware.
Expand All @@ -509,62 +501,6 @@ class Executor
void
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Find node associated with a callback group
/**
* \param[in] weak_groups_to_nodes map of callback groups to nodes
* \param[in] group callback group to find assocatiated node
* \return Pointer to associated node if found, else nullptr
*/
RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group(
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group);

/// Return true if the node has been added to this executor.
/**
* \param[in] node_ptr a shared pointer that points to a node base interface
* \param[in] weak_groups_to_nodes map to nodes to lookup
* \return true if the node is associated with the executor, otherwise false
*/
RCLCPP_PUBLIC
bool
has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;

/// Find the callback group associated with a timer
/**
* \param[in] timer Timer to find associated callback group
* \return Pointer to callback group node if found, else nullptr
*/
RCLCPP_PUBLIC
rclcpp::CallbackGroup::SharedPtr
get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);

/// Add a callback group to an executor
/**
* \see rclcpp::Executor::add_callback_group
*/
RCLCPP_PUBLIC
virtual void
add_callback_group_to_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

/// Remove a callback group from the executor.
/**
* \see rclcpp::Executor::remove_callback_group
*/
RCLCPP_PUBLIC
virtual void
remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

/// Check for executable in ready state and populate union structure.
/**
* \param[out] any_executable populated union structure of ready executable
Expand All @@ -575,33 +511,6 @@ class Executor
bool
get_next_ready_executable(AnyExecutable & any_executable);

/// Check for executable in ready state and populate union structure.
/**
* This is the implementation of get_next_ready_executable that takes into
* account the current state of callback groups' association with nodes and
* executors.
*
* This checks in a particular order for available work:
* * Timers
* * Subscriptions
* * Services
* * Clients
* * Waitable
*
* If the next executable is not associated with this executor/node pair,
* then this method will return false.
*
* \param[out] any_executable populated union structure of ready executable
* \param[in] weak_groups_to_nodes mapping of callback groups to nodes
* \return true if an executable was ready and any_executable was populated,
* otherwise false
*/
RCLCPP_PUBLIC
bool
get_next_ready_executable_from_map(
AnyExecutable & any_executable,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);

/// Wait for executable in ready state and populate union structure.
/**
* If an executable is ready, it will return immediately, otherwise
Expand All @@ -619,21 +528,6 @@ class Executor
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Add all callback groups that can be automatically added from associated nodes.
/**
* The executor, before collecting entities, verifies if any callback group from
* nodes associated with the executor, which is not already associated to an executor,
* can be automatically added to this executor.
* This takes care of any callback group that has been added to a node but not explicitly added
* to the executor.
* It is important to note that in order for the callback groups to be automatically added to an
* executor through this function, the node of the callback groups needs to have been added
* through the `add_node` method.
*/
RCLCPP_PUBLIC
virtual void
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;

Expand All @@ -643,16 +537,8 @@ class Executor
/// Guard condition for signaling the rmw layer to wake up for system shutdown.
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();

// Mutex to protect the subsequent memory_strategy_.
mutable std::mutex mutex_;

/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);

/// The context associated with this executor.
std::shared_ptr<rclcpp::Context> context_;

Expand All @@ -662,39 +548,31 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
WeakCallbackGroupsToGuardConditionsMap;

/// maps nodes to guard conditions
WeakNodesToGuardConditionsMap
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Waitable containing guard conditions controlling the executor flow.
/**
* This waitable contains the interrupt and shutdown guard condition, as well
* as the guard condition associated with each node and callback group.
* By default, if any change is detected in the monitored entities, the notify
* waitable will awake the executor and rebuild the collections.
*/
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;

/// maps callback groups to guard conditions
WeakCallbackGroupsToGuardConditionsMap
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::atomic_bool entities_need_rebuild_;

/// maps callback groups associated to nodes
WeakCallbackGroupsToNodesMap
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Collector used to associate executable entities from nodes and guard conditions
rclcpp::executors::ExecutorEntitiesCollector collector_;

/// maps callback groups to nodes associated with executor
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// WaitSet to be waited on.
rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps all callback groups to nodes
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Hold the current state of the collection being waited on by the waitset
rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY(
mutex_);

/// nodes that are associated with the executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Hold the current state of the notify waitable being waited on by the waitset
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> current_notify_waitable_
RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// shutdown callback handle registered to Context
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ struct ExecutorEntitiesCollection

/// Clear the entities collection
void clear();

/// Remove entities that have expired weak ownership
/**
* \return The total number of removed entities
*/
size_t remove_expired_entities();
};

/// Build an entities collection from callback groups
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
~ExecutorNotifyWaitable() override = default;

RCLCPP_PUBLIC
ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other);
ExecutorNotifyWaitable(ExecutorNotifyWaitable & other);


RCLCPP_PUBLIC
ExecutorNotifyWaitable & operator=(const ExecutorNotifyWaitable & other);
ExecutorNotifyWaitable & operator=(ExecutorNotifyWaitable & other);

/// Add conditions to the wait set
/**
Expand Down
Loading

0 comments on commit 2228e75

Please sign in to comment.