Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only update connection strings if they contained a wildcard. #56

Merged
merged 2 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions src/network/NetworkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "iomanager/connectioninfo/InfoNljs.hpp"

#include "utilities/Resolver.hpp"
#include "ipm/PluginInfo.hpp"
#include "logging/Logging.hpp"

Expand Down Expand Up @@ -84,8 +85,8 @@ NetworkManager::configure(const Connections_t& connections,
}
TLOG_DEBUG(17) << "ConnectionServer host and port are " << connectionServer << ":" << connectionPort;
m_config_client = std::make_unique<ConfigClient>(connectionServer, connectionPort, config_client_interval);
m_config_client_interval = config_client_interval;
}
m_config_client_interval = config_client_interval;
}

void
Expand Down Expand Up @@ -271,8 +272,21 @@ NetworkManager::create_receiver(std::vector<ConnectionInfo> connections, Connect
} else {
config_json["connection_string"] = connections[0].uri;
}
connections[0].uri = plugin->connect_for_receives(config_json);
TLOG_DEBUG(12) << "Receiver reports connected to URI " << connections[0].uri;
auto newCs = plugin->connect_for_receives(config_json);
TLOG_DEBUG(12) << "Receiver reports connected to URI " << newCs;

// Replace with resolved if there are wildcards (host and/or port)
if(connections[0].uri.find("*") != std::string::npos) {
auto newUri = utilities::parse_connection_string(newCs);
auto oldUri = utilities::parse_connection_string(connections[0].uri);

if (oldUri.port == "*")
oldUri.port = newUri.port;
if (oldUri.host == "*")
oldUri.host = newUri.host;

connections[0].uri = oldUri.to_string();
}

if (is_pubsub) {
TLOG_DEBUG(12) << "Subscribing to topic " << connections[0].data_type << " after connect_for_receives";
Expand Down Expand Up @@ -304,8 +318,21 @@ NetworkManager::create_sender(ConnectionInfo connection)
TLOG_DEBUG(11) << "Creating sender plugin of type " << plugin_type;
auto plugin = dunedaq::ipm::make_ipm_sender(plugin_type);
TLOG_DEBUG(11) << "Connecting sender plugin to " << connection.uri;
connection.uri = plugin->connect_for_sends({ { "connection_string", connection.uri } });
TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << connection.uri;
auto newCs = plugin->connect_for_sends({ { "connection_string", connection.uri } });
TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << newCs;

// Replace with resolved if there are wildcards (host and/or port)
if(connection.uri.find("*") != std::string::npos) {
auto newUri = utilities::parse_connection_string(newCs);
auto oldUri = utilities::parse_connection_string(connection.uri);

if (oldUri.port == "*")
oldUri.port = newUri.port;
if (oldUri.host == "*")
oldUri.host = newUri.host;

connection.uri = oldUri.to_string();
}

if (m_config_client != nullptr && is_pubsub) {
m_config_client->publish(connection);
Expand Down
42 changes: 14 additions & 28 deletions unittest/IOManager_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,11 @@ struct Data2
{
int d1;
double d2;
std::string d3;

Data2() = default;
Data2(int i, double d, std::string s)
Data2(int i, double d)
: d1(i)
, d2(d)
, d3(s)
{
}
virtual ~Data2() = default;
Expand All @@ -62,19 +60,15 @@ struct Data2
Data2(Data2&&) = default;
Data2& operator=(Data2&&) = default;

DUNE_DAQ_SERIALIZE(Data2, d1, d2, d3);
DUNE_DAQ_SERIALIZE(Data2, d1, d2);
};
struct Data3
{
int d1;
double d2;
std::string d3;

Data3() = default;
Data3(int i, double d, std::string s)
Data3(int i)
: d1(i)
, d2(d)
, d3(s)
{
}
virtual ~Data3() = default;
Expand All @@ -83,7 +77,7 @@ struct Data3
Data3(Data3&&) = default;
Data3& operator=(Data3&&) = default;

DUNE_DAQ_SERIALIZE(Data3, d1, d2, d3);
DUNE_DAQ_SERIALIZE(Data3, d1);
};

struct NonCopyableData
Expand Down Expand Up @@ -200,7 +194,7 @@ struct ConfigurationTestFixture
connections.emplace_back(Connection{ pub1_id, "inproc://bar", ConnectionType::kPubSub });
connections.emplace_back(Connection{ pub2_id, "inproc://baz", ConnectionType::kPubSub });
connections.emplace_back(Connection{ pub3_id, "inproc://qui", ConnectionType::kPubSub });
IOManager::get()->configure(queues, connections, false, 0ms); // Not using connectivity service
IOManager::get()->configure(queues, connections, false, 1000ms); // Not using connectivity service
}
~ConfigurationTestFixture() { IOManager::get()->reset(); }

Expand Down Expand Up @@ -284,7 +278,7 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture)
auto sub3_receiver = IOManager::get()->get_receiver<Data3>(sub3_id);

