From 8cd49bd0cb91126963eeec38cdaa06e4922db346 Mon Sep 17 00:00:00 2001 From: Jaison Titus Date: Fri, 18 Oct 2019 14:38:40 -0700 Subject: [PATCH] - Modified topic_cache to include qos policies - Modified call to topic_cache.addTopic at custom_participant_info.hpp - Wrote tests for topic_cache Signed-off-by: Jaison Titus --- .../custom_participant_info.hpp | 7 +- .../rmw_fastrtps_shared_cpp/topic_cache.hpp | 81 ++++-- .../test/test_topic_cache.cpp | 251 ++++++++++++++---- 3 files changed, 273 insertions(+), 66 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp index eb6fbc4fe..b66510e37 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp @@ -169,8 +169,11 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener { std::lock_guard guard(topic_cache.getMutex()); if (is_alive) { - trigger = topic_cache().addTopic(proxyData.RTPSParticipantKey(), - proxyData.topicName().to_string(), proxyData.typeName().to_string()); + trigger = topic_cache().addTopic( + proxyData.RTPSParticipantKey(), + proxyData.topicName().to_string(), + proxyData.typeName().to_string(), + proxyData.m_qos); } else { trigger = topic_cache().removeTopic(proxyData.RTPSParticipantKey(), proxyData.topicName().to_string(), proxyData.typeName().to_string()); diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp index c5604ee93..872a2ae3d 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp @@ -24,10 +24,12 @@ #include #include #include +#include #include "fastrtps/participant/Participant.h" #include "fastrtps/rtps/common/Guid.h" #include "fastrtps/rtps/common/InstanceHandle.h" +#include "qos_converter.hpp" #include "rcpputils/thread_safety_annotations.hpp" #include "rcutils/logging_macros.h" @@ -39,19 +41,20 @@ typedef eprosima::fastrtps::rtps::GUID_t GUID_t; class TopicCache { private: - typedef std::map>> ParticipantTopicMap; typedef std::unordered_map> TopicToTypes; + typedef std::map ParticipantTopicMap; + typedef std::vector> TopicData; + typedef std::unordered_map TopicNameToTopicData; /** - * Map of topic names to a vector of types that topic may use. + * Map of topic names to TopicData. Where topic data is vector of tuples containing + * participant GUID, topic type and the qos policy of the respective participant. * Topics here are represented as one to many, DDS XTypes 1.2 * specifies application code 'generally' uses a 1-1 relationship. * However, generic services such as logger and monitor, can discover * multiple types on the same topic. - * */ - TopicToTypes topic_to_types_; + TopicNameToTopicData topic_name_to_topic_data_; /** * Map of participant GUIDS to a set of topic-type. @@ -59,11 +62,27 @@ class TopicCache ParticipantTopicMap participant_to_topics_; /** - * Helper function to initialize a topic vector. + * Helper function to initialize an empty TopicData for a topic name. * - * @param topic_name + * @param topic_name the topic name for which the TopicNameToTopicData map should be initialised. + * @param topic_name_to_topic_data the map to initialise. */ - void initializeTopic(const std::string & topic_name, TopicToTypes & topic_to_types) + void initializeTopicDataMap( + const std::string & topic_name, + TopicNameToTopicData & topic_name_to_topic_data) + { + if (topic_name_to_topic_data.find(topic_name) == topic_name_to_topic_data.end()) { + topic_name_to_topic_data[topic_name] = TopicData(); + } + } + + /** + * Helper function to initialize a topic vector. + * + * @param topic_name the topic name for which the TopicToTypes map should be initialised. + * @param topic_to_types the map to initialise. + */ + void initializeTopicTypesMap(const std::string & topic_name, TopicToTypes & topic_to_types) { if (topic_to_types.find(topic_name) == topic_to_types.end()) { topic_to_types[topic_name] = std::vector(); @@ -89,9 +108,16 @@ class TopicCache /** * @return a map of topic name to the vector of topic types used. */ - const TopicToTypes & getTopicToTypes() const + const TopicToTypes getTopicToTypes() const { - return topic_to_types_; + TopicToTypes topic_to_types; + for (const auto & it : topic_name_to_topic_data_) { + topic_to_types[it.first] = std::vector(); + for (const auto & mit : it.second) { + topic_to_types[it.first].push_back(std::get<1>(mit)); + } + } + return topic_to_types; } /** @@ -102,6 +128,14 @@ class TopicCache return participant_to_topics_; } + /** + * @return a map of topic name to a vector of GUID_t, type name and qos profile tuple. + */ + const TopicNameToTopicData & getTopicNameToTopicData() const + { + return topic_name_to_topic_data_; + } + /** * Add a topic based on discovery. * @@ -110,15 +144,17 @@ class TopicCache * @param type_name * @return true if a change has been recorded */ + template bool addTopic( const eprosima::fastrtps::rtps::InstanceHandle_t & rtpsParticipantKey, const std::string & topic_name, - const std::string & type_name) + const std::string & type_name, + const T & dds_qos) { - initializeTopic(topic_name, topic_to_types_); + initializeTopicDataMap(topic_name, topic_name_to_topic_data_); auto guid = iHandle2GUID(rtpsParticipantKey); initializeParticipantMap(participant_to_topics_, guid); - initializeTopic(topic_name, participant_to_topics_[guid]); + initializeTopicTypesMap(topic_name, participant_to_topics_[guid]); if (rcutils_logging_logger_is_enabled_for("rmw_fastrtps_shared_cpp", RCUTILS_LOG_SEVERITY_DEBUG)) { @@ -129,7 +165,9 @@ class TopicCache "Adding topic '%s' with type '%s' for node '%s'", topic_name.c_str(), type_name.c_str(), guid_stream.str().c_str()); } - topic_to_types_[topic_name].push_back(type_name); + auto qos_profile = rmw_qos_profile_t(); + dds_qos_to_rmw_qos(dds_qos, &qos_profile); + topic_name_to_topic_data_[topic_name].push_back(std::make_tuple(guid, type_name, qos_profile)); participant_to_topics_[guid][topic_name].push_back(type_name); return true; } @@ -147,22 +185,26 @@ class TopicCache const std::string & topic_name, const std::string & type_name) { - if (topic_to_types_.find(topic_name) == topic_to_types_.end()) { + if (topic_name_to_topic_data_.find(topic_name) == topic_name_to_topic_data_.end()) { RCUTILS_LOG_DEBUG_NAMED( "rmw_fastrtps_shared_cpp", "unexpected removal on topic '%s' with type '%s'", topic_name.c_str(), type_name.c_str()); return false; } + auto guid = iHandle2GUID(rtpsParticipantKey); { - auto & type_vec = topic_to_types_[topic_name]; - type_vec.erase(std::find(type_vec.begin(), type_vec.end(), type_name)); + auto & type_vec = topic_name_to_topic_data_[topic_name]; + type_vec.erase(std::find_if(type_vec.begin(), type_vec.end(), + [type_name, guid](const auto & topic_info) { + return type_name.compare(std::get<1>(topic_info)) == 0 && + guid == std::get<0>(topic_info); + })); if (type_vec.empty()) { - topic_to_types_.erase(topic_name); + topic_name_to_topic_data_.erase(topic_name); } } - auto guid = iHandle2GUID(rtpsParticipantKey); auto guid_topics_pair = participant_to_topics_.find(guid); if (guid_topics_pair != participant_to_topics_.end() && guid_topics_pair->second.find(topic_name) != guid_topics_pair->second.end()) @@ -180,6 +222,7 @@ class TopicCache "rmw_fastrtps_shared_cpp", "Unable to remove topic, does not exist '%s' with type '%s'", topic_name.c_str(), type_name.c_str()); + return false; } return true; } diff --git a/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp b/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp index c4d21eaf9..180fe501e 100644 --- a/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp +++ b/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp @@ -11,15 +11,20 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include +#include #include "gtest/gtest.h" +#include "rmw_fastrtps_shared_cpp/topic_cache.hpp" + #include "fastrtps/qos/ReaderQos.h" #include "fastrtps/qos/WriterQos.h" #include "fastrtps/rtps/common/InstanceHandle.h" - #include "rmw/types.h" -#include "rmw_fastrtps_shared_cpp/topic_cache.hpp" + using eprosima::fastrtps::WriterQos; using eprosima::fastrtps::ReaderQos; @@ -27,62 +32,218 @@ using eprosima::fastrtps::rtps::GUID_t; using eprosima::fastrtps::rtps::GuidPrefix_t; using eprosima::fastrtps::rtps::InstanceHandle_t; -TEST(TopicCacheTest, test_topic_cache_add_topic) { - // Create an instance handler - const auto guid = GUID_t(GuidPrefix_t(), 1); - InstanceHandle_t instance_handler = InstanceHandle_t(); - instance_handler = guid; +class TopicCacheTestFixture : public ::testing::Test +{ +public: + TopicCache topic_cache; + WriterQos qos[2]; + rmw_qos_profile_t rmw_qos[2]; + InstanceHandle_t instance_handler[2]; + GUID_t guid[2]; + void SetUp() + { + // Create an instance handlers + for (int i = 0; i < 2; i++) { + guid[i] = GUID_t(GuidPrefix_t(), i + 1); + instance_handler[i] = guid[i]; + } - // Create an instance of TopicCache - auto topic_cache = TopicCache(); - // Add Topic - bool did_add = topic_cache.addTopic(instance_handler, "TestTopic", "TestType"); - // Verify that the returned value was true - EXPECT_TRUE(did_add); + // DDS qos + qos[0].m_durability.kind = eprosima::fastrtps::TRANSIENT_DURABILITY_QOS; + qos[0].m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + qos[0].m_deadline.period = {123, 5678}; + qos[0].m_lifespan.duration = {190, 1234}; + qos[0].m_liveliness.kind = eprosima::fastrtps::MANUAL_BY_TOPIC_LIVELINESS_QOS; + qos[0].m_liveliness.lease_duration = {80, 5555555}; - // Verify that there exists a value of the guid in the map - const auto & it = topic_cache.getTopicToTypes().find("TestTopic"); - ASSERT_FALSE(it == topic_cache.getTopicToTypes().end()); + // equivalent rmq qos + rmw_qos[0].durability = RMW_QOS_POLICY_DURABILITY_UNKNOWN; + rmw_qos[0].reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + rmw_qos[0].liveliness = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; + rmw_qos[0].liveliness_lease_duration.sec = 80u; + rmw_qos[0].liveliness_lease_duration.nsec = 5555555u; + rmw_qos[0].deadline.sec = 123u; + rmw_qos[0].deadline.nsec = 5678u; + rmw_qos[0].lifespan.sec = 190u; + rmw_qos[0].lifespan.nsec = 1234u; + // DDS qos + qos[1].m_durability.kind = eprosima::fastrtps::TRANSIENT_DURABILITY_QOS; + qos[1].m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + qos[1].m_deadline.period = {12, 1234}; + qos[1].m_lifespan.duration = {19, 5432}; + qos[1].m_liveliness.kind = eprosima::fastrtps::MANUAL_BY_TOPIC_LIVELINESS_QOS; + qos[1].m_liveliness.lease_duration = {8, 78901234}; + + // equivalent rmq qos + rmw_qos[1].durability = RMW_QOS_POLICY_DURABILITY_UNKNOWN; + rmw_qos[1].reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + rmw_qos[1].liveliness = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; + rmw_qos[1].liveliness_lease_duration.sec = 8u; + rmw_qos[1].liveliness_lease_duration.nsec = 78901234u; + rmw_qos[1].deadline.sec = 12u; + rmw_qos[1].deadline.nsec = 1234u; + rmw_qos[1].lifespan.sec = 19u; + rmw_qos[1].lifespan.nsec = 5432u; + + // Add some topics + topic_cache.addTopic(instance_handler[0], "topic1", "type1", qos[0]); + topic_cache.addTopic(instance_handler[0], "topic2", "type2", qos[0]); + topic_cache.addTopic(instance_handler[1], "topic1", "type1", qos[1]); + topic_cache.addTopic(instance_handler[1], "topic2", "type1", qos[1]); + } +}; + +TEST_F(TopicCacheTestFixture, test_topic_cache_get_topic_types) +{ + const auto & topic_type_map = this->topic_cache.getTopicToTypes(); + + // topic1 + const auto & it = topic_type_map.find("topic1"); + ASSERT_TRUE(it != topic_type_map.end()); // Verify that the object returned from the map is indeed the one expected - const auto topic_types = it->second; - // Verify that this vector only has one element - EXPECT_EQ(topic_types.size(), 1u); - // Verify the topic topic is present in the vector - EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "TestType") != topic_types.end()); + auto topic_types = it->second; + // Verify that there are two entries for type1 + EXPECT_EQ(std::count(topic_types.begin(), topic_types.end(), "type1"), 2); + EXPECT_EQ(topic_types.at(0), "type1"); + + // topic2 + const auto & it2 = topic_type_map.find("topic2"); + ASSERT_TRUE(it2 != topic_type_map.end()); + // Verify that the object returned from the map is indeed the one expected + topic_types = it2->second; + // Verify that there are two entries for type1 + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type1") != topic_types.end()); + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type2") != topic_types.end()); } -TEST(TopicCacheTest, test_topic_cache_remove_policy_element_exists) { - // Create an instance handler - const auto guid = GUID_t(GuidPrefix_t(), 2); - InstanceHandle_t instance_handler = InstanceHandle_t(); - instance_handler = guid; +TEST_F(TopicCacheTestFixture, test_topic_cache_get_participant_map) +{ + const auto & participant_topic_map = this->topic_cache.getParticipantToTopics(); - // Create an instance of TopicCache - auto topic_cache = TopicCache(); - // add topic - topic_cache.addTopic(instance_handler, "TestTopic", "TestType"); - // remove topic - auto did_remove = topic_cache.removeTopic(instance_handler, "TestTopic", "TestType"); + // participant 1 + const auto & it = participant_topic_map.find(this->guid[0]); + ASSERT_TRUE(it != participant_topic_map.end()); + // Verify that the topic and respective types are present + auto & topic_type_map = it->second; + + const auto & topic1it = topic_type_map.find("topic1"); + ASSERT_TRUE(topic1it != topic_type_map.end()); + auto topic_types = topic1it->second; + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type1") != topic_types.end()); + + const auto & topic2it = topic_type_map.find("topic2"); + ASSERT_TRUE(topic2it != topic_type_map.end()); + topic_types = topic2it->second; + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type2") != topic_types.end()); + + // participant 2 + const auto & it2 = participant_topic_map.find(this->guid[1]); + ASSERT_TRUE(it2 != participant_topic_map.end()); + // Verify that the topic and respective types are present + auto & topic_type_map2 = it2->second; + + const auto & topic1it2 = topic_type_map2.find("topic1"); + ASSERT_TRUE(topic1it2 != topic_type_map2.end()); + topic_types = topic1it2->second; + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type1") != topic_types.end()); + + const auto & topic2it2 = topic_type_map2.find("topic2"); + ASSERT_TRUE(topic2it2 != topic_type_map2.end()); + topic_types = topic2it2->second; + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type1") != topic_types.end()); +} + +TEST_F(TopicCacheTestFixture, test_topic_cache_get_topic_name_topic_data_map) +{ + const auto & topic_data_map = this->topic_cache.getTopicNameToTopicData(); + auto expected_results = std::map>>(); + expected_results["topic1"].push_back(std::make_tuple(guid[0], "type1", rmw_qos[0])); + expected_results["topic1"].push_back(std::make_tuple(guid[1], "type1", rmw_qos[1])); + expected_results["topic2"].push_back(std::make_tuple(guid[0], "type2", rmw_qos[0])); + expected_results["topic2"].push_back(std::make_tuple(guid[1], "type1", rmw_qos[1])); + for (auto & result_it : expected_results) { + auto & topic_name = result_it.first; + auto & expected_topic_data = result_it.second; + + const auto & it = topic_data_map.find(topic_name); + ASSERT_TRUE(it != topic_data_map.end()); + // Verify that the topic has all the associated data + auto & topic_data = it->second; + for (auto i = 0u; i < expected_topic_data.size(); i++) { + // GUID + EXPECT_EQ(std::get<0>(topic_data.at(i)), std::get<0>(expected_topic_data.at(i))); + // TYPE + EXPECT_EQ(std::get<1>(topic_data.at(i)), std::get<1>(expected_topic_data.at(i))); + // QOS + auto & qos = std::get<2>(topic_data.at(i)); + auto & expected_qos = std::get<2>(expected_topic_data.at(i)); + EXPECT_EQ(qos.durability, expected_qos.durability); + EXPECT_EQ(qos.reliability, expected_qos.reliability); + EXPECT_EQ(qos.liveliness, expected_qos.liveliness); + EXPECT_EQ(qos.liveliness_lease_duration.sec, expected_qos.liveliness_lease_duration.sec); + EXPECT_EQ(qos.liveliness_lease_duration.nsec, expected_qos.liveliness_lease_duration.nsec); + EXPECT_EQ(qos.deadline.sec, expected_qos.deadline.sec); + EXPECT_EQ(qos.deadline.nsec, expected_qos.deadline.nsec); + EXPECT_EQ(qos.lifespan.sec, expected_qos.lifespan.sec); + EXPECT_EQ(qos.lifespan.nsec, expected_qos.lifespan.nsec); + } + } +} + +TEST_F(TopicCacheTestFixture, test_topic_cache_add_topic) +{ + // Add Topic + bool did_add = this->topic_cache.addTopic(this->instance_handler[1], "TestTopic", "TestType", + this->qos[1]); + // Verify that the returned value was true + EXPECT_TRUE(did_add); +} + +TEST_F(TopicCacheTestFixture, test_topic_cache_remove_topic_element_exists) +{ + auto did_remove = this->topic_cache.removeTopic(this->instance_handler[0], "topic1", "type1"); // Assert that the return was true ASSERT_TRUE(did_remove); + // Verify it is removed from TopicToTypes + const auto & topic_type_map = this->topic_cache.getTopicToTypes(); + const auto & topic_type_it = topic_type_map.find("topic1"); + ASSERT_TRUE(topic_type_it != topic_type_map.end()); + EXPECT_EQ(std::count(topic_type_it->second.begin(), topic_type_it->second.end(), "type1"), 1); + // Verify it is removed from ParticipantTopicMap + const auto & participant_topic_map = this->topic_cache.getParticipantToTopics(); + const auto & participant_topic_it = participant_topic_map.find(this->guid[0]); + ASSERT_TRUE(participant_topic_it != participant_topic_map.end()); + const auto & p_topic_type_it = participant_topic_it->second.find("topic1"); + ASSERT_TRUE(p_topic_type_it == participant_topic_it->second.end()); + // Verify it is removed from TopicNameToTopicTypeMap + const auto & topic_data_map = this->topic_cache.getTopicNameToTopicData(); + const auto & topic_data_it = topic_data_map.find("topic1"); + ASSERT_TRUE(topic_data_it != topic_data_map.end()); + EXPECT_EQ(topic_data_it->second.size(), 1u); - // Verify that there does not exist a value for the guid in the map - const auto & it = topic_cache.getTopicToTypes().find("TestTopic"); - ASSERT_TRUE(it == topic_cache.getTopicToTypes().end()); + did_remove = this->topic_cache.removeTopic(this->instance_handler[1], "topic1", "type1"); + ASSERT_TRUE(did_remove); + const auto & topic_type_map2 = this->topic_cache.getTopicToTypes(); + const auto & topic_type_it2 = topic_type_map2.find("topic1"); + ASSERT_TRUE(topic_type_it2 == topic_type_map2.end()); + const auto & participant_topic_map2 = this->topic_cache.getParticipantToTopics(); + const auto & participant_topic_it2 = participant_topic_map2.find(this->guid[1]); + ASSERT_TRUE(participant_topic_it2 != participant_topic_map2.end()); + const auto & p_topic_type_it2 = participant_topic_it2->second.find("topic1"); + ASSERT_TRUE(p_topic_type_it2 == participant_topic_it2->second.end()); + const auto & topic_data_map2 = this->topic_cache.getTopicNameToTopicData(); + const auto & topic_data_it2 = topic_data_map2.find("topic1"); + ASSERT_TRUE(topic_data_it2 == topic_data_map2.end()); } -TEST(TopicCacheTest, test_topic_cache_remove_policy_element_does_not_exist) { - // Create an instance handler - const auto guid = GUID_t(GuidPrefix_t(), 3); - InstanceHandle_t instance_handler = InstanceHandle_t(); - instance_handler = guid; - - // Create an instance of TopicCache - auto topic_cache = TopicCache(); +TEST_F(TopicCacheTestFixture, test_topic_cache_remove_policy_element_does_not_exist) +{ // add topic - topic_cache.addTopic(instance_handler, "TestTopic", "TestType"); + this->topic_cache.addTopic(this->instance_handler[1], "TestTopic", "TestType", this->qos[1]); // Assert that the return was false - auto const did_remove = topic_cache.removeTopic(instance_handler, "NewTestTopic", "TestType"); + auto const did_remove = this->topic_cache.removeTopic(this->instance_handler[1], "NewTestTopic", + "TestType"); ASSERT_FALSE(did_remove); }