From b65a8c727892d478f36c02f309e8506864564fe6 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Sat, 30 Nov 2024 14:25:06 -0500 Subject: [PATCH] Omnibus fixes for running tests with Connext. (#2684) * Omnibus fixes for running tests with Connext. When running the tests with RTI Connext as the default RMW, some of the tests are failing. There are three different failures fixed here: 1. Setting the liveliness duration to a value smaller than a microsecond causes Connext to throw an error. Set it to a millisecond. 2. Using the SystemDefaultsQoS sets the QoS to KEEP_LAST 1. Connext is somewhat slow in this regard, so it can be the case that we are overwriting a previous service introspection event with the next one. Switch to the ServicesDefaultQoS in the test, which ensures we will not lose events. 3. Connext is slow to match publishers and subscriptions. Thus, when creating a subscription "on-the-fly", we should wait for the publisher to match it before expecting the subscription to actually receive data from it. With these fixes in place, the test_client_common, test_generic_service, test_service_introspection, and test_executors tests all pass for me with rmw_connextdds. Signed-off-by: Chris Lalancette * Fixes for executors. Signed-off-by: Chris Lalancette * One more fix for services. Signed-off-by: Chris Lalancette * More fixes for service_introspection. Signed-off-by: Chris Lalancette * More fixes for introspection. Signed-off-by: Chris Lalancette --------- Signed-off-by: Chris Lalancette (cherry picked from commit 9984197c292d6c5ae0e7661aaea245ffb0fea057) # Conflicts: # rclcpp/test/rclcpp/executors/test_executors.cpp # rclcpp/test/rclcpp/test_generic_service.cpp --- .../test/rclcpp/executors/test_executors.cpp | 78 ++++ rclcpp/test/rclcpp/test_client_common.cpp | 2 +- rclcpp/test/rclcpp/test_generic_service.cpp | 422 ++++++++++++++++++ rclcpp/test/rclcpp/test_service.cpp | 2 +- .../rclcpp/test_service_introspection.cpp | 94 +++- 5 files changed, 578 insertions(+), 20 deletions(-) create mode 100644 rclcpp/test/rclcpp/test_generic_service.cpp diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 9997c06649..90e07c4535 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -701,6 +701,84 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode) } } +<<<<<<< HEAD +======= +// Check that executors are correctly notified while they are spinning +// we notify twice to ensure that the notify waitable is still working +// after the first notification +TYPED_TEST(TestExecutors, notifyTwiceWhileSpinning) +{ + using ExecutorType = TypeParam; + + // Create executor, add the node and start spinning + ExecutorType executor; + executor.add_node(this->node); + std::thread spinner([&]() {executor.spin();}); + + // Wait for executor to be spinning + while (!executor.is_spinning()) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + // Create the first subscription while the executor is already spinning + std::atomic sub1_msg_count {0}; + auto sub1 = this->node->template create_subscription( + this->publisher->get_topic_name(), + rclcpp::QoS(10), + [&sub1_msg_count](test_msgs::msg::Empty::ConstSharedPtr) { + sub1_msg_count++; + }); + + // Wait for the subscription to be matched + size_t tries = 10000; + while (this->publisher->get_subscription_count() < 2 && tries-- > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_EQ(this->publisher->get_subscription_count(), 2); + + // Publish a message and verify it's received + this->publisher->publish(test_msgs::msg::Empty()); + auto start = std::chrono::steady_clock::now(); + while (sub1_msg_count == 0 && (std::chrono::steady_clock::now() - start) < 10s) { + std::this_thread::sleep_for(1ms); + } + EXPECT_EQ(sub1_msg_count, 1u); + + // Create a second subscription while the executor is already spinning + std::atomic sub2_msg_count {0}; + auto sub2 = this->node->template create_subscription( + this->publisher->get_topic_name(), + rclcpp::QoS(10), + [&sub2_msg_count](test_msgs::msg::Empty::ConstSharedPtr) { + sub2_msg_count++; + }); + + // Wait for the subscription to be matched + tries = 10000; + while (this->publisher->get_subscription_count() < 3 && tries-- > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_EQ(this->publisher->get_subscription_count(), 3); + + // Publish a message and verify it's received by both subscriptions + this->publisher->publish(test_msgs::msg::Empty()); + start = std::chrono::steady_clock::now(); + while ( + sub1_msg_count == 1 && + sub2_msg_count == 0 && + (std::chrono::steady_clock::now() - start) < 10s) + { + std::this_thread::sleep_for(1ms); + } + EXPECT_EQ(sub1_msg_count, 2u); + EXPECT_EQ(sub2_msg_count, 1u); + + // Cancel needs to be called before join, so that executor.spin() returns. + executor.cancel(); + spinner.join(); +} + +>>>>>>> 9984197c (Omnibus fixes for running tests with Connext. (#2684)) // Check spin_until_future_complete with node base pointer (instantiates its own executor) TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) { diff --git a/rclcpp/test/rclcpp/test_client_common.cpp b/rclcpp/test/rclcpp/test_client_common.cpp index 65475bd8fc..62aa2c05d2 100644 --- a/rclcpp/test/rclcpp/test_client_common.cpp +++ b/rclcpp/test/rclcpp/test_client_common.cpp @@ -431,7 +431,7 @@ TYPED_TEST(TestAllClientTypesWithServer, client_qos) rclcpp::ServicesQoS qos_profile; qos_profile.liveliness(rclcpp::LivelinessPolicy::Automatic); - rclcpp::Duration duration(std::chrono::nanoseconds(1)); + rclcpp::Duration duration(std::chrono::milliseconds(1)); qos_profile.deadline(duration); qos_profile.lifespan(duration); qos_profile.liveliness_lease_duration(duration); diff --git a/rclcpp/test/rclcpp/test_generic_service.cpp b/rclcpp/test/rclcpp/test_generic_service.cpp new file mode 100644 index 0000000000..dd75120c1b --- /dev/null +++ b/rclcpp/test/rclcpp/test_generic_service.cpp @@ -0,0 +1,422 @@ +// Copyright 2024 Sony Group Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "../mocking_utils/patch.hpp" +#include "../utils/rclcpp_gtest_macros.hpp" + +#include "rcl_interfaces/srv/list_parameters.hpp" +#include "test_msgs/srv/empty.hpp" +#include "test_msgs/srv/basic_types.hpp" + +using namespace std::chrono_literals; + +class TestGenericService : public ::testing::Test +{ +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared("test_node", "/ns"); + } + + void TearDown() + { + node.reset(); + } + + rclcpp::Node::SharedPtr node; +}; + +class TestGenericServiceSub : public ::testing::Test +{ +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared("test_node", "/ns"); + subnode = node->create_sub_node("sub_ns"); + } + + void TearDown() + { + node.reset(); + } + + rclcpp::Node::SharedPtr node; + rclcpp::Node::SharedPtr subnode; +}; + +/* + Testing service construction and destruction. + */ +TEST_F(TestGenericService, construction_and_destruction) { + auto callback = []( + rclcpp::GenericService::SharedRequest, + rclcpp::GenericService::SharedResponse) {}; + { + auto generic_service = node->create_generic_service( + "test_generic_service", "rcl_interfaces/srv/ListParameters", callback); + EXPECT_NE(nullptr, generic_service->get_service_handle()); + const rclcpp::ServiceBase * const_service_base = generic_service.get(); + EXPECT_NE(nullptr, const_service_base->get_service_handle()); + } + + { + ASSERT_THROW( + { + auto generic_service = node->create_generic_service( + "invalid_service?", "test_msgs/srv/Empty", callback); + }, rclcpp::exceptions::InvalidServiceNameError); + } + + { + ASSERT_THROW( + { + auto generic_service = node->create_generic_service( + "test_generic_service", "test_msgs/srv/NotExist", callback); + }, rclcpp::exceptions::InvalidServiceTypeError); + } +} + +/* + Testing service construction and destruction for subnodes. + */ +TEST_F(TestGenericServiceSub, construction_and_destruction) { + auto callback = []( + rclcpp::GenericService::SharedRequest, + rclcpp::GenericService::SharedResponse) {}; + { + auto generic_service = subnode->create_generic_service( + "test_generic_service", "rcl_interfaces/srv/ListParameters", callback); + EXPECT_STREQ(generic_service->get_service_name(), "/ns/sub_ns/test_generic_service"); + } + + { + ASSERT_THROW( + { + auto generic_service = subnode->create_generic_service( + "invalid_service?", "test_msgs/srv/Empty", callback); + }, rclcpp::exceptions::InvalidServiceNameError); + } + + { + ASSERT_THROW( + { + auto generic_service = subnode->create_generic_service( + "test_generic_service", "test_msgs/srv/NotExist", callback); + }, rclcpp::exceptions::InvalidServiceTypeError); + } +} + +TEST_F(TestGenericService, construction_and_destruction_rcl_errors) { + auto callback = []( + rclcpp::GenericService::SharedRequest, rclcpp::GenericService::SharedResponse) {}; + + { + auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_service_init, RCL_RET_ERROR); + // reset() isn't necessary for this exception, it just avoids unused return value warning + EXPECT_THROW( + node->create_generic_service("service", "test_msgs/srv/Empty", callback).reset(), + rclcpp::exceptions::RCLError); + } + { + // reset() is required for this one + auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_service_fini, RCL_RET_ERROR); + EXPECT_NO_THROW( + node->create_generic_service("service", "test_msgs/srv/Empty", callback).reset()); + } +} + +TEST_F(TestGenericService, generic_service_take_request) { + auto callback = []( + rclcpp::GenericService::SharedRequest, rclcpp::GenericService::SharedResponse) {}; + auto generic_service = + node->create_generic_service("test_service", "test_msgs/srv/Empty", callback); + { + auto request_id = generic_service->create_request_header(); + auto request = generic_service->create_request(); + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_take_request, RCL_RET_OK); + EXPECT_TRUE(generic_service->take_request(request, *request_id.get())); + } + { + auto request_id = generic_service->create_request_header(); + auto request = generic_service->create_request(); + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_take_request, RCL_RET_SERVICE_TAKE_FAILED); + EXPECT_FALSE(generic_service->take_request(request, *request_id.get())); + } + { + auto request_id = generic_service->create_request_header(); + auto request = generic_service->create_request(); + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_take_request, RCL_RET_ERROR); + EXPECT_THROW( + generic_service->take_request(request, *request_id.get()), rclcpp::exceptions::RCLError); + } +} + +TEST_F(TestGenericService, generic_service_send_response) { + auto callback = []( + const rclcpp::GenericService::SharedRequest, rclcpp::GenericService::SharedResponse) {}; + auto generic_service = + node->create_generic_service("test_service", "test_msgs/srv/Empty", callback); + + { + auto request_id = generic_service->create_request_header(); + auto response = generic_service->create_response(); + auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_send_response, RCL_RET_OK); + EXPECT_NO_THROW(generic_service->send_response(*request_id.get(), response)); + } + + { + auto request_id = generic_service->create_request_header(); + auto response = generic_service->create_response(); + auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_send_response, RCL_RET_ERROR); + EXPECT_THROW( + generic_service->send_response(*request_id.get(), response), + rclcpp::exceptions::RCLError); + } +} + +/* + Testing on_new_request callbacks. + */ +TEST_F(TestGenericService, generic_service_on_new_request_callback) { + auto server_callback = []( + const rclcpp::GenericService::SharedRequest, rclcpp::GenericService::SharedResponse) {FAIL();}; + rclcpp::ServicesQoS service_qos; + service_qos.keep_last(3); + auto generic_service = node->create_generic_service( + "~/test_service", "test_msgs/srv/Empty", server_callback, service_qos); + + std::atomic c1 {0}; + auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; + generic_service->set_on_new_request_callback(increase_c1_cb); + + auto client = node->create_client( + "~/test_service", service_qos); + { + auto request = std::make_shared(); + client->async_send_request(request); + } + + auto start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c1 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + + std::atomic c2 {0}; + auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;}; + generic_service->set_on_new_request_callback(increase_c2_cb); + + { + auto request = std::make_shared(); + client->async_send_request(request); + } + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c2 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + generic_service->clear_on_new_request_callback(); + + { + auto request = std::make_shared(); + client->async_send_request(request); + client->async_send_request(request); + client->async_send_request(request); + } + + std::atomic c3 {0}; + auto increase_c3_cb = [&c3](size_t count_msgs) {c3 += count_msgs;}; + generic_service->set_on_new_request_callback(increase_c3_cb); + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c3 < 3 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + EXPECT_EQ(c3.load(), 3u); + + std::function invalid_cb = nullptr; + EXPECT_THROW(generic_service->set_on_new_request_callback(invalid_cb), std::invalid_argument); +} + +TEST_F(TestGenericService, rcl_service_response_publisher_get_actual_qos_error) { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_service_response_publisher_get_actual_qos, nullptr); + auto callback = []( + const rclcpp::GenericService::SharedRequest, rclcpp::GenericService::SharedResponse) {}; + auto generic_service = + node->create_generic_service("test_service", "test_msgs/srv/Empty", callback); + RCLCPP_EXPECT_THROW_EQ( + generic_service->get_response_publisher_actual_qos(), + std::runtime_error("failed to get service's response publisher qos settings: error not set")); +} + +TEST_F(TestGenericService, rcl_service_request_subscription_get_actual_qos_error) { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_service_request_subscription_get_actual_qos, nullptr); + auto callback = []( + const rclcpp::GenericService::SharedRequest, rclcpp::GenericService::SharedResponse) {}; + auto generic_service = + node->create_generic_service("test_service", "test_msgs/srv/Empty", callback); + RCLCPP_EXPECT_THROW_EQ( + generic_service->get_request_subscription_actual_qos(), + std::runtime_error("failed to get service's request subscription qos settings: error not set")); +} + +TEST_F(TestGenericService, generic_service_qos) { + rclcpp::ServicesQoS qos_profile; + qos_profile.liveliness(rclcpp::LivelinessPolicy::Automatic); + rclcpp::Duration duration(std::chrono::milliseconds(1)); + qos_profile.deadline(duration); + qos_profile.lifespan(duration); + qos_profile.liveliness_lease_duration(duration); + + auto callback = []( + const rclcpp::GenericService::SharedRequest, rclcpp::GenericService::SharedResponse) {}; + auto generic_service = + node->create_generic_service("test_service", "test_msgs/srv/Empty", callback, qos_profile); + + auto rs_qos = generic_service->get_request_subscription_actual_qos(); + auto rp_qos = generic_service->get_response_publisher_actual_qos(); + + EXPECT_EQ(qos_profile, rp_qos); + // Lifespan has no meaning for subscription/readers + rs_qos.lifespan(qos_profile.lifespan()); + EXPECT_EQ(qos_profile, rs_qos); +} + +TEST_F(TestGenericService, generic_service_qos_depth) { + uint64_t server_cb_count_ = 0; + auto server_callback = [&]( + const rclcpp::GenericService::SharedRequest, + rclcpp::GenericService::SharedResponse) {server_cb_count_++;}; + + auto server_node = std::make_shared("server_node", "/ns"); + + rclcpp::QoS server_qos_profile(2); + + auto generic_service = server_node->create_generic_service( + "test_qos_depth", "test_msgs/srv/Empty", std::move(server_callback), server_qos_profile); + + rclcpp::QoS client_qos_profile(rclcpp::QoSInitialization::from_rmw(rmw_qos_profile_default)); + auto client = node->create_client("test_qos_depth", client_qos_profile); + + ::testing::AssertionResult request_result = ::testing::AssertionSuccess(); + auto request = std::make_shared(); + + auto client_callback = [&request_result]( + rclcpp::Client::SharedFuture future_response) { + if (nullptr == future_response.get()) { + request_result = ::testing::AssertionFailure() << "Future response was null"; + } + }; + + uint64_t client_requests = 5; + for (uint64_t i = 0; i < client_requests; i++) { + client->async_send_request(request, client_callback); + std::this_thread::sleep_for(10ms); + } + + auto start = std::chrono::steady_clock::now(); + while ((server_cb_count_ < server_qos_profile.depth()) && + (std::chrono::steady_clock::now() - start) < 1s) + { + rclcpp::spin_some(server_node); + std::this_thread::sleep_for(1ms); + } + + // Spin an extra time to check if server QoS depth has been ignored, + // so more server responses might be processed than expected. + rclcpp::spin_some(server_node); + + EXPECT_EQ(server_cb_count_, server_qos_profile.depth()); +} + +TEST_F(TestGenericService, generic_service_and_client) { + const std::string service_name = "test_service"; + const std::string service_type = "test_msgs/srv/BasicTypes"; + int64_t expected_change = 87654321; + + auto callback = [&expected_change]( + const rclcpp::GenericService::SharedRequest request, + rclcpp::GenericService::SharedResponse response) { + auto typed_request = static_cast(request.get()); + auto typed_response = static_cast(response.get()); + + typed_response->int64_value = typed_request->int64_value + expected_change; + }; + auto generic_service = node->create_generic_service(service_name, service_type, callback); + + auto client = node->create_client(service_name); + + ASSERT_TRUE(client->wait_for_service(std::chrono::seconds(5))); + ASSERT_TRUE(client->service_is_ready()); + + auto request = std::make_shared(); + request->int64_value = 12345678; + + auto generic_client_callback = [&request, &expected_change]( + std::shared_future future) { + auto response = future.get(); + EXPECT_EQ(response->int64_value, (request->int64_value + expected_change)); + }; + + auto future = + client->async_send_request(request, generic_client_callback); + rclcpp::spin_until_future_complete( + node->get_node_base_interface(), future, std::chrono::seconds(5)); +} diff --git a/rclcpp/test/rclcpp/test_service.cpp b/rclcpp/test/rclcpp/test_service.cpp index c6b3d3ace1..fc3756a0bc 100644 --- a/rclcpp/test/rclcpp/test_service.cpp +++ b/rclcpp/test/rclcpp/test_service.cpp @@ -335,7 +335,7 @@ TEST_F(TestService, rcl_service_request_subscription_get_actual_qos_error) { TEST_F(TestService, server_qos) { rclcpp::ServicesQoS qos_profile; qos_profile.liveliness(rclcpp::LivelinessPolicy::Automatic); - rclcpp::Duration duration(std::chrono::nanoseconds(1)); + rclcpp::Duration duration(std::chrono::milliseconds(1)); qos_profile.deadline(duration); qos_profile.lifespan(duration); qos_profile.liveliness_lease_duration(duration); diff --git a/rclcpp/test/rclcpp/test_service_introspection.cpp b/rclcpp/test/rclcpp/test_service_introspection.cpp index 94c94a58ce..074438bcf1 100644 --- a/rclcpp/test/rclcpp/test_service_introspection.cpp +++ b/rclcpp/test/rclcpp/test_service_introspection.cpp @@ -92,9 +92,16 @@ TEST_F(TestServiceIntrospection, service_introspection_nominal) request->set__int64_value(42); client->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); service->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + + // Wait for the introspection to attach to our subscription + size_t tries = 1000; + while (this->sub->get_publisher_count() < 2 && tries-- > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_EQ(sub->get_publisher_count(), 2u); auto future = client->async_send_request(request); ASSERT_EQ( @@ -149,9 +156,11 @@ TEST_F(TestServiceIntrospection, service_introspection_nominal) TEST_F(TestServiceIntrospection, service_introspection_enable_disable_events) { client->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_OFF); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_OFF); service->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_OFF); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_OFF); + + ASSERT_EQ(sub->get_publisher_count(), 0); auto request = std::make_shared(); request->set__bool_value(true); @@ -169,9 +178,16 @@ TEST_F(TestServiceIntrospection, service_introspection_enable_disable_events) events.clear(); client->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_METADATA); service->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_OFF); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_OFF); + + // Wait for the introspection to attach to our subscription + size_t tries = 1000; + while (this->sub->get_publisher_count() < 1 && tries-- > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_EQ(sub->get_publisher_count(), 1u); future = client->async_send_request(request); ASSERT_EQ( @@ -186,9 +202,16 @@ TEST_F(TestServiceIntrospection, service_introspection_enable_disable_events) events.clear(); client->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_OFF); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_OFF); service->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + + // Wait for the introspection to attach to our subscription + tries = 1000; + while (this->sub->get_publisher_count() < 1 && tries-- > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_EQ(sub->get_publisher_count(), 1u); future = client->async_send_request(request); ASSERT_EQ( @@ -203,9 +226,16 @@ TEST_F(TestServiceIntrospection, service_introspection_enable_disable_events) events.clear(); client->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_METADATA); service->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + + // Wait for the introspection to attach to our subscription + tries = 1000; + while (this->sub->get_publisher_count() < 2 && tries-- > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_EQ(sub->get_publisher_count(), 2u); future = client->async_send_request(request); ASSERT_EQ( @@ -221,9 +251,16 @@ TEST_F(TestServiceIntrospection, service_introspection_enable_disable_events) TEST_F(TestServiceIntrospection, service_introspection_enable_disable_event_content) { client->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_METADATA); service->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + + // Wait for the introspection to attach to our subscription + size_t tries = 1000; + while (this->sub->get_publisher_count() < 2 && tries-- > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_EQ(sub->get_publisher_count(), 2u); auto request = std::make_shared(); request->set__bool_value(true); @@ -245,9 +282,16 @@ TEST_F(TestServiceIntrospection, service_introspection_enable_disable_event_cont events.clear(); client->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); service->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + + // Wait for the introspection to attach to our subscription + tries = 1000; + while (this->sub->get_publisher_count() < 2 && tries-- > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_EQ(sub->get_publisher_count(), 2u); future = client->async_send_request(request); ASSERT_EQ( @@ -278,9 +322,16 @@ TEST_F(TestServiceIntrospection, service_introspection_enable_disable_event_cont events.clear(); client->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_METADATA); service->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + + // Wait for the introspection to attach to our subscription + tries = 1000; + while (this->sub->get_publisher_count() < 2 && tries-- > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_EQ(sub->get_publisher_count(), 2u); future = client->async_send_request(request); ASSERT_EQ( @@ -311,9 +362,16 @@ TEST_F(TestServiceIntrospection, service_introspection_enable_disable_event_cont events.clear(); client->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); service->configure_introspection( - node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + node->get_clock(), rclcpp::ServicesQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + + // Wait for the introspection to attach to our subscription + tries = 1000; + while (this->sub->get_publisher_count() < 2 && tries-- > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_EQ(sub->get_publisher_count(), 2u); future = client->async_send_request(request); ASSERT_EQ(