// Sub1 is subscribed to all data_t publishers, Sub2 only to pub2, Sub3 to all data2_t
Data2 sent_t1(56, 26.5, "test1");
Data2 sent_t1(56, 26.5);
pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block);

auto ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
Expand All @@ -294,9 +288,8 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture)
sub3_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
BOOST_CHECK_EQUAL(ret1.d1, 56);
BOOST_CHECK_EQUAL(ret1.d2, 26.5);
BOOST_CHECK_EQUAL(ret1.d3, "test1");

Data2 sent_t2(57, 27.5, "test2");
Data2 sent_t2(57, 27.5);
pub2_sender->send(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block);

ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
Expand All @@ -305,12 +298,10 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture)
sub3_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
BOOST_CHECK_EQUAL(ret1.d1, 57);
BOOST_CHECK_EQUAL(ret1.d2, 27.5);
BOOST_CHECK_EQUAL(ret1.d3, "test2");
BOOST_CHECK_EQUAL(ret2.d1, 57);
BOOST_CHECK_EQUAL(ret2.d2, 27.5);
BOOST_CHECK_EQUAL(ret2.d3, "test2");

Data3 sent_t3(58, 28.5, "test3");
Data3 sent_t3(58);
pub3_sender->send(std::move(sent_t3), dunedaq::iomanager::Sender::s_no_block);

BOOST_REQUIRE_EXCEPTION(
Expand All @@ -319,8 +310,6 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture)
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
auto ret3 = sub3_receiver->receive(std::chrono::milliseconds(10));
BOOST_CHECK_EQUAL(ret3.d1, 58);
BOOST_CHECK_EQUAL(ret3.d2, 28.5);
BOOST_CHECK_EQUAL(ret3.d3, "test3");
}

BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture)
Expand All @@ -334,54 +323,51 @@ BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture)
sub2_receiver->subscribe("sub2_topic");

// Sub1 is subscribed to all data_t publishers, Sub2 only to pub2
Data2 sent_t0(54, 24.5, "test0");
Data2 sent_t0(54, 24.5);
pub1_sender->send(std::move(sent_t0), dunedaq::iomanager::Sender::s_no_block);
auto ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
BOOST_CHECK_EQUAL(ret1.d1, 54);
BOOST_CHECK_EQUAL(ret1.d2, 24.5);
BOOST_CHECK_EQUAL(ret1.d3, "test0");
BOOST_REQUIRE_EXCEPTION(
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });

sub1_receiver->unsubscribe("data2_t");
Data2 sent_t1(55, 25.5, "test1");
Data2 sent_t1(55, 25.5);
pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block);
BOOST_REQUIRE_EXCEPTION(
sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
BOOST_REQUIRE_EXCEPTION(
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });

Data2 sent_t2(56, 26.5, "test2");
Data2 sent_t2(56, 26.5);
pub1_sender->send_with_topic(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block, "test_topic");
BOOST_REQUIRE_EXCEPTION(
sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
BOOST_REQUIRE_EXCEPTION(
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });

Data2 sent_t3{ 57, 27.5, "test3" };
Data2 sent_t3{ 57, 27.5};
pub1_sender->send_with_topic(std::move(sent_t3), dunedaq::iomanager::Sender::s_no_block, "sub1_topic");
ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
BOOST_CHECK_EQUAL(ret1.d1, 57);
BOOST_CHECK_EQUAL(ret1.d2, 27.5);
BOOST_CHECK_EQUAL(ret1.d3, "test3");
BOOST_REQUIRE_EXCEPTION(
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });

Data2 sent_t4{ 58, 28.5, "test4" };
Data2 sent_t4{ 58, 28.5 };
pub1_sender->send_with_topic(std::move(sent_t4), dunedaq::iomanager::Sender::s_no_block, "sub2_topic");
BOOST_REQUIRE_EXCEPTION(
sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
BOOST_REQUIRE_EXCEPTION(
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });

Data2 sent_t5{ 59, 29.5, "test5" };
Data2 sent_t5{ 59, 29.5 };
pub2_sender->send_with_topic(std::move(sent_t5), dunedaq::iomanager::Sender::s_no_block, "sub2_topic");
BOOST_REQUIRE_EXCEPTION(
sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
auto ret2 = sub2_receiver->receive(std::chrono::milliseconds(10));
BOOST_CHECK_EQUAL(ret2.d1, 59);
BOOST_CHECK_EQUAL(ret2.d2, 29.5);
BOOST_CHECK_EQUAL(ret2.d3, "test5");
}

BOOST_FIXTURE_TEST_CASE(NonSerializableSendReceive, ConfigurationTestFixture)
Expand Down