diff --git a/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp b/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp index 1e5346116a..83a6c7462c 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp @@ -33,6 +33,7 @@ class BufferImplementationBase virtual void clear() = 0; virtual bool has_data() const = 0; + virtual size_t available_capacity() const = 0; }; } // namespace buffers diff --git a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp index 05092bb23b..af936cb342 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp @@ -44,6 +44,7 @@ class IntraProcessBufferBase virtual bool has_data() const = 0; virtual bool use_take_shared_method() const = 0; + virtual size_t available_capacity() const = 0; }; template< @@ -143,6 +144,11 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer::value; } + size_t available_capacity() const override + { + return buffer_->available_capacity(); + } + private: std::unique_ptr> buffer_; diff --git a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp index 2c06ea6cbe..5c6ba59291 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp @@ -148,6 +148,18 @@ class RingBufferImplementation : public BufferImplementationBase return is_full_(); } + /// Get the remaining capacity to store messages + /** + * This member function is thread-safe. + * + * \return the number of free capacity for new messages + */ + size_t available_capacity() const + { + std::lock_guard lock(mutex_); + return available_capacity_(); + } + void clear() { TRACEPOINT(rclcpp_ring_buffer_clear, static_cast(this)); @@ -189,6 +201,17 @@ class RingBufferImplementation : public BufferImplementationBase return size_ == capacity_; } + /// Get the remaining capacity to store messages + /** + * This member function is not thread-safe. + * + * \return the number of free capacity for new messages + */ + inline size_t available_capacity_() const + { + return capacity_ - size_; + } + size_t capacity_; std::vector ring_buffer_; diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 11f2dda6a4..a152632a53 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -306,6 +306,11 @@ class IntraProcessManager rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr get_subscription_intra_process(uint64_t intra_process_subscription_id); + /// Return the lowest available capacity for all subscription buffers for a publisher id. + RCLCPP_PUBLIC + size_t + lowest_available_capacity(const uint64_t intra_process_publisher_id) const; + private: struct SplittedSubscriptions { diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 6583e74ae7..c590207b90 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -62,6 +62,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable void add_to_wait_set(rcl_wait_set_t * wait_set) override; + RCLCPP_PUBLIC + virtual + size_t + available_capacity() const = 0; + bool is_ready(rcl_wait_set_t * wait_set) override = 0; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index 30debed83a..16c4b6a8db 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -169,6 +169,11 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff return buffer_->use_take_shared_method(); } + size_t available_capacity() const override + { + return buffer_->available_capacity(); + } + protected: void trigger_guard_condition() override diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index d9181bea43..3253595361 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -215,6 +215,17 @@ class PublisherBase : public std::enable_shared_from_this std::vector get_network_flow_endpoints() const; + /// Return the lowest available capacity for all subscription buffers. + /** + * For intraprocess communication return the lowest buffer capacity for all subscriptions. + * If intraprocess is disabled or no intraprocess subscriptions present, return maximum of size_t. + * On failure return 0. + * \return lowest buffer capacity for all subscriptions + */ + RCLCPP_PUBLIC + size_t + lowest_available_ipm_capacity() const; + /// Wait until all published messages are acknowledged or until the specified timeout elapses. /** * This method waits until all published messages are acknowledged by all matching diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index efce4afeaf..ea068f03ae 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -225,5 +225,52 @@ IntraProcessManager::can_communicate( return true; } +size_t +IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publisher_id) const +{ + size_t capacity = std::numeric_limits::max(); + + auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); + if (publisher_it == pub_to_subs_.end()) { + // Publisher is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling lowest_available_capacity for invalid or no longer existing publisher id"); + return 0u; + } + + if (publisher_it->second.take_shared_subscriptions.empty() && + publisher_it->second.take_ownership_subscriptions.empty()) + { + // no subscriptions available + return 0u; + } + + auto available_capacity = [this, &capacity](const uint64_t intra_process_subscription_id) + { + auto subscription_it = subscriptions_.find(intra_process_subscription_id); + if (subscription_it != subscriptions_.end()) { + auto subscription = subscription_it->second.lock(); + if (subscription) { + capacity = std::min(capacity, subscription->available_capacity()); + } + } else { + // Subscription is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling available_capacity for invalid or no longer existing subscription id"); + } + }; + + for (const auto sub_id : publisher_it->second.take_shared_subscriptions) { + available_capacity(sub_id); + } + + for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) { + available_capacity(sub_id); + } + + return capacity; +} } // namespace experimental } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index 698db2d559..0e37dcf3fa 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -384,3 +384,22 @@ std::vector PublisherBase::get_network_flow_endpoin return network_flow_endpoint_vector; } + +size_t PublisherBase::lowest_available_ipm_capacity() const +{ + if (!intra_process_is_enabled_) { + return 0u; + } + + auto ipm = weak_ipm_.lock(); + + if (!ipm) { + // TODO(ivanpauno): should this raise an error? + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died for a publisher."); + return 0u; + } + + return ipm->lowest_available_capacity(intra_process_publisher_id_); +} diff --git a/rclcpp/test/rclcpp/test_intra_process_buffer.cpp b/rclcpp/test/rclcpp/test_intra_process_buffer.cpp index 16c457c96f..eace6445a9 100644 --- a/rclcpp/test/rclcpp/test_intra_process_buffer.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_buffer.cpp @@ -238,3 +238,75 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) { EXPECT_EQ(original_value, *popped_unique_msg); EXPECT_EQ(original_message_pointer, popped_message_pointer); } + +/* + Check the available buffer capacity while storing and consuming data from an intra-process + buffer. + The initial available buffer capacity should equal the buffer size. + Inserting a message should decrease the available buffer capacity by 1. + Consuming a message should increase the available buffer capacity by 1. + */ +TEST(TestIntraProcessBuffer, available_capacity) { + using MessageT = char; + using Alloc = std::allocator; + using Deleter = std::default_delete; + using SharedMessageT = std::shared_ptr; + using UniqueMessageT = std::unique_ptr; + using UniqueIntraProcessBufferT = rclcpp::experimental::buffers::TypedIntraProcessBuffer< + MessageT, Alloc, Deleter, UniqueMessageT>; + + constexpr auto history_depth = 5u; + + auto buffer_impl = + std::make_unique>( + history_depth); + + UniqueIntraProcessBufferT intra_process_buffer(std::move(buffer_impl)); + + EXPECT_EQ(history_depth, intra_process_buffer.available_capacity()); + + auto original_unique_msg = std::make_unique('a'); + auto original_message_pointer = reinterpret_cast(original_unique_msg.get()); + auto original_value = *original_unique_msg; + + intra_process_buffer.add_unique(std::move(original_unique_msg)); + + EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity()); + + SharedMessageT popped_shared_msg; + popped_shared_msg = intra_process_buffer.consume_shared(); + auto popped_message_pointer = reinterpret_cast(popped_shared_msg.get()); + + EXPECT_EQ(history_depth, intra_process_buffer.available_capacity()); + EXPECT_EQ(original_value, *popped_shared_msg); + EXPECT_EQ(original_message_pointer, popped_message_pointer); + + original_unique_msg = std::make_unique('b'); + original_message_pointer = reinterpret_cast(original_unique_msg.get()); + original_value = *original_unique_msg; + + intra_process_buffer.add_unique(std::move(original_unique_msg)); + + auto second_unique_msg = std::make_unique('c'); + auto second_message_pointer = reinterpret_cast(second_unique_msg.get()); + auto second_value = *second_unique_msg; + + intra_process_buffer.add_unique(std::move(second_unique_msg)); + + EXPECT_EQ(history_depth - 2u, intra_process_buffer.available_capacity()); + + UniqueMessageT popped_unique_msg; + popped_unique_msg = intra_process_buffer.consume_unique(); + popped_message_pointer = reinterpret_cast(popped_unique_msg.get()); + + EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity()); + EXPECT_EQ(original_value, *popped_unique_msg); + EXPECT_EQ(original_message_pointer, popped_message_pointer); + + popped_unique_msg = intra_process_buffer.consume_unique(); + popped_message_pointer = reinterpret_cast(popped_unique_msg.get()); + + EXPECT_EQ(history_depth, intra_process_buffer.available_capacity()); + EXPECT_EQ(second_value, *popped_unique_msg); + EXPECT_EQ(second_message_pointer, popped_message_pointer); +} diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 45d916b004..83cf586e99 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -156,18 +156,26 @@ class IntraProcessBuffer { message_ptr = reinterpret_cast(msg.get()); shared_msg = msg; + ++num_msgs; } void add(MessageUniquePtr msg) { message_ptr = reinterpret_cast(msg.get()); unique_msg = std::move(msg); + ++num_msgs; } void pop(std::uintptr_t & msg_ptr) { msg_ptr = message_ptr; message_ptr = 0; + --num_msgs; + } + + size_t size() const + { + return num_msgs; } // need to store the messages somewhere otherwise the memory address will be reused @@ -175,6 +183,8 @@ class IntraProcessBuffer MessageUniquePtr unique_msg; std::uintptr_t message_ptr; + // count add and pop + size_t num_msgs = 0u; }; } // namespace mock @@ -221,6 +231,10 @@ class SubscriptionIntraProcessBase return topic_name.c_str(); } + virtual + size_t + available_capacity() const = 0; + rclcpp::QoS qos_profile; std::string topic_name; }; @@ -280,6 +294,12 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase return take_shared_method; } + size_t + available_capacity() const override + { + return qos_profile.depth() - buffer->size(); + } + bool take_shared_method; typename rclcpp::experimental::buffers::mock::IntraProcessBuffer::UniquePtr buffer; @@ -712,3 +732,91 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { EXPECT_EQ(original_message_pointer, received_message_pointer_10); EXPECT_NE(original_message_pointer, received_message_pointer_11); } + +/* + This tests the method "lowest_available_capacity": + - Creates 1 publisher. + - The available buffer capacity should be at least history size. + - Add 2 subscribers. + - Add everything to the intra-process manager. + - All the entities are expected to have different ids. + - Check the subscriptions count for the publisher. + - The available buffer capacity should be the history size. + - Publish one message (without receiving it). + - The available buffer capacity should decrease by 1. + - Publish another message (without receiving it). + - The available buffer capacity should decrease by 1. + - One subscriber receives one message. + - The available buffer capacity should stay the same, + as the other subscriber still has not freed its buffer. + - The other subscriber receives one message. + - The available buffer capacity should increase by 1. + - One subscription goes out of scope. + - The available buffer capacity should not change. + */ +TEST(TestIntraProcessManager, lowest_available_capacity) { + using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; + using MessageT = rcl_interfaces::msg::Log; + using PublisherT = rclcpp::mock::Publisher; + using SubscriptionIntraProcessT = rclcpp::experimental::mock::SubscriptionIntraProcess; + + constexpr auto history_depth = 10u; + + auto ipm = std::make_shared(); + + auto p1 = std::make_shared(rclcpp::QoS(history_depth).best_effort()); + + auto s1 = std::make_shared(rclcpp::QoS(history_depth).best_effort()); + auto s2 = std::make_shared(rclcpp::QoS(history_depth).best_effort()); + + auto p1_id = ipm->add_publisher(p1); + p1->set_intra_process_manager(p1_id, ipm); + + auto c1 = ipm->lowest_available_capacity(p1_id); + + ASSERT_LE(0u, c1); + + auto s1_id = ipm->add_subscription(s1); + auto s2_id = ipm->add_subscription(s2); + + bool unique_ids = s1_id != s2_id && p1_id != s1_id; + ASSERT_TRUE(unique_ids); + + size_t p1_subs = ipm->get_subscription_count(p1_id); + size_t non_existing_pub_subs = ipm->get_subscription_count(42); + ASSERT_EQ(2u, p1_subs); + ASSERT_EQ(0u, non_existing_pub_subs); + + c1 = ipm->lowest_available_capacity(p1_id); + auto non_existing_pub_c = ipm->lowest_available_capacity(42); + + ASSERT_EQ(history_depth, c1); + ASSERT_EQ(0u, non_existing_pub_c); + + auto unique_msg = std::make_unique(); + p1->publish(std::move(unique_msg)); + + c1 = ipm->lowest_available_capacity(p1_id); + ASSERT_EQ(history_depth - 1u, c1); + + unique_msg = std::make_unique(); + p1->publish(std::move(unique_msg)); + + c1 = ipm->lowest_available_capacity(p1_id); + ASSERT_EQ(history_depth - 2u, c1); + + s1->pop(); + + c1 = ipm->lowest_available_capacity(p1_id); + ASSERT_EQ(history_depth - 2u, c1); + + s2->pop(); + + c1 = ipm->lowest_available_capacity(p1_id); + ASSERT_EQ(history_depth - 1u, c1); + + ipm->get_subscription_intra_process(s1_id).reset(); + + c1 = ipm->lowest_available_capacity(p1_id); + ASSERT_EQ(history_depth - 1u, c1); +} diff --git a/rclcpp/test/rclcpp/test_publisher.cpp b/rclcpp/test/rclcpp/test_publisher.cpp index 20a46194fc..a0c3ec8b75 100644 --- a/rclcpp/test/rclcpp/test_publisher.cpp +++ b/rclcpp/test/rclcpp/test_publisher.cpp @@ -629,6 +629,41 @@ TEST_P(TestPublisherWaitForAllAcked, check_wait_for_all_acked_with_QosPolicy) { EXPECT_TRUE(pub->wait_for_all_acked(std::chrono::milliseconds(6000))); } +TEST_F(TestPublisher, lowest_available_ipm_capacity) { + constexpr auto history_depth = 10u; + + initialize(rclcpp::NodeOptions().use_intra_process_comms(true)); + + rclcpp::PublisherOptionsWithAllocator> options_ipm_disabled; + options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; + + rclcpp::PublisherOptionsWithAllocator> options_ipm_enabled; + options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + + auto do_nothing = [](std::shared_ptr) {}; + auto pub_ipm_disabled = node->create_publisher( + "topic", history_depth, + options_ipm_disabled); + auto pub_ipm_enabled = node->create_publisher( + "topic", history_depth, + options_ipm_enabled); + auto sub = node->create_subscription( + "topic", + history_depth, + do_nothing); + + ASSERT_EQ(1, pub_ipm_enabled->get_intra_process_subscription_count()); + ASSERT_EQ(0, pub_ipm_disabled->lowest_available_ipm_capacity()); + ASSERT_EQ(history_depth, pub_ipm_enabled->lowest_available_ipm_capacity()); + + auto msg = std::make_shared(); + ASSERT_NO_THROW(pub_ipm_disabled->publish(*msg)); + ASSERT_NO_THROW(pub_ipm_enabled->publish(*msg)); + + ASSERT_EQ(0, pub_ipm_disabled->lowest_available_ipm_capacity()); + ASSERT_EQ(history_depth - 1u, pub_ipm_enabled->lowest_available_ipm_capacity()); +} + INSTANTIATE_TEST_SUITE_P( TestWaitForAllAckedWithParm, TestPublisherWaitForAllAcked,