Skip to content

Commit

Permalink
Merge pull request #56 from DUNE-DAQ/eflumerf/DontResolveConnect
Browse files Browse the repository at this point in the history
Only update connection strings if they contained a wildcard.
  • Loading branch information
plasorak authored Jun 8, 2023
2 parents d464f15 + 83935b9 commit d5a48a0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 33 deletions.
37 changes: 32 additions & 5 deletions src/network/NetworkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "iomanager/connectioninfo/InfoNljs.hpp"

#include "utilities/Resolver.hpp"
#include "ipm/PluginInfo.hpp"
#include "logging/Logging.hpp"

Expand Down Expand Up @@ -84,8 +85,8 @@ NetworkManager::configure(const Connections_t& connections,
}
TLOG_DEBUG(17) << "ConnectionServer host and port are " << connectionServer << ":" << connectionPort;
m_config_client = std::make_unique<ConfigClient>(connectionServer, connectionPort, config_client_interval);
m_config_client_interval = config_client_interval;
}
m_config_client_interval = config_client_interval;
}

void
Expand Down Expand Up @@ -271,8 +272,21 @@ NetworkManager::create_receiver(std::vector<ConnectionInfo> connections, Connect
} else {
config_json["connection_string"] = connections[0].uri;
}
connections[0].uri = plugin->connect_for_receives(config_json);
TLOG_DEBUG(12) << "Receiver reports connected to URI " << connections[0].uri;
auto newCs = plugin->connect_for_receives(config_json);
TLOG_DEBUG(12) << "Receiver reports connected to URI " << newCs;

// Replace with resolved if there are wildcards (host and/or port)
if(connections[0].uri.find("*") != std::string::npos) {
auto newUri = utilities::parse_connection_string(newCs);
auto oldUri = utilities::parse_connection_string(connections[0].uri);

if (oldUri.port == "*")
oldUri.port = newUri.port;
if (oldUri.host == "*")
oldUri.host = newUri.host;

connections[0].uri = oldUri.to_string();
}

if (is_pubsub) {
TLOG_DEBUG(12) << "Subscribing to topic " << connections[0].data_type << " after connect_for_receives";
Expand Down Expand Up @@ -304,8 +318,21 @@ NetworkManager::create_sender(ConnectionInfo connection)
TLOG_DEBUG(11) << "Creating sender plugin of type " << plugin_type;
auto plugin = dunedaq::ipm::make_ipm_sender(plugin_type);
TLOG_DEBUG(11) << "Connecting sender plugin to " << connection.uri;
connection.uri = plugin->connect_for_sends({ { "connection_string", connection.uri } });
TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << connection.uri;
auto newCs = plugin->connect_for_sends({ { "connection_string", connection.uri } });
TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << newCs;

// Replace with resolved if there are wildcards (host and/or port)
if(connection.uri.find("*") != std::string::npos) {
auto newUri = utilities::parse_connection_string(newCs);
auto oldUri = utilities::parse_connection_string(connection.uri);

if (oldUri.port == "*")
oldUri.port = newUri.port;
if (oldUri.host == "*")
oldUri.host = newUri.host;

connection.uri = oldUri.to_string();
}

if (m_config_client != nullptr && is_pubsub) {
m_config_client->publish(connection);
Expand Down
42 changes: 14 additions & 28 deletions unittest/IOManager_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,11 @@ struct Data2
{
int d1;
double d2;
std::string d3;

Data2() = default;
Data2(int i, double d, std::string s)
Data2(int i, double d)
: d1(i)
, d2(d)
, d3(s)
{
}
virtual ~Data2() = default;
Expand All @@ -62,19 +60,15 @@ struct Data2
Data2(Data2&&) = default;
Data2& operator=(Data2&&) = default;

DUNE_DAQ_SERIALIZE(Data2, d1, d2, d3);
DUNE_DAQ_SERIALIZE(Data2, d1, d2);
};
struct Data3
{
int d1;
double d2;
std::string d3;

Data3() = default;
Data3(int i, double d, std::string s)
Data3(int i)
: d1(i)
, d2(d)
, d3(s)
{
}
virtual ~Data3() = default;
Expand All @@ -83,7 +77,7 @@ struct Data3
Data3(Data3&&) = default;
Data3& operator=(Data3&&) = default;

DUNE_DAQ_SERIALIZE(Data3, d1, d2, d3);
DUNE_DAQ_SERIALIZE(Data3, d1);
};

struct NonCopyableData
Expand Down Expand Up @@ -200,7 +194,7 @@ struct ConfigurationTestFixture
connections.emplace_back(Connection{ pub1_id, "inproc://bar", ConnectionType::kPubSub });
connections.emplace_back(Connection{ pub2_id, "inproc://baz", ConnectionType::kPubSub });
connections.emplace_back(Connection{ pub3_id, "inproc://qui", ConnectionType::kPubSub });
IOManager::get()->configure(queues, connections, false, 0ms); // Not using connectivity service
IOManager::get()->configure(queues, connections, false, 1000ms); // Not using connectivity service
}
~ConfigurationTestFixture() { IOManager::get()->reset(); }

Expand Down Expand Up @@ -284,7 +278,7 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture)
auto sub3_receiver = IOManager::get()->get_receiver<Data3>(sub3_id);

