From 80b6ccc69dc09de73fe731f9a94e7a12882cff91 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 26 Feb 2021 15:42:21 -0300 Subject: [PATCH 1/7] Rework executor callback data --- rclcpp/include/rclcpp/client.hpp | 4 +- .../rclcpp/executors/events_executor.hpp | 17 ++-- .../events_executor_entities_collector.hpp | 9 ++ .../executors/events_executor_event_types.hpp | 81 ++++++++++++++++ .../events_executor_notify_waitable.hpp | 7 +- .../experimental/buffers/events_queue.hpp | 10 +- .../buffers/simple_events_queue.hpp | 12 +-- .../subscription_intra_process_base.hpp | 4 +- rclcpp/include/rclcpp/qos_event.hpp | 4 +- rclcpp/include/rclcpp/service.hpp | 4 +- rclcpp/include/rclcpp/subscription_base.hpp | 4 +- rclcpp/include/rclcpp/waitable.hpp | 4 +- rclcpp/src/rclcpp/client.cpp | 7 +- .../src/rclcpp/executors/events_executor.cpp | 20 ++-- .../events_executor_entities_collector.cpp | 93 +++++++++++++++---- rclcpp/src/rclcpp/qos_event.cpp | 7 +- rclcpp/src/rclcpp/service.cpp | 7 +- rclcpp/src/rclcpp/subscription_base.cpp | 7 +- .../subscription_intra_process_base.cpp | 7 +- rclcpp/src/rclcpp/waitable.cpp | 6 +- rclcpp/test/rclcpp/CMakeLists.txt | 2 +- .../rclcpp/executors/test_events_queue.cpp | 11 ++- .../test/rclcpp/executors/test_executors.cpp | 7 +- 23 files changed, 240 insertions(+), 94 deletions(-) create mode 100644 rclcpp/include/rclcpp/executors/events_executor_event_types.hpp diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index 47b657fe5c..1a66d128e6 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -159,8 +159,8 @@ class ClientBase RCLCPP_PUBLIC void set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const; + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const; protected: RCLCPP_DISABLE_COPY(ClientBase) diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp index 49e3b24c7d..a5ceffd81a 100644 --- a/rclcpp/include/rclcpp/executors/events_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -22,6 +22,7 @@ #include "rclcpp/executor.hpp" #include "rclcpp/executors/events_executor_entities_collector.hpp" +#include "rclcpp/executors/events_executor_event_types.hpp" #include "rclcpp/executors/events_executor_notify_waitable.hpp" #include "rclcpp/executors/timers_manager.hpp" #include "rclcpp/experimental/buffers/events_queue.hpp" @@ -214,20 +215,20 @@ class EventsExecutor : public rclcpp::Executor // This function is called by the DDS entities when an event happened, // like a subscription receiving a message. static void - push_event(void * executor_ptr, rmw_listener_event_t event) + push_event(const void * event_data) { - // Check if the executor pointer is not valid - if (!executor_ptr) { - throw std::runtime_error("The executor pointer is not valid."); + if (!event_data) { + throw std::runtime_error("Executor event data not valid."); } - auto this_executor = static_cast(executor_ptr); + auto data = static_cast(event_data); + + executors::EventsExecutor * this_executor = data->executor; // Event queue mutex scope { std::unique_lock lock(this_executor->push_mutex_); - - this_executor->events_queue_->push(event); + this_executor->events_queue_->push({data->entity_id, data->event_type}); } // Notify that the event queue has some events in it. this_executor->events_queue_cv_.notify_one(); @@ -236,7 +237,7 @@ class EventsExecutor : public rclcpp::Executor // Execute a single event RCLCPP_PUBLIC void - execute_event(const rmw_listener_event_t & event); + execute_event(const ExecutorEvent & event); // Queue where entities can push events rclcpp::experimental::buffers::EventsQueue::SharedPtr events_queue_; diff --git a/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp index 7a07d61375..75716d236b 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp @@ -22,6 +22,7 @@ #include #include "rclcpp/executors/event_waitable.hpp" +#include "rclcpp/executors/events_executor_event_types.hpp" #include "rclcpp/executors/timers_manager.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" @@ -213,6 +214,12 @@ class EventsExecutorEntitiesCollector final void unset_guard_condition_callback(const rcl_guard_condition_t * guard_condition); + void + remove_callback_data(void * entity_id, ExecutorEventType type); + + const EventsExecutorCallbackData * + get_callback_data(void * entity_id, ExecutorEventType type); + /// Return true if the node belongs to the collector /** * \param[in] group_ptr a node base interface shared pointer @@ -262,6 +269,8 @@ class EventsExecutorEntitiesCollector final EventsExecutor * associated_executor_ = nullptr; /// Instance of the timers manager used by the associated executor TimersManager::SharedPtr timers_manager_; + /// Callback data from entities mapped to a counter of each + std::unordered_map callback_data_map_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp b/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp new file mode 100644 index 0000000000..d1b6e150d1 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp @@ -0,0 +1,81 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENTS_EXECUTOR_EVENT_TYPES_HPP_ +#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_EVENT_TYPES_HPP_ + +namespace rclcpp +{ +namespace executors +{ + +// forward declaration of EventsExecutor to avoid circular dependency +class EventsExecutor; + +enum ExecutorEventType +{ + SUBSCRIPTION_EVENT, + SERVICE_EVENT, + CLIENT_EVENT, + WAITABLE_EVENT +}; + +struct ExecutorEvent +{ + const void * entity_id; + ExecutorEventType type; +}; + +struct EventsExecutorCallbackData +{ + EventsExecutorCallbackData( + EventsExecutor * exec, + void * id, + ExecutorEventType type) + { + executor = exec; + entity_id = id; + event_type = type; + } + + // Equal operator + bool operator==(const EventsExecutorCallbackData & other) const + { + return (executor == other.executor) && + (entity_id == other.entity_id) && + (event_type == other.event_type); + } + + // Struct members + EventsExecutor * executor; + void * entity_id; + ExecutorEventType event_type; +}; + +// To be able to use std::unordered_map with an EventsExecutorCallbackData +// as key, we need a hasher: +struct KeyHasher +{ + size_t operator()(const EventsExecutorCallbackData & k) const + { + return ((std::hash()(k.executor) ^ + (std::hash()(k.entity_id) << 1)) >> 1) ^ + (std::hash()(k.event_type) << 1); + } +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENTS_EXECUTOR_EVENT_TYPES_HPP_ diff --git a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp index b9987d4bd4..5c00a8ed7a 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp @@ -61,15 +61,14 @@ class EventsExecutorNotifyWaitable final : public EventWaitable RCLCPP_PUBLIC void set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const override + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const override { for (auto gc : notify_guard_conditions_) { rcl_ret_t ret = rcl_guard_condition_set_listener_callback( gc, executor_callback, - executor, - this, + executor_callback_data, false); if (RCL_RET_OK != ret) { diff --git a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp index 625d8c7651..92d6e5e6c8 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp @@ -20,7 +20,7 @@ #include "rclcpp/macros.hpp" #include "rclcpp/visibility_control.hpp" -#include "rmw/listener_event_types.h" +#include "rclcpp/executors/events_executor_event_types.hpp" namespace rclcpp { @@ -31,7 +31,7 @@ namespace buffers /** * @brief This abstract class can be used to implement different types of queues - * where `rmw_listener_event_t` can be stored. + * where `ExecutorEvent` can be stored. * The derived classes should choose which underlying container to use and * the strategy for pushing and popping events. * For example a queue implementation may be bounded or unbounded and have @@ -57,7 +57,7 @@ class EventsQueue RCLCPP_PUBLIC virtual void - push(const rmw_listener_event_t & event) = 0; + push(const rclcpp::executors::ExecutorEvent & event) = 0; /** * @brief removes front element from the queue. @@ -73,7 +73,7 @@ class EventsQueue */ RCLCPP_PUBLIC virtual - rmw_listener_event_t + rclcpp::executors::ExecutorEvent front() const = 0; /** @@ -108,7 +108,7 @@ class EventsQueue */ RCLCPP_PUBLIC virtual - std::queue + std::queue pop_all_events() = 0; }; diff --git a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp index 278e50d7be..4f560a91cf 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp @@ -46,7 +46,7 @@ class SimpleEventsQueue : public EventsQueue RCLCPP_PUBLIC virtual void - push(const rmw_listener_event_t & event) + push(const rclcpp::executors::ExecutorEvent & event) { event_queue_.push(event); } @@ -68,7 +68,7 @@ class SimpleEventsQueue : public EventsQueue */ RCLCPP_PUBLIC virtual - rmw_listener_event_t + rclcpp::executors::ExecutorEvent front() const { return event_queue_.front(); @@ -107,7 +107,7 @@ class SimpleEventsQueue : public EventsQueue init() { // Make sure the queue is empty when we start - std::queue local_queue; + std::queue local_queue; std::swap(event_queue_, local_queue); } @@ -118,16 +118,16 @@ class SimpleEventsQueue : public EventsQueue */ RCLCPP_PUBLIC virtual - std::queue + std::queue pop_all_events() { - std::queue local_queue; + std::queue local_queue; std::swap(event_queue_, local_queue); return local_queue; } private: - std::queue event_queue_; + std::queue event_queue_; }; } // namespace buffers diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 645d6b65cc..9227e21623 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -77,8 +77,8 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable RCLCPP_PUBLIC void set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const override; + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const override; protected: std::recursive_mutex reentrant_mutex_; diff --git a/rclcpp/include/rclcpp/qos_event.hpp b/rclcpp/include/rclcpp/qos_event.hpp index 9a3716528e..4d886a99ff 100644 --- a/rclcpp/include/rclcpp/qos_event.hpp +++ b/rclcpp/include/rclcpp/qos_event.hpp @@ -111,8 +111,8 @@ class QOSEventHandlerBase : public Waitable RCLCPP_PUBLIC void set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const override; + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const override; protected: rcl_event_t event_handle_; diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index 00d4227e62..93a7669dbd 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -129,8 +129,8 @@ class ServiceBase RCLCPP_PUBLIC void set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const; + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const; protected: RCLCPP_DISABLE_COPY(ServiceBase) diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index fa7789ceb6..9ae9c30a1b 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -272,8 +272,8 @@ class SubscriptionBase : public std::enable_shared_from_this RCLCPP_PUBLIC void set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const; + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const; protected: template diff --git a/rclcpp/include/rclcpp/waitable.hpp b/rclcpp/include/rclcpp/waitable.hpp index 449fcb5b46..4c89af80ac 100644 --- a/rclcpp/include/rclcpp/waitable.hpp +++ b/rclcpp/include/rclcpp/waitable.hpp @@ -212,8 +212,8 @@ class Waitable virtual void set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const; + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const; private: std::atomic in_use_by_wait_set_{false}; diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp index dd9e7d4e7e..fc80044525 100644 --- a/rclcpp/src/rclcpp/client.cpp +++ b/rclcpp/src/rclcpp/client.cpp @@ -201,14 +201,13 @@ ClientBase::exchange_in_use_by_wait_set_state(bool in_use_state) void ClientBase::set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const { rcl_ret_t ret = rcl_client_set_listener_callback( client_handle_.get(), executor_callback, - executor, - this); + executor_callback_data); if (RCL_RET_OK != ret) { throw std::runtime_error("Couldn't set the EventsExecutor's callback to client"); diff --git a/rclcpp/src/rclcpp/executors/events_executor.cpp b/rclcpp/src/rclcpp/executors/events_executor.cpp index 2737f76e94..eb53c2e3c7 100644 --- a/rclcpp/src/rclcpp/executors/events_executor.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor.cpp @@ -49,7 +49,7 @@ EventsExecutor::EventsExecutor( executor_notifier_ = std::make_shared(); executor_notifier_->add_guard_condition(&shutdown_guard_condition_->get_rcl_guard_condition()); executor_notifier_->add_guard_condition(&interrupt_guard_condition_); - executor_notifier_->set_events_executor_callback(this, &EventsExecutor::push_event); + entities_collector_->add_waitable(executor_notifier_); } @@ -71,12 +71,12 @@ EventsExecutor::spin() // We wait here until something has been pushed to the event queue events_queue_cv_.wait(push_lock, has_event_predicate); // Move all events into a local events queue to allow entities to push while we execute them - std::queue execution_events_queue = events_queue_->pop_all_events(); + std::queue execution_events_queue = events_queue_->pop_all_events(); // Unlock the mutex push_lock.unlock(); // Consume all available events while (!execution_events_queue.empty()) { - rmw_listener_event_t event = execution_events_queue.front(); + ExecutorEvent event = execution_events_queue.front(); execution_events_queue.pop(); this->execute_event(event); } @@ -139,7 +139,7 @@ EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhau bool has_event = !events_queue_->empty(); if (has_event) { - rmw_listener_event_t event = events_queue_->front(); + ExecutorEvent event = events_queue_->front(); events_queue_->pop(); this->execute_event(event); executed_events++; @@ -183,7 +183,7 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout) // When condition variable is notified, check this predicate to proceed auto has_event_predicate = [this]() {return !events_queue_->empty();}; - rmw_listener_event_t event; + ExecutorEvent event; bool has_event = false; { @@ -246,12 +246,12 @@ EventsExecutor::remove_node(std::shared_ptr node_ptr, bool notify) } void -EventsExecutor::execute_event(const rmw_listener_event_t & event) +EventsExecutor::execute_event(const ExecutorEvent & event) { switch (event.type) { case SUBSCRIPTION_EVENT: { - auto subscription = entities_collector_->get_subscription(event.entity); + auto subscription = entities_collector_->get_subscription(event.entity_id); if (subscription) { execute_subscription(subscription); @@ -261,7 +261,7 @@ EventsExecutor::execute_event(const rmw_listener_event_t & event) case SERVICE_EVENT: { - auto service = entities_collector_->get_service(event.entity); + auto service = entities_collector_->get_service(event.entity_id); if (service) { execute_service(service); @@ -271,7 +271,7 @@ EventsExecutor::execute_event(const rmw_listener_event_t & event) case CLIENT_EVENT: { - auto client = entities_collector_->get_client(event.entity); + auto client = entities_collector_->get_client(event.entity_id); if (client) { execute_client(client); @@ -281,7 +281,7 @@ EventsExecutor::execute_event(const rmw_listener_event_t & event) case WAITABLE_EVENT: { - auto waitable = entities_collector_->get_waitable(event.entity); + auto waitable = entities_collector_->get_waitable(event.entity_id); if (waitable) { auto data = waitable->take_data(); diff --git a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp index c2b08a9de6..c6bf4df55d 100644 --- a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp @@ -20,6 +20,7 @@ #include "rclcpp/executors/events_executor.hpp" #include "rclcpp/executors/events_executor_entities_collector.hpp" +using rclcpp::executors::EventsExecutorCallbackData; using rclcpp::executors::EventsExecutorEntitiesCollector; EventsExecutorEntitiesCollector::EventsExecutorEntitiesCollector( @@ -75,6 +76,7 @@ EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector() weak_nodes_.clear(); weak_clients_map_.clear(); weak_services_map_.clear(); + callback_data_map_.clear(); weak_waitables_map_.clear(); weak_subscriptions_map_.clear(); weak_nodes_to_guard_conditions_.clear(); @@ -86,7 +88,7 @@ void EventsExecutorEntitiesCollector::init() { // Add the EventsExecutorEntitiesCollector shared_ptr to waitables map - add_waitable(this->shared_from_this()); + weak_waitables_map_.emplace(this, this->shared_from_this()); } void @@ -234,40 +236,44 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks( group->find_subscription_ptrs_if( [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { if (subscription) { - subscription->set_events_executor_callback( - associated_executor_, - &EventsExecutor::push_event); weak_subscriptions_map_.emplace(subscription.get(), subscription); + + subscription->set_events_executor_callback( + &EventsExecutor::push_event, + get_callback_data(subscription.get(), SUBSCRIPTION_EVENT)); } return false; }); group->find_service_ptrs_if( [this](const rclcpp::ServiceBase::SharedPtr & service) { if (service) { - service->set_events_executor_callback( - associated_executor_, - &EventsExecutor::push_event); weak_services_map_.emplace(service.get(), service); + + service->set_events_executor_callback( + &EventsExecutor::push_event, + get_callback_data(service.get(), SERVICE_EVENT)); } return false; }); group->find_client_ptrs_if( [this](const rclcpp::ClientBase::SharedPtr & client) { if (client) { - client->set_events_executor_callback( - associated_executor_, - &EventsExecutor::push_event); weak_clients_map_.emplace(client.get(), client); + + client->set_events_executor_callback( + &EventsExecutor::push_event, + get_callback_data(client.get(), CLIENT_EVENT)); } return false; }); group->find_waitable_ptrs_if( [this](const rclcpp::Waitable::SharedPtr & waitable) { if (waitable) { - waitable->set_events_executor_callback( - associated_executor_, - &EventsExecutor::push_event); weak_waitables_map_.emplace(waitable.get(), waitable); + + waitable->set_events_executor_callback( + &EventsExecutor::push_event, + get_callback_data(waitable.get(), WAITABLE_EVENT)); } return false; }); @@ -292,6 +298,7 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( if (subscription) { subscription->set_events_executor_callback(nullptr, nullptr); weak_subscriptions_map_.erase(subscription.get()); + remove_callback_data(subscription.get(), SUBSCRIPTION_EVENT); } return false; }); @@ -300,6 +307,7 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( if (service) { service->set_events_executor_callback(nullptr, nullptr); weak_services_map_.erase(service.get()); + remove_callback_data(service.get(), SERVICE_EVENT); } return false; }); @@ -308,6 +316,7 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( if (client) { client->set_events_executor_callback(nullptr, nullptr); weak_clients_map_.erase(client.get()); + remove_callback_data(client.get(), CLIENT_EVENT); } return false; }); @@ -316,6 +325,7 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( if (waitable) { waitable->set_events_executor_callback(nullptr, nullptr); weak_waitables_map_.erase(waitable.get()); + remove_callback_data(waitable.get(), WAITABLE_EVENT); } return false; }); @@ -480,8 +490,7 @@ EventsExecutorEntitiesCollector::set_guard_condition_callback( rcl_ret_t ret = rcl_guard_condition_set_listener_callback( guard_condition, &EventsExecutor::push_event, - associated_executor_, - this, + get_callback_data(this, WAITABLE_EVENT), false /* Discard previous events */); if (ret != RCL_RET_OK) { @@ -497,12 +506,13 @@ EventsExecutorEntitiesCollector::unset_guard_condition_callback( guard_condition, nullptr, nullptr, - nullptr, false /* Discard previous events */); if (ret != RCL_RET_OK) { throw std::runtime_error("Couldn't unset guard condition event callback"); } + + remove_callback_data(this, WAITABLE_EVENT); } rclcpp::SubscriptionBase::SharedPtr @@ -585,4 +595,55 @@ void EventsExecutorEntitiesCollector::add_waitable(rclcpp::Waitable::SharedPtr waitable) { weak_waitables_map_.emplace(waitable.get(), waitable); + + waitable->set_events_executor_callback( + &EventsExecutor::push_event, + get_callback_data(waitable.get(), WAITABLE_EVENT)); +} + +const EventsExecutorCallbackData * +EventsExecutorEntitiesCollector::get_callback_data( + void * entity_id, ExecutorEventType event_type) +{ + // Create an entity callback data object and check if + // we already have stored one like it + EventsExecutorCallbackData data(associated_executor_, entity_id, event_type); + + auto it = callback_data_map_.find(data); + + if (it != callback_data_map_.end()) { + // We found a callback data matching entity ID and type. + // Increment callback data counter and return pointer to data + it->second++; + return &it->first; + } + + // There was no callback data object matching ID and type, + // create one and set counter to 1. + callback_data_map_.emplace(data, 1); + + // Return a pointer to the just added entity callback data. + it = callback_data_map_.find(data); + return &it->first; +} + +void +EventsExecutorEntitiesCollector::remove_callback_data( + void * entity_id, ExecutorEventType event_type) +{ + // Create an entity callback data object and check if + // we already have stored one like it + EventsExecutorCallbackData data(associated_executor_, entity_id, event_type); + + auto it = callback_data_map_.find(data); + + if (it != callback_data_map_.end()) { + // We found a callback data matching entity ID and type. + // If we have more than 1 decrement counter, otherwise remove it. + if (it->second > 1) { + it->second--; + } else { + callback_data_map_.erase(it); + } + } } diff --git a/rclcpp/src/rclcpp/qos_event.cpp b/rclcpp/src/rclcpp/qos_event.cpp index d9e27dc32f..055cab306e 100644 --- a/rclcpp/src/rclcpp/qos_event.cpp +++ b/rclcpp/src/rclcpp/qos_event.cpp @@ -70,14 +70,13 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set) void QOSEventHandlerBase::set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const { rcl_ret_t ret = rcl_event_set_listener_callback( &event_handle_, executor_callback, - executor, - this, + executor_callback_data, false /* Discard previous events */); if (RCL_RET_OK != ret) { diff --git a/rclcpp/src/rclcpp/service.cpp b/rclcpp/src/rclcpp/service.cpp index e0afdbcad9..0bf08e0dbc 100644 --- a/rclcpp/src/rclcpp/service.cpp +++ b/rclcpp/src/rclcpp/service.cpp @@ -87,14 +87,13 @@ ServiceBase::exchange_in_use_by_wait_set_state(bool in_use_state) void ServiceBase::set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const { rcl_ret_t ret = rcl_service_set_listener_callback( service_handle_.get(), executor_callback, - executor, - this); + executor_callback_data); if (RCL_RET_OK != ret) { throw std::runtime_error("Couldn't set the EventsExecutor's callback to service"); diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 5f37848726..ef1ad4e10b 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -291,14 +291,13 @@ SubscriptionBase::exchange_in_use_by_wait_set_state( void SubscriptionBase::set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const { rcl_ret_t ret = rcl_subscription_set_listener_callback( subscription_handle_.get(), executor_callback, - executor, - this); + executor_callback_data); if (RCL_RET_OK != ret) { throw std::runtime_error("Couldn't set the EventsExecutor's callback to subscription"); diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index 1c906e95cd..83c51d3bbb 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -39,14 +39,13 @@ SubscriptionIntraProcessBase::get_actual_qos() const void SubscriptionIntraProcessBase::set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const { rcl_ret_t ret = rcl_guard_condition_set_listener_callback( &gc_, executor_callback, - executor, - this, + executor_callback_data, true /*Use previous events*/); if (RCL_RET_OK != ret) { diff --git a/rclcpp/src/rclcpp/waitable.cpp b/rclcpp/src/rclcpp/waitable.cpp index 4c5bd672ce..61c2cf3f18 100644 --- a/rclcpp/src/rclcpp/waitable.cpp +++ b/rclcpp/src/rclcpp/waitable.cpp @@ -62,11 +62,11 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state) void Waitable::set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const { - (void)executor; (void)executor_callback; + (void)executor_callback_data; throw std::runtime_error( "Custom waitables should override set_events_executor_callback() to use events executor"); diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 13d41a7fca..dc04ec5e43 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -574,7 +574,7 @@ if(TARGET test_events_queue) ament_target_dependencies(test_events_queue "rcl" "test_msgs") - target_link_libraries(test_events_queue ${PROJECT_NAME} mimick) + target_link_libraries(test_events_queue ${PROJECT_NAME}) endif() ament_add_gtest(test_events_executor_entities_collector executors/test_events_executor_entities_collector.cpp diff --git a/rclcpp/test/rclcpp/executors/test_events_queue.cpp b/rclcpp/test/rclcpp/executors/test_events_queue.cpp index 9f540687d0..9d600e61f7 100644 --- a/rclcpp/test/rclcpp/executors/test_events_queue.cpp +++ b/rclcpp/test/rclcpp/executors/test_events_queue.cpp @@ -25,7 +25,7 @@ TEST(TestEventsQueue, SimpleQueueTest) { // Create a SimpleEventsQueue and a local queue auto simple_queue = std::make_unique(); - std::queue local_events_queue; + std::queue local_events_queue; // Make sure the queue is empty after init simple_queue->init(); @@ -33,7 +33,7 @@ TEST(TestEventsQueue, SimpleQueueTest) // Push 11 messages for (int i = 0; i < 11; i++) { - rmw_listener_event_t stub_event; + rclcpp::executors::ExecutorEvent stub_event; simple_queue->push(stub_event); } @@ -52,13 +52,14 @@ TEST(TestEventsQueue, SimpleQueueTest) EXPECT_TRUE(simple_queue->empty()); // Lets push an event into the queue and get it back - rmw_listener_event_t push_event = {simple_queue.get(), SUBSCRIPTION_EVENT}; + rclcpp::executors::ExecutorEvent push_event = {simple_queue.get(), + rclcpp::executors::ExecutorEventType::SUBSCRIPTION_EVENT}; simple_queue->push(push_event); - rmw_listener_event_t front_event = simple_queue->front(); + rclcpp::executors::ExecutorEvent front_event = simple_queue->front(); // The events should be equal - EXPECT_EQ(push_event.entity, front_event.entity); + EXPECT_EQ(push_event.entity_id, front_event.entity_id); EXPECT_EQ(push_event.type, front_event.type); } diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 76e6e66b3c..c00c8b441b 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -469,14 +469,13 @@ class TestWaitable : public rclcpp::Waitable void set_events_executor_callback( - rclcpp::executors::EventsExecutor * executor, - rmw_listener_callback_t executor_callback) const override + rmw_listener_callback_t executor_callback, + const void * executor_callback_data) const override { rcl_ret_t ret = rcl_guard_condition_set_listener_callback( &gc_, executor_callback, - executor, - this, + executor_callback_data, true /*Use previous events*/); if (RCL_RET_OK != ret) { From 36e66ed0a06592c7eb68a8ee8e72800dfb2fce8b Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 26 Feb 2021 18:20:12 -0300 Subject: [PATCH 2/7] Use entity_id to compute hash. Add comments --- .../events_executor_entities_collector.hpp | 11 +++++++- .../executors/events_executor_event_types.hpp | 25 +++++++++++-------- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp index 75716d236b..2781026492 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp @@ -269,7 +269,16 @@ class EventsExecutorEntitiesCollector final EventsExecutor * associated_executor_ = nullptr; /// Instance of the timers manager used by the associated executor TimersManager::SharedPtr timers_manager_; - /// Callback data from entities mapped to a counter of each + + /// Callback data objects mapped to the number of listeners sharing the same object. + /// When no more listeners use the object, it can be removed from the map. + /// For example, the entities collector holds every node's guard condition, which + /// share the same EventsExecutorCallbackData object ptr to use as their callback arg: + /// cb_data_object = {executor_ptr, entities_collector_ptr, WAITABLE_EVENT}; + /// Node1->gc(&cb_data_object) + /// Node2->gc(&cb_data_object) + /// So the maps has: (cb_data_object, 2) + /// When both nodes are removed (counter = 0), the cb_data_object can be destroyed. std::unordered_map callback_data_map_; }; diff --git a/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp b/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp index d1b6e150d1..e03adab2f5 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp @@ -37,16 +37,20 @@ struct ExecutorEvent ExecutorEventType type; }; +// The EventsExecutorCallbackData struct is what the listeners +// will use as argument when calling their callbacks from the +// RMW implementation. The listeners get a (void *) of this struct, +// and the executor is in charge to cast it back and use the data. struct EventsExecutorCallbackData { EventsExecutorCallbackData( - EventsExecutor * exec, - void * id, - ExecutorEventType type) + EventsExecutor * _executor, + void * _entity_id, + ExecutorEventType _event_type) { - executor = exec; - entity_id = id; - event_type = type; + executor = _executor; + entity_id = _entity_id; + event_type = _event_type; } // Equal operator @@ -64,14 +68,13 @@ struct EventsExecutorCallbackData }; // To be able to use std::unordered_map with an EventsExecutorCallbackData -// as key, we need a hasher: +// as key, we need a hasher. We use the entity ID as hash, since it is +// unique for each EventsExecutorCallbackData object. struct KeyHasher { - size_t operator()(const EventsExecutorCallbackData & k) const + size_t operator()(const EventsExecutorCallbackData & key) const { - return ((std::hash()(k.executor) ^ - (std::hash()(k.entity_id) << 1)) >> 1) ^ - (std::hash()(k.event_type) << 1); + return std::hash()(key.entity_id); } }; From 8e137140f477176375131a6ad3125fa1134bb871 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 26 Feb 2021 18:23:57 -0300 Subject: [PATCH 3/7] Use RMW renamed file --- rclcpp/include/rclcpp/executors/events_executor.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp index a5ceffd81a..35b7271399 100644 --- a/rclcpp/include/rclcpp/executors/events_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -29,7 +29,7 @@ #include "rclcpp/experimental/buffers/simple_events_queue.hpp" #include "rclcpp/node.hpp" -#include "rmw/listener_event_types.h" +#include "rmw/listener_callback_type.h" namespace rclcpp { From 2b9b3ad05d22258d65189bc05df7fa2f07281e05 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 26 Feb 2021 18:29:07 -0300 Subject: [PATCH 4/7] fix hash type --- rclcpp/include/rclcpp/executors/events_executor_event_types.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp b/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp index e03adab2f5..2557af107f 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp @@ -74,7 +74,7 @@ struct KeyHasher { size_t operator()(const EventsExecutorCallbackData & key) const { - return std::hash()(key.entity_id); + return std::hash()(key.entity_id); } }; From 930d9a7cf17b4f65e37387d85f0b7f310ec9eb1d Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Wed, 10 Mar 2021 17:51:41 -0300 Subject: [PATCH 5/7] Update EventsExecutorCallbackData --- .../include/rclcpp/executors/events_executor.hpp | 2 +- .../executors/events_executor_event_types.hpp | 15 +++++---------- .../events_executor_entities_collector.cpp | 6 ++++-- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp index 35b7271399..375206fca0 100644 --- a/rclcpp/include/rclcpp/executors/events_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -228,7 +228,7 @@ class EventsExecutor : public rclcpp::Executor // Event queue mutex scope { std::unique_lock lock(this_executor->push_mutex_); - this_executor->events_queue_->push({data->entity_id, data->event_type}); + this_executor->events_queue_->push(data->event); } // Notify that the event queue has some events in it. this_executor->events_queue_cv_.notify_one(); diff --git a/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp b/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp index 2557af107f..045cc51d57 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp @@ -45,26 +45,21 @@ struct EventsExecutorCallbackData { EventsExecutorCallbackData( EventsExecutor * _executor, - void * _entity_id, - ExecutorEventType _event_type) + ExecutorEvent _event) { executor = _executor; - entity_id = _entity_id; - event_type = _event_type; + event = _event; } // Equal operator bool operator==(const EventsExecutorCallbackData & other) const { - return (executor == other.executor) && - (entity_id == other.entity_id) && - (event_type == other.event_type); + return (event.entity_id == other.event.entity_id); } // Struct members EventsExecutor * executor; - void * entity_id; - ExecutorEventType event_type; + ExecutorEvent event; }; // To be able to use std::unordered_map with an EventsExecutorCallbackData @@ -74,7 +69,7 @@ struct KeyHasher { size_t operator()(const EventsExecutorCallbackData & key) const { - return std::hash()(key.entity_id); + return std::hash()(key.event.entity_id); } }; diff --git a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp index c6bf4df55d..4198547366 100644 --- a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp @@ -607,7 +607,8 @@ EventsExecutorEntitiesCollector::get_callback_data( { // Create an entity callback data object and check if // we already have stored one like it - EventsExecutorCallbackData data(associated_executor_, entity_id, event_type); + ExecutorEvent event = {entity_id, event_type}; + EventsExecutorCallbackData data(associated_executor_, event); auto it = callback_data_map_.find(data); @@ -633,7 +634,8 @@ EventsExecutorEntitiesCollector::remove_callback_data( { // Create an entity callback data object and check if // we already have stored one like it - EventsExecutorCallbackData data(associated_executor_, entity_id, event_type); + ExecutorEvent event = {entity_id, event_type}; + EventsExecutorCallbackData data(associated_executor_, event); auto it = callback_data_map_.find(data); From 52afe8168849710279b8b0c198d07a50d7bd1101 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Thu, 11 Mar 2021 11:51:59 -0300 Subject: [PATCH 6/7] Use RCLCPP_SMART_PTR_ALIASES_ONLY --- rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp index 92d6e5e6c8..f0958d36bd 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp @@ -42,7 +42,7 @@ namespace buffers class EventsQueue { public: - RCLCPP_SMART_PTR_DEFINITIONS(EventsQueue) + RCLCPP_SMART_PTR_ALIASES_ONLY(EventsQueue) /** * @brief Destruct the object. From df37d31e6b48d37dda2b45bac10d49405209150a Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Thu, 11 Mar 2021 12:16:44 -0300 Subject: [PATCH 7/7] Rename set_events_executor_callback->set_listener_callback --- rclcpp/include/rclcpp/client.hpp | 6 +++--- .../events_executor_notify_waitable.hpp | 10 +++++----- .../subscription_intra_process_base.hpp | 6 +++--- rclcpp/include/rclcpp/qos_event.hpp | 7 +++---- rclcpp/include/rclcpp/service.hpp | 6 +++--- rclcpp/include/rclcpp/subscription_base.hpp | 6 +++--- rclcpp/include/rclcpp/waitable.hpp | 6 +++--- rclcpp/src/rclcpp/client.cpp | 12 ++++++------ .../events_executor_entities_collector.cpp | 18 +++++++++--------- rclcpp/src/rclcpp/qos_event.cpp | 12 ++++++------ rclcpp/src/rclcpp/service.cpp | 12 ++++++------ rclcpp/src/rclcpp/subscription_base.cpp | 12 ++++++------ .../rclcpp/subscription_intra_process_base.cpp | 12 ++++++------ rclcpp/src/rclcpp/waitable.cpp | 12 ++++++------ .../test/rclcpp/executors/test_executors.cpp | 10 +++++----- 15 files changed, 73 insertions(+), 74 deletions(-) diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index 1a66d128e6..b8a2e484ae 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -158,9 +158,9 @@ class ClientBase RCLCPP_PUBLIC void - set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const; + set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const; protected: RCLCPP_DISABLE_COPY(ClientBase) diff --git a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp index 5c00a8ed7a..1a9a72ebff 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp @@ -60,15 +60,15 @@ class EventsExecutorNotifyWaitable final : public EventWaitable RCLCPP_PUBLIC void - set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const override + set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const override { for (auto gc : notify_guard_conditions_) { rcl_ret_t ret = rcl_guard_condition_set_listener_callback( gc, - executor_callback, - executor_callback_data, + callback, + user_data, false); if (RCL_RET_OK != ret) { diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 9227e21623..f40f997762 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -76,9 +76,9 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable RCLCPP_PUBLIC void - set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const override; + set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const override; protected: std::recursive_mutex reentrant_mutex_; diff --git a/rclcpp/include/rclcpp/qos_event.hpp b/rclcpp/include/rclcpp/qos_event.hpp index 4d886a99ff..afca761961 100644 --- a/rclcpp/include/rclcpp/qos_event.hpp +++ b/rclcpp/include/rclcpp/qos_event.hpp @@ -107,12 +107,11 @@ class QOSEventHandlerBase : public Waitable bool is_ready(rcl_wait_set_t * wait_set) override; - /// Set EventsExecutor's callback RCLCPP_PUBLIC void - set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const override; + set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const override; protected: rcl_event_t event_handle_; diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index 93a7669dbd..bf0471d4b5 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -128,9 +128,9 @@ class ServiceBase RCLCPP_PUBLIC void - set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const; + set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const; protected: RCLCPP_DISABLE_COPY(ServiceBase) diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 9ae9c30a1b..2b5653f896 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -271,9 +271,9 @@ class SubscriptionBase : public std::enable_shared_from_this RCLCPP_PUBLIC void - set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const; + set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const; protected: template diff --git a/rclcpp/include/rclcpp/waitable.hpp b/rclcpp/include/rclcpp/waitable.hpp index 4c89af80ac..dc194e549b 100644 --- a/rclcpp/include/rclcpp/waitable.hpp +++ b/rclcpp/include/rclcpp/waitable.hpp @@ -211,9 +211,9 @@ class Waitable RCLCPP_PUBLIC virtual void - set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const; + set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const; private: std::atomic in_use_by_wait_set_{false}; diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp index fc80044525..c74dd59c8d 100644 --- a/rclcpp/src/rclcpp/client.cpp +++ b/rclcpp/src/rclcpp/client.cpp @@ -200,16 +200,16 @@ ClientBase::exchange_in_use_by_wait_set_state(bool in_use_state) } void -ClientBase::set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const +ClientBase::set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const { rcl_ret_t ret = rcl_client_set_listener_callback( client_handle_.get(), - executor_callback, - executor_callback_data); + callback, + user_data); if (RCL_RET_OK != ret) { - throw std::runtime_error("Couldn't set the EventsExecutor's callback to client"); + throw std::runtime_error("Couldn't set listener callback to client"); } } diff --git a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp index 4198547366..a02c07c641 100644 --- a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp @@ -238,7 +238,7 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks( if (subscription) { weak_subscriptions_map_.emplace(subscription.get(), subscription); - subscription->set_events_executor_callback( + subscription->set_listener_callback( &EventsExecutor::push_event, get_callback_data(subscription.get(), SUBSCRIPTION_EVENT)); } @@ -249,7 +249,7 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks( if (service) { weak_services_map_.emplace(service.get(), service); - service->set_events_executor_callback( + service->set_listener_callback( &EventsExecutor::push_event, get_callback_data(service.get(), SERVICE_EVENT)); } @@ -260,7 +260,7 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks( if (client) { weak_clients_map_.emplace(client.get(), client); - client->set_events_executor_callback( + client->set_listener_callback( &EventsExecutor::push_event, get_callback_data(client.get(), CLIENT_EVENT)); } @@ -271,7 +271,7 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks( if (waitable) { weak_waitables_map_.emplace(waitable.get(), waitable); - waitable->set_events_executor_callback( + waitable->set_listener_callback( &EventsExecutor::push_event, get_callback_data(waitable.get(), WAITABLE_EVENT)); } @@ -296,7 +296,7 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( group->find_subscription_ptrs_if( [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { if (subscription) { - subscription->set_events_executor_callback(nullptr, nullptr); + subscription->set_listener_callback(nullptr, nullptr); weak_subscriptions_map_.erase(subscription.get()); remove_callback_data(subscription.get(), SUBSCRIPTION_EVENT); } @@ -305,7 +305,7 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( group->find_service_ptrs_if( [this](const rclcpp::ServiceBase::SharedPtr & service) { if (service) { - service->set_events_executor_callback(nullptr, nullptr); + service->set_listener_callback(nullptr, nullptr); weak_services_map_.erase(service.get()); remove_callback_data(service.get(), SERVICE_EVENT); } @@ -314,7 +314,7 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( group->find_client_ptrs_if( [this](const rclcpp::ClientBase::SharedPtr & client) { if (client) { - client->set_events_executor_callback(nullptr, nullptr); + client->set_listener_callback(nullptr, nullptr); weak_clients_map_.erase(client.get()); remove_callback_data(client.get(), CLIENT_EVENT); } @@ -323,7 +323,7 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( group->find_waitable_ptrs_if( [this](const rclcpp::Waitable::SharedPtr & waitable) { if (waitable) { - waitable->set_events_executor_callback(nullptr, nullptr); + waitable->set_listener_callback(nullptr, nullptr); weak_waitables_map_.erase(waitable.get()); remove_callback_data(waitable.get(), WAITABLE_EVENT); } @@ -596,7 +596,7 @@ EventsExecutorEntitiesCollector::add_waitable(rclcpp::Waitable::SharedPtr waitab { weak_waitables_map_.emplace(waitable.get(), waitable); - waitable->set_events_executor_callback( + waitable->set_listener_callback( &EventsExecutor::push_event, get_callback_data(waitable.get(), WAITABLE_EVENT)); } diff --git a/rclcpp/src/rclcpp/qos_event.cpp b/rclcpp/src/rclcpp/qos_event.cpp index 055cab306e..6a23b95c2f 100644 --- a/rclcpp/src/rclcpp/qos_event.cpp +++ b/rclcpp/src/rclcpp/qos_event.cpp @@ -69,18 +69,18 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set) } void -QOSEventHandlerBase::set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const +QOSEventHandlerBase::set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const { rcl_ret_t ret = rcl_event_set_listener_callback( &event_handle_, - executor_callback, - executor_callback_data, + callback, + user_data, false /* Discard previous events */); if (RCL_RET_OK != ret) { - throw std::runtime_error("Couldn't set EventsExecutor's callback in QOSEventHandlerBase"); + throw std::runtime_error("Couldn't set listener callback to QOSEventHandlerBase"); } } diff --git a/rclcpp/src/rclcpp/service.cpp b/rclcpp/src/rclcpp/service.cpp index 0bf08e0dbc..e279668bdf 100644 --- a/rclcpp/src/rclcpp/service.cpp +++ b/rclcpp/src/rclcpp/service.cpp @@ -86,16 +86,16 @@ ServiceBase::exchange_in_use_by_wait_set_state(bool in_use_state) } void -ServiceBase::set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const +ServiceBase::set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const { rcl_ret_t ret = rcl_service_set_listener_callback( service_handle_.get(), - executor_callback, - executor_callback_data); + callback, + user_data); if (RCL_RET_OK != ret) { - throw std::runtime_error("Couldn't set the EventsExecutor's callback to service"); + throw std::runtime_error("Couldn't set listener callback to service"); } } diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index ef1ad4e10b..065de5cbbd 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -290,16 +290,16 @@ SubscriptionBase::exchange_in_use_by_wait_set_state( } void -SubscriptionBase::set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const +SubscriptionBase::set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const { rcl_ret_t ret = rcl_subscription_set_listener_callback( subscription_handle_.get(), - executor_callback, - executor_callback_data); + callback, + user_data); if (RCL_RET_OK != ret) { - throw std::runtime_error("Couldn't set the EventsExecutor's callback to subscription"); + throw std::runtime_error("Couldn't set listener callback to subscription"); } } diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index 83c51d3bbb..65e3cbbe83 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -38,17 +38,17 @@ SubscriptionIntraProcessBase::get_actual_qos() const } void -SubscriptionIntraProcessBase::set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const +SubscriptionIntraProcessBase::set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const { rcl_ret_t ret = rcl_guard_condition_set_listener_callback( &gc_, - executor_callback, - executor_callback_data, + callback, + user_data, true /*Use previous events*/); if (RCL_RET_OK != ret) { - throw std::runtime_error("Couldn't set guard condition callback"); + throw std::runtime_error("Couldn't set guard condition listener callback"); } } diff --git a/rclcpp/src/rclcpp/waitable.cpp b/rclcpp/src/rclcpp/waitable.cpp index 61c2cf3f18..213fc9353a 100644 --- a/rclcpp/src/rclcpp/waitable.cpp +++ b/rclcpp/src/rclcpp/waitable.cpp @@ -61,13 +61,13 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state) } void -Waitable::set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const +Waitable::set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const { - (void)executor_callback; - (void)executor_callback_data; + (void)callback; + (void)user_data; throw std::runtime_error( - "Custom waitables should override set_events_executor_callback() to use events executor"); + "Custom waitables should override set_listener_callback() to use events executor"); } diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index c00c8b441b..1a3602d3a9 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -468,14 +468,14 @@ class TestWaitable : public rclcpp::Waitable } void - set_events_executor_callback( - rmw_listener_callback_t executor_callback, - const void * executor_callback_data) const override + set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const override { rcl_ret_t ret = rcl_guard_condition_set_listener_callback( &gc_, - executor_callback, - executor_callback_data, + callback, + user_data, true /*Use previous events*/); if (RCL_RET_OK != ret) {