diff --git a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp index a3b90cf97b..da8513a6a1 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp @@ -59,7 +59,7 @@ class EventsExecutorNotifyWaitable final : public EventWaitable void set_events_executor_callback( const rclcpp::executors::EventsExecutor * executor, - rmw_listener_cb_t executor_callback) const override + rmw_listener_cb_t executor_callback) override { for (auto gc : notify_guard_conditions_) { rcl_ret_t ret = rcl_guard_condition_set_listener_callback( diff --git a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp index 80fa244aaf..674051d66b 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp @@ -101,7 +101,14 @@ class RingBufferImplementation : public BufferImplementationBase return size_ == capacity_; } - void clear() {} + void clear() + { + std::lock_guard lock(mutex_); + ring_buffer_ = std::vector(capacity_); + write_index_ = capacity_ - 1; + read_index_ = 0; + size_ = 0; + } private: size_t capacity_; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 618db3cac1..05badefa9f 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -118,14 +118,25 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase provide_intra_process_message(ConstMessageSharedPtr message) { buffer_->add_shared(std::move(message)); - trigger_guard_condition(); + std::lock_guard lock(executor_callback_mutex_); + if (executor_callback_) { + executor_callback_(executor_, {this, WAITABLE_EVENT}); + } else { + trigger_guard_condition(); + } } void provide_intra_process_message(MessageUniquePtr message) { buffer_->add_unique(std::move(message)); - trigger_guard_condition(); + + std::lock_guard lock(executor_callback_mutex_); + if (executor_callback_) { + executor_callback_(executor_, {this, WAITABLE_EVENT}); + } else { + trigger_guard_condition(); + } } bool @@ -134,6 +145,18 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase return buffer_->use_take_shared_method(); } + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_cb_t executor_callback) override + { + std::lock_guard lock(executor_callback_mutex_); + executor_ = executor; + executor_callback_ = executor_callback; + // Buffer must be cleared under the executor callback lock to make sure that other threads wait for this + buffer_->clear(); + } + private: void trigger_guard_condition() diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 8684c9f632..d7086e92b2 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -74,12 +74,16 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable void set_events_executor_callback( const rclcpp::executors::EventsExecutor * executor, - rmw_listener_cb_t executor_callback) const override; + rmw_listener_cb_t executor_callback) override; protected: std::recursive_mutex reentrant_mutex_; rcl_guard_condition_t gc_; + const rclcpp::executors::EventsExecutor * executor_; + rmw_listener_cb_t executor_callback_ = nullptr; + std::mutex executor_callback_mutex_; + private: virtual void trigger_guard_condition() = 0; diff --git a/rclcpp/include/rclcpp/qos_event.hpp b/rclcpp/include/rclcpp/qos_event.hpp index 4e662ca9c3..d4de702ca3 100644 --- a/rclcpp/include/rclcpp/qos_event.hpp +++ b/rclcpp/include/rclcpp/qos_event.hpp @@ -107,7 +107,7 @@ class QOSEventHandlerBase : public Waitable void set_events_executor_callback( const rclcpp::executors::EventsExecutor * executor, - rmw_listener_cb_t executor_callback) const override; + rmw_listener_cb_t executor_callback) override; protected: rcl_event_t event_handle_; diff --git a/rclcpp/include/rclcpp/waitable.hpp b/rclcpp/include/rclcpp/waitable.hpp index c8aa1bf2c6..19092e5d08 100644 --- a/rclcpp/include/rclcpp/waitable.hpp +++ b/rclcpp/include/rclcpp/waitable.hpp @@ -175,7 +175,7 @@ class Waitable void set_events_executor_callback( const rclcpp::executors::EventsExecutor * executor, - rmw_listener_cb_t executor_callback) const; + rmw_listener_cb_t executor_callback); private: std::atomic in_use_by_wait_set_{false}; diff --git a/rclcpp/src/rclcpp/qos_event.cpp b/rclcpp/src/rclcpp/qos_event.cpp index 1fbfa6559c..d76c724116 100644 --- a/rclcpp/src/rclcpp/qos_event.cpp +++ b/rclcpp/src/rclcpp/qos_event.cpp @@ -71,7 +71,7 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set) void QOSEventHandlerBase::set_events_executor_callback( const rclcpp::executors::EventsExecutor * executor, - rmw_listener_cb_t executor_callback) const + rmw_listener_cb_t executor_callback) { rcl_ret_t ret = rcl_event_set_listener_callback( &event_handle_, diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index f3a4b72f6d..204df3c920 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -37,20 +37,12 @@ SubscriptionIntraProcessBase::get_actual_qos() const return qos_profile_; } - void SubscriptionIntraProcessBase::set_events_executor_callback( const rclcpp::executors::EventsExecutor * executor, - rmw_listener_cb_t executor_callback) const + rmw_listener_cb_t executor_callback) { - rcl_ret_t ret = rcl_guard_condition_set_listener_callback( - &gc_, - executor_callback, - executor, - this, - true /*Use previous events*/); - - if (RCL_RET_OK != ret) { - throw std::runtime_error("Couldn't set guard condition callback"); - } -} \ No newline at end of file + (void)executor; + (void)executor_callback; + assert(0); +} diff --git a/rclcpp/src/rclcpp/waitable.cpp b/rclcpp/src/rclcpp/waitable.cpp index 47e56fec05..e6d342a18a 100644 --- a/rclcpp/src/rclcpp/waitable.cpp +++ b/rclcpp/src/rclcpp/waitable.cpp @@ -61,7 +61,7 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state) void Waitable::set_events_executor_callback( const rclcpp::executors::EventsExecutor * executor, - rmw_listener_cb_t executor_callback) const + rmw_listener_cb_t executor_callback) { (void)executor; (void)executor_callback;