// Sub1 is subscribed to all data_t publishers, Sub2 only to pub2, Sub3 to all data2_t
Data2 sent_t1(56, 26.5, "test1");
Data2 sent_t1(56, 26.5);
pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block);

auto ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
Expand All @@ -294,9 +288,8 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture)
sub3_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
BOOST_CHECK_EQUAL(ret1.d1, 56);
BOOST_CHECK_EQUAL(ret1.d2, 26.5);
BOOST_CHECK_EQUAL(ret1.d3, "test1");

Data2 sent_t2(57, 27.5, "test2");
Data2 sent_t2(57, 27.5);
pub2_sender->send(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block);

ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
Expand All @@ -305,12 +298,10 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture)
sub3_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
BOOST_CHECK_EQUAL(ret1.d1, 57);
BOOST_CHECK_EQUAL(ret1.d2, 27.5);
BOOST_CHECK_EQUAL(ret1.d3, "test2");
BOOST_CHECK_EQUAL(ret2.d1, 57);
BOOST_CHECK_EQUAL(ret2.d2, 27.5);
BOOST_CHECK_EQUAL(ret2.d3, "test2");

Data3 sent_t3(58, 28.5, "test3");
Data3 sent_t3(58);
pub3_sender->send(std::move(sent_t3), dunedaq::iomanager::Sender::s_no_block);

BOOST_REQUIRE_EXCEPTION(
Expand All @@ -319,8 +310,6 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture)
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
auto ret3 = sub3_receiver->receive(std::chrono::milliseconds(10));
BOOST_CHECK_EQUAL(ret3.d1, 58);
BOOST_CHECK_EQUAL(ret3.d2, 28.5);
BOOST_CHECK_EQUAL(ret3.d3, "test3");
}

BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture)
Expand All @@ -334,54 +323,51 @@ BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture)
sub2_receiver->subscribe("sub2_topic");

// Sub1 is subscribed to all data_t publishers, Sub2 only to pub2
Data2 sent_t0(54, 24.5, "test0");
Data2 sent_t0(54, 24.5);
pub1_sender->send(std::move(sent_t0), dunedaq::iomanager::Sender::s_no_block);
auto ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
BOOST_CHECK_EQUAL(ret1.d1, 54);
BOOST_CHECK_EQUAL(ret1.d2, 24.5);
BOOST_CHECK_EQUAL(ret1.d3, "test0");
BOOST_REQUIRE_EXCEPTION(
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });

sub1_receiver->unsubscribe("data2_t");
Data2 sent_t1(55, 25.5, "test1");
Data2 sent_t1(55, 25.5);
pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block);
BOOST_REQUIRE_EXCEPTION(
sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
BOOST_REQUIRE_EXCEPTION(
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });

Data2 sent_t2(56, 26.5, "test2");
Data2 sent_t2(56, 26.5);
pub1_sender->send_with_topic(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block, "test_topic");
BOOST_REQUIRE_EXCEPTION(
sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
BOOST_REQUIRE_EXCEPTION(
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });

Data2 sent_t3{ 57, 27.5, "test3" };
Data2 sent_t3{ 57, 27.5};
pub1_sender->send_with_topic(std::move(sent_t3), dunedaq::iomanager::Sender::s_no_block, "sub1_topic");
ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
BOOST_CHECK_EQUAL(ret1.d1, 57);
BOOST_CHECK_EQUAL(ret1.d2, 27.5);
BOOST_CHECK_EQUAL(ret1.d3, "test3");
BOOST_REQUIRE_EXCEPTION(
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });

Data2 sent_t4{ 58, 28.5, "test4" };
Data2 sent_t4{ 58, 28.5 };
pub1_sender->send_with_topic(std::move(sent_t4), dunedaq::iomanager::Sender::s_no_block, "sub2_topic");
BOOST_REQUIRE_EXCEPTION(
sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
BOOST_REQUIRE_EXCEPTION(
sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });

Data2 sent_t5{ 59, 29.5, "test5" };
Data2 sent_t5{ 59, 29.5 };
pub2_sender->send_with_topic(std::move(sent_t5), dunedaq::iomanager::Sender::s_no_block, "sub2_topic");
BOOST_REQUIRE_EXCEPTION(
sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
auto ret2 = sub2_receiver->receive(std::chrono::milliseconds(10));
BOOST_CHECK_EQUAL(ret2.d1, 59);
BOOST_CHECK_EQUAL(ret2.d2, 29.5);
BOOST_CHECK_EQUAL(ret2.d3, "test5");
}

BOOST_FIXTURE_TEST_CASE(NonSerializableSendReceive, ConfigurationTestFixture)
Expand Down

0 comments on commit d5a48a0

Please sign in to comment.