From 079b7d981ba0e0e3b08e042ecca194ffb9349b50 Mon Sep 17 00:00:00 2001 From: Eric Flumerfelt Date: Fri, 28 Apr 2023 10:31:34 -0400 Subject: [PATCH 1/2] Only update connection strings if they contained a wildcard. --- src/network/NetworkManager.cpp | 35 ++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index 579965d..f1924aa 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -11,6 +11,7 @@ #include "iomanager/connectioninfo/InfoNljs.hpp" +#include "utilities/Resolver.hpp" #include "ipm/PluginInfo.hpp" #include "logging/Logging.hpp" @@ -271,8 +272,21 @@ NetworkManager::create_receiver(std::vector connections, Connect } else { config_json["connection_string"] = connections[0].uri; } - connections[0].uri = plugin->connect_for_receives(config_json); - TLOG_DEBUG(12) << "Receiver reports connected to URI " << connections[0].uri; + auto newCs = plugin->connect_for_receives(config_json); + TLOG_DEBUG(12) << "Receiver reports connected to URI " << newCs; + + // Replace with resolved if there are wildcards (host and/or port) + if(connections[0].uri.find("*") != std::string::npos) { + auto newUri = utilities::parse_connection_string(newCs); + auto oldUri = utilities::parse_connection_string(connections[0].uri); + + if (oldUri.port == "*") + oldUri.port = newUri.port; + if (oldUri.host == "*") + oldUri.host = newUri.host; + + connections[0].uri = oldUri.to_string(); + } if (is_pubsub) { TLOG_DEBUG(12) << "Subscribing to topic " << connections[0].data_type << " after connect_for_receives"; @@ -304,8 +318,21 @@ NetworkManager::create_sender(ConnectionInfo connection) TLOG_DEBUG(11) << "Creating sender plugin of type " << plugin_type; auto plugin = dunedaq::ipm::make_ipm_sender(plugin_type); TLOG_DEBUG(11) << "Connecting sender plugin to " << connection.uri; - connection.uri = plugin->connect_for_sends({ { "connection_string", connection.uri } }); - TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << connection.uri; + auto newCs = plugin->connect_for_sends({ { "connection_string", connection.uri } }); + TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << newCs; + + // Replace with resolved if there are wildcards (host and/or port) + if(connection.uri.find("*") != std::string::npos) { + auto newUri = utilities::parse_connection_string(newCs); + auto oldUri = utilities::parse_connection_string(connection.uri); + + if (oldUri.port == "*") + oldUri.port = newUri.port; + if (oldUri.host == "*") + oldUri.host = newUri.host; + + connection.uri = oldUri.to_string(); + } if (m_config_client != nullptr && is_pubsub) { m_config_client->publish(connection); From 83935b9f5c659dec2917d3b749d191fff8513924 Mon Sep 17 00:00:00 2001 From: Eric Flumerfelt Date: Wed, 31 May 2023 14:46:37 -0400 Subject: [PATCH 2/2] Always set config_client_interval, as it is used by the subscriber update thread. Make the Data structs in IOManager_test different. --- src/network/NetworkManager.cpp | 2 +- unittest/IOManager_test.cxx | 42 ++++++++++++---------------------- 2 files changed, 15 insertions(+), 29 deletions(-) diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index f1924aa..0fd767f 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -85,8 +85,8 @@ NetworkManager::configure(const Connections_t& connections, } TLOG_DEBUG(17) << "ConnectionServer host and port are " << connectionServer << ":" << connectionPort; m_config_client = std::make_unique(connectionServer, connectionPort, config_client_interval); - m_config_client_interval = config_client_interval; } + m_config_client_interval = config_client_interval; } void diff --git a/unittest/IOManager_test.cxx b/unittest/IOManager_test.cxx index f0e8eda..dddbedb 100755 --- a/unittest/IOManager_test.cxx +++ b/unittest/IOManager_test.cxx @@ -47,13 +47,11 @@ struct Data2 { int d1; double d2; - std::string d3; Data2() = default; - Data2(int i, double d, std::string s) + Data2(int i, double d) : d1(i) , d2(d) - , d3(s) { } virtual ~Data2() = default; @@ -62,19 +60,15 @@ struct Data2 Data2(Data2&&) = default; Data2& operator=(Data2&&) = default; - DUNE_DAQ_SERIALIZE(Data2, d1, d2, d3); + DUNE_DAQ_SERIALIZE(Data2, d1, d2); }; struct Data3 { int d1; - double d2; - std::string d3; Data3() = default; - Data3(int i, double d, std::string s) + Data3(int i) : d1(i) - , d2(d) - , d3(s) { } virtual ~Data3() = default; @@ -83,7 +77,7 @@ struct Data3 Data3(Data3&&) = default; Data3& operator=(Data3&&) = default; - DUNE_DAQ_SERIALIZE(Data3, d1, d2, d3); + DUNE_DAQ_SERIALIZE(Data3, d1); }; struct NonCopyableData @@ -200,7 +194,7 @@ struct ConfigurationTestFixture connections.emplace_back(Connection{ pub1_id, "inproc://bar", ConnectionType::kPubSub }); connections.emplace_back(Connection{ pub2_id, "inproc://baz", ConnectionType::kPubSub }); connections.emplace_back(Connection{ pub3_id, "inproc://qui", ConnectionType::kPubSub }); - IOManager::get()->configure(queues, connections, false, 0ms); // Not using connectivity service + IOManager::get()->configure(queues, connections, false, 1000ms); // Not using connectivity service } ~ConfigurationTestFixture() { IOManager::get()->reset(); } @@ -284,7 +278,7 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture) auto sub3_receiver = IOManager::get()->get_receiver(sub3_id); // Sub1 is subscribed to all data_t publishers, Sub2 only to pub2, Sub3 to all data2_t - Data2 sent_t1(56, 26.5, "test1"); + Data2 sent_t1(56, 26.5); pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block); auto ret1 = sub1_receiver->receive(std::chrono::milliseconds(10)); @@ -294,9 +288,8 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture) sub3_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); BOOST_CHECK_EQUAL(ret1.d1, 56); BOOST_CHECK_EQUAL(ret1.d2, 26.5); - BOOST_CHECK_EQUAL(ret1.d3, "test1"); - Data2 sent_t2(57, 27.5, "test2"); + Data2 sent_t2(57, 27.5); pub2_sender->send(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block); ret1 = sub1_receiver->receive(std::chrono::milliseconds(10)); @@ -305,12 +298,10 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture) sub3_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); BOOST_CHECK_EQUAL(ret1.d1, 57); BOOST_CHECK_EQUAL(ret1.d2, 27.5); - BOOST_CHECK_EQUAL(ret1.d3, "test2"); BOOST_CHECK_EQUAL(ret2.d1, 57); BOOST_CHECK_EQUAL(ret2.d2, 27.5); - BOOST_CHECK_EQUAL(ret2.d3, "test2"); - Data3 sent_t3(58, 28.5, "test3"); + Data3 sent_t3(58); pub3_sender->send(std::move(sent_t3), dunedaq::iomanager::Sender::s_no_block); BOOST_REQUIRE_EXCEPTION( @@ -319,8 +310,6 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture) sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); auto ret3 = sub3_receiver->receive(std::chrono::milliseconds(10)); BOOST_CHECK_EQUAL(ret3.d1, 58); - BOOST_CHECK_EQUAL(ret3.d2, 28.5); - BOOST_CHECK_EQUAL(ret3.d3, "test3"); } BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture) @@ -334,54 +323,51 @@ BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture) sub2_receiver->subscribe("sub2_topic"); // Sub1 is subscribed to all data_t publishers, Sub2 only to pub2 - Data2 sent_t0(54, 24.5, "test0"); + Data2 sent_t0(54, 24.5); pub1_sender->send(std::move(sent_t0), dunedaq::iomanager::Sender::s_no_block); auto ret1 = sub1_receiver->receive(std::chrono::milliseconds(10)); BOOST_CHECK_EQUAL(ret1.d1, 54); BOOST_CHECK_EQUAL(ret1.d2, 24.5); - BOOST_CHECK_EQUAL(ret1.d3, "test0"); BOOST_REQUIRE_EXCEPTION( sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); sub1_receiver->unsubscribe("data2_t"); - Data2 sent_t1(55, 25.5, "test1"); + Data2 sent_t1(55, 25.5); pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block); BOOST_REQUIRE_EXCEPTION( sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); BOOST_REQUIRE_EXCEPTION( sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); - Data2 sent_t2(56, 26.5, "test2"); + Data2 sent_t2(56, 26.5); pub1_sender->send_with_topic(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block, "test_topic"); BOOST_REQUIRE_EXCEPTION( sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); BOOST_REQUIRE_EXCEPTION( sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); - Data2 sent_t3{ 57, 27.5, "test3" }; + Data2 sent_t3{ 57, 27.5}; pub1_sender->send_with_topic(std::move(sent_t3), dunedaq::iomanager::Sender::s_no_block, "sub1_topic"); ret1 = sub1_receiver->receive(std::chrono::milliseconds(10)); BOOST_CHECK_EQUAL(ret1.d1, 57); BOOST_CHECK_EQUAL(ret1.d2, 27.5); - BOOST_CHECK_EQUAL(ret1.d3, "test3"); BOOST_REQUIRE_EXCEPTION( sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); - Data2 sent_t4{ 58, 28.5, "test4" }; + Data2 sent_t4{ 58, 28.5 }; pub1_sender->send_with_topic(std::move(sent_t4), dunedaq::iomanager::Sender::s_no_block, "sub2_topic"); BOOST_REQUIRE_EXCEPTION( sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); BOOST_REQUIRE_EXCEPTION( sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); - Data2 sent_t5{ 59, 29.5, "test5" }; + Data2 sent_t5{ 59, 29.5 }; pub2_sender->send_with_topic(std::move(sent_t5), dunedaq::iomanager::Sender::s_no_block, "sub2_topic"); BOOST_REQUIRE_EXCEPTION( sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); auto ret2 = sub2_receiver->receive(std::chrono::milliseconds(10)); BOOST_CHECK_EQUAL(ret2.d1, 59); BOOST_CHECK_EQUAL(ret2.d2, 29.5); - BOOST_CHECK_EQUAL(ret2.d3, "test5"); } BOOST_FIXTURE_TEST_CASE(NonSerializableSendReceive, ConfigurationTestFixture)