From a7b495def3e1b2dbf4aeb8d343050fb8a8674fbf Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 29 Jul 2024 14:46:35 +0200 Subject: [PATCH 01/18] Initial progress towards no jsonnet --- CMakeLists.txt | 2 +- include/iomanager/network/NetworkManager.hpp | 3 +- include/iomanager/queue/Queue.hpp | 2 -- include/iomanager/queue/QueueBase.hpp | 31 +++++++++++--------- include/iomanager/queue/QueueRegistry.hpp | 4 --- schema/iomanager/info/QueueInfo.proto | 10 +++++++ schema/iomanager/queueinfo.jsonnet | 19 ------------ src/network/NetworkManager.cpp | 20 ------------- src/queue/QueueRegistry.cpp | 12 -------- unittest/QueueRegistry_test.cxx | 11 ------- 10 files changed, 29 insertions(+), 85 deletions(-) create mode 100644 schema/iomanager/info/QueueInfo.proto delete mode 100644 schema/iomanager/queueinfo.jsonnet diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f7ce78..5a95f4f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,7 +16,7 @@ find_package(utilities REQUIRED) set(IOMANAGER_DEPENDENCIES serialization::serialization Folly::folly utilities::utilities opmonlib::opmonlib ipm::ipm) daq_codegen(connection.jsonnet TEMPLATES Structs.hpp.j2 Nljs.hpp.j2 ) -daq_codegen(connectioninfo.jsonnet queueinfo.jsonnet DEP_PKGS opmonlib TEMPLATES opmonlib/InfoStructs.hpp.j2 opmonlib/InfoNljs.hpp.j2 ) +daq_protobuf_codegen( info/*.proto ) daq_add_library(IOManager.cpp queue/QueueRegistry.cpp network/NetworkManager.cpp network/ConfigClient.cpp LINK_LIBRARIES ${IOMANAGER_DEPENDENCIES} ) diff --git a/include/iomanager/network/NetworkManager.hpp b/include/iomanager/network/NetworkManager.hpp index 8140b31..2edeb2d 100644 --- a/include/iomanager/network/NetworkManager.hpp +++ b/include/iomanager/network/NetworkManager.hpp @@ -17,7 +17,7 @@ #include "ipm/Receiver.hpp" #include "ipm/Sender.hpp" #include "ipm/Subscriber.hpp" -#include "opmonlib/InfoCollector.hpp" +#include "opmonlib/MonitorableObject.hpp" #include #include @@ -41,7 +41,6 @@ class NetworkManager static NetworkManager& get(); ~NetworkManager() { reset(); } - void gather_stats(opmonlib::InfoCollector& ci, int /*level*/); void configure(const Connections_t& connections, bool use_config_client, std::chrono::milliseconds config_client_interval); void reset(); diff --git a/include/iomanager/queue/Queue.hpp b/include/iomanager/queue/Queue.hpp index ca03ebc..6a3a89d 100644 --- a/include/iomanager/queue/Queue.hpp +++ b/include/iomanager/queue/Queue.hpp @@ -16,8 +16,6 @@ #include "iomanager/queue/QueueBase.hpp" -#include "opmonlib/InfoCollector.hpp" - #include "ers/Issue.hpp" #include diff --git a/include/iomanager/queue/QueueBase.hpp b/include/iomanager/queue/QueueBase.hpp index 7b2d240..c76c344 100644 --- a/include/iomanager/queue/QueueBase.hpp +++ b/include/iomanager/queue/QueueBase.hpp @@ -15,10 +15,10 @@ #ifndef IOMANAGER_INCLUDE_IOMANAGER_QUEUEBASE_HPP_ #define IOMANAGER_INCLUDE_IOMANAGER_QUEUEBASE_HPP_ -#include "iomanager/queueinfo/InfoNljs.hpp" #include "utilities/NamedObject.hpp" -#include "opmonlib/InfoCollector.hpp" +#include "opmonlib/MonitorableObject.hpp" +#include "iomanager/info/QueueInfo.pb.h" #include "ers/Issue.hpp" @@ -35,7 +35,7 @@ namespace dunedaq::iomanager { * @brief The QueueBase class allows to address generic behavior of any Queue implementation * */ -class QueueBase : public utilities::NamedObject + class QueueBase : public utilities::NamedObject, public opmonlib::MonitorableObject { public: /** @@ -46,17 +46,6 @@ class QueueBase : public utilities::NamedObject : utilities::NamedObject(name) {} - /** - * @brief Method to retrieve information (occupancy) from - * queues. - */ - void get_info(opmonlib::InfoCollector& ci, int /*level*/) - { - queueinfo::Info info; - info.capacity = this->get_capacity(); - info.number_of_elements = this->get_num_elements(); - ci.add(info); - } /** * @brief Get the capacity (max size) of the queue @@ -66,6 +55,20 @@ class QueueBase : public utilities::NamedObject virtual size_t get_num_elements() const = 0; + +protected: + /** + * @brief Method to retrieve information (occupancy) from + * queues. + */ + void generate_opmon_data() override + { + opmon::QueueInfo info; + info.set_capacity(this->get_capacity()); + info.set_number_of_elements(this->get_num_elements()); + publish(std::move(info)); + } + private: QueueBase(const QueueBase&) = delete; QueueBase& operator=(const QueueBase&) = delete; diff --git a/include/iomanager/queue/QueueRegistry.hpp b/include/iomanager/queue/QueueRegistry.hpp index 025fe3a..4b0df71 100644 --- a/include/iomanager/queue/QueueRegistry.hpp +++ b/include/iomanager/queue/QueueRegistry.hpp @@ -16,7 +16,6 @@ #include "iomanager/queue/QueueIssues.hpp" #include "ers/Issue.hpp" -#include "opmonlib/InfoCollector.hpp" #include #include @@ -58,9 +57,6 @@ class QueueRegistry */ void configure(const std::vector& configs); - // Gather statistics from queues - void gather_stats(opmonlib::InfoCollector& ic, int level); - // ONLY TO BE USED FOR TESTING! static void reset() { s_instance.reset(nullptr); } diff --git a/schema/iomanager/info/QueueInfo.proto b/schema/iomanager/info/QueueInfo.proto new file mode 100644 index 0000000..e5fa24a --- /dev/null +++ b/schema/iomanager/info/QueueInfo.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + + +package dunedaq.iomanager.opmon; + +message QueueInfo { + + uint64 capacity = 1; + uint64 number_of_elements = 2; +} \ No newline at end of file diff --git a/schema/iomanager/queueinfo.jsonnet b/schema/iomanager/queueinfo.jsonnet deleted file mode 100644 index 60ec2fe..0000000 --- a/schema/iomanager/queueinfo.jsonnet +++ /dev/null @@ -1,19 +0,0 @@ -// This is the queue info schema used by the queue application. -// It describes the information object structure passed by the application -// for operational monitoring - -local moo = import "moo.jsonnet"; -local s = moo.oschema.schema("dunedaq.iomanager.queueinfo"); - -local info = { - string : s.string("string", doc="Generate proper includes in InfoStructs"), - uint8 : s.number("uint8", "u8", - doc="An unsigned of 8 bytes used for counters"), - - info: s.record("Info", [ - s.field("capacity", self.uint8, 0, doc="Maximum queue capacity" ), - s.field("number_of_elements", self.uint8, 0, doc="Elements in the queue" ) - ], doc="General Queue information") -}; - -moo.oschema.sort_select(info) diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index 17080bd..863c97c 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -9,8 +9,6 @@ #include "iomanager/network/NetworkManager.hpp" #include "iomanager/SchemaUtils.hpp" -#include "iomanager/connectioninfo/InfoNljs.hpp" - #include "ipm/PluginInfo.hpp" #include "logging/Logging.hpp" #include "utilities/Resolver.hpp" @@ -33,24 +31,6 @@ NetworkManager::get() return *s_instance; } -void -NetworkManager::gather_stats(opmonlib::InfoCollector& ci, int level) -{ - - for (auto& sender : m_sender_plugins) { - opmonlib::InfoCollector tmp_ic; - if(sender.second == nullptr) continue; - sender.second->get_info(tmp_ic, level); - ci.add(sender.first.uid, tmp_ic); - } - - for (auto& receiver : m_receiver_plugins) { - opmonlib::InfoCollector tmp_ic; - if(receiver.second == nullptr) continue; - receiver.second->get_info(tmp_ic, level); - ci.add(receiver.first.uid, tmp_ic); - } -} void NetworkManager::configure(const Connections_t& connections, diff --git a/src/queue/QueueRegistry.cpp b/src/queue/QueueRegistry.cpp index 6a76ffa..77bef7a 100644 --- a/src/queue/QueueRegistry.cpp +++ b/src/queue/QueueRegistry.cpp @@ -38,18 +38,6 @@ QueueRegistry::configure(const std::vector& configs) m_configured = true; } -void -QueueRegistry::gather_stats(opmonlib::InfoCollector& ic, int level) -{ - - for (const auto& [name, queue_entry] : m_queue_registry) { - opmonlib::InfoCollector tmp_ci; - queue_entry.m_instance->get_info(tmp_ci, level); - if (!tmp_ci.is_empty()) { - ic.add(name, tmp_ci); - } - } -} bool QueueRegistry::has_queue(const std::string& uid, const std::string& data_type) const diff --git a/unittest/QueueRegistry_test.cxx b/unittest/QueueRegistry_test.cxx index 6a5cc0e..65b5970 100755 --- a/unittest/QueueRegistry_test.cxx +++ b/unittest/QueueRegistry_test.cxx @@ -48,17 +48,6 @@ BOOST_AUTO_TEST_CASE(Configure) [&](QueueRegistryConfigured const&) { return true; }); } -BOOST_AUTO_TEST_CASE(GatherStats) -{ - dunedaq::opmonlib::InfoCollector ic; - QueueRegistry::get().gather_stats(ic, 1); - BOOST_REQUIRE(ic.is_empty()); - - auto queue_ptr = QueueRegistry::get().get_queue("test_queue_stddeque"); - BOOST_REQUIRE(queue_ptr != nullptr); - QueueRegistry::get().gather_stats(ic, 1); - BOOST_REQUIRE(!ic.is_empty()); -} BOOST_AUTO_TEST_CASE(CreateQueue) { From ac2a38404cd7fe75d559c3264f308bcaca8d04bb Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 29 Jul 2024 15:05:58 +0200 Subject: [PATCH 02/18] Update --- schema/iomanager/connectioninfo.jsonnet | 20 -------------------- schema/iomanager/info/ConnectionInfo.proto | 14 ++++++++++++++ 2 files changed, 14 insertions(+), 20 deletions(-) delete mode 100644 schema/iomanager/connectioninfo.jsonnet create mode 100644 schema/iomanager/info/ConnectionInfo.proto diff --git a/schema/iomanager/connectioninfo.jsonnet b/schema/iomanager/connectioninfo.jsonnet deleted file mode 100644 index 555d75d..0000000 --- a/schema/iomanager/connectioninfo.jsonnet +++ /dev/null @@ -1,20 +0,0 @@ -// This is the application info schema used by the network manager. -// It describes the information object structure passed by the application -// for operational monitoring - -local moo = import "moo.jsonnet"; -local s = moo.oschema.schema("dunedaq.iomanager.connectioninfo"); - -local info = { - - count : s.number("count", "u8", doc="An unsigned of 8 bytes"), - - info: s.record("Info", [ - s.field("sent_bytes", self.count, 0, doc="Bytes sent via a connection of the networkmanager"), - s.field("received_bytes", self.count, 0, doc="Bytes received via a connection of the networkmanager"), - s.field("sent_messages", self.count, 0, doc="Messages sent via a connection of the networkmanager"), - s.field("received_messages", self.count, 0, doc="Messages received via a connection of the networkmanager") - ], doc="IOManager information") -}; - -moo.oschema.sort_select(info) diff --git a/schema/iomanager/info/ConnectionInfo.proto b/schema/iomanager/info/ConnectionInfo.proto new file mode 100644 index 0000000..e1e101b --- /dev/null +++ b/schema/iomanager/info/ConnectionInfo.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + + +package dunedaq.iomanager.opmon; + +message ConnectionInfo { + + uint64 sent_bytes = 1; + uint64 received_bytes = 2; + + uint64 sent_messages = 11; + uint64 received_messages = 12; + +} \ No newline at end of file From 808013bdd860b776217c61242cd70c2b467ab8df Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 29 Jul 2024 15:06:33 +0200 Subject: [PATCH 03/18] correct metrics --- schema/iomanager/info/ConnectionInfo.proto | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 schema/iomanager/info/ConnectionInfo.proto diff --git a/schema/iomanager/info/ConnectionInfo.proto b/schema/iomanager/info/ConnectionInfo.proto deleted file mode 100644 index e1e101b..0000000 --- a/schema/iomanager/info/ConnectionInfo.proto +++ /dev/null @@ -1,14 +0,0 @@ -syntax = "proto3"; - - -package dunedaq.iomanager.opmon; - -message ConnectionInfo { - - uint64 sent_bytes = 1; - uint64 received_bytes = 2; - - uint64 sent_messages = 11; - uint64 received_messages = 12; - -} \ No newline at end of file From 9b234dafd6490985a682b64326062833690c90c4 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 30 Jul 2024 11:48:15 +0200 Subject: [PATCH 04/18] Propagation of OpMonManager to queues --- include/iomanager/IOManager.hpp | 4 +++- include/iomanager/queue/QueueRegistry.hpp | 6 ++++-- include/iomanager/queue/detail/QueueRegistry.hxx | 2 ++ src/IOManager.cpp | 5 +++-- src/queue/QueueRegistry.cpp | 5 ++++- test/apps/iomanager_stress_test.cxx | 11 ++++++++--- test/apps/iomanager_stress_test_pubsub.cxx | 10 +++++++--- test/apps/queues_vs_threads_iomanager.cxx | 4 +++- test/apps/reconnection_test.cxx | 3 ++- unittest/IOManager_test.cxx | 4 +++- unittest/QueueRegistry_test.cxx | 5 +++-- unittest/performance_test.cxx | 4 +++- 12 files changed, 45 insertions(+), 18 deletions(-) diff --git a/include/iomanager/IOManager.hpp b/include/iomanager/IOManager.hpp index f0f6d87..fa40d91 100644 --- a/include/iomanager/IOManager.hpp +++ b/include/iomanager/IOManager.hpp @@ -12,6 +12,7 @@ #include "iomanager/Receiver.hpp" #include "iomanager/Sender.hpp" #include "iomanager/connection/Structs.hpp" +#include "opmonlib/OpMonManager.hpp" #include #include @@ -50,7 +51,8 @@ class IOManager void configure(Queues_t queues, Connections_t connections, bool use_config_client, - std::chrono::milliseconds config_client_interval); + std::chrono::milliseconds config_client_interval, + opmonlib::OpMonManager &); void reset(); diff --git a/include/iomanager/queue/QueueRegistry.hpp b/include/iomanager/queue/QueueRegistry.hpp index 4b0df71..0e48789 100644 --- a/include/iomanager/queue/QueueRegistry.hpp +++ b/include/iomanager/queue/QueueRegistry.hpp @@ -14,6 +14,7 @@ #include "iomanager/SchemaUtils.hpp" #include "iomanager/queue/Queue.hpp" #include "iomanager/queue/QueueIssues.hpp" +#include "opmonlib/OpMonManager.hpp" #include "ers/Issue.hpp" @@ -55,7 +56,7 @@ class QueueRegistry * @brief Configure the QueueRegistry * @param configs Queue configurations */ - void configure(const std::vector& configs); + void configure(const std::vector& configs, opmonlib::OpMonManager &); // ONLY TO BE USED FOR TESTING! static void reset() { s_instance.reset(nullptr); } @@ -79,7 +80,8 @@ class QueueRegistry std::map m_queue_registry; std::vector m_queue_configs; - + std::shared_ptr m_opmon_link{ std::make_shared() }; + bool m_configured{ false }; static std::unique_ptr s_instance; diff --git a/include/iomanager/queue/detail/QueueRegistry.hxx b/include/iomanager/queue/detail/QueueRegistry.hxx index c4ef951..8848042 100644 --- a/include/iomanager/queue/detail/QueueRegistry.hxx +++ b/include/iomanager/queue/detail/QueueRegistry.hxx @@ -67,6 +67,8 @@ QueueRegistry::create_queue(const QueueConfig& config) throw QueueTypeUnknown(ERS_HERE, str(config.queue_type)); } + m_opmon_link->register_child(config.id.uid, queue); + return queue; } diff --git a/src/IOManager.cpp b/src/IOManager.cpp index ce4afab..999162e 100755 --- a/src/IOManager.cpp +++ b/src/IOManager.cpp @@ -16,7 +16,8 @@ void dunedaq::iomanager::IOManager::configure(Queues_t queues, Connections_t connections, bool use_config_client, - std::chrono::milliseconds config_client_interval) + std::chrono::milliseconds config_client_interval, + dunedaq::opmonlib::OpMonManager & opmgr) { char* session = getenv("DUNEDAQ_SESSION"); if (session) { @@ -37,7 +38,7 @@ dunedaq::iomanager::IOManager::configure(Queues_t queues, nwCfg.push_back(connection); } - QueueRegistry::get().configure(qCfg); + QueueRegistry::get().configure(qCfg, opmgr); NetworkManager::get().configure(nwCfg, use_config_client, config_client_interval); } diff --git a/src/queue/QueueRegistry.cpp b/src/queue/QueueRegistry.cpp index 77bef7a..56de1d5 100644 --- a/src/queue/QueueRegistry.cpp +++ b/src/queue/QueueRegistry.cpp @@ -28,13 +28,16 @@ QueueRegistry::get() } void -QueueRegistry::configure(const std::vector& configs) +QueueRegistry::configure(const std::vector& configs, opmonlib::OpMonManager & mgr) { if (m_configured) { throw QueueRegistryConfigured(ERS_HERE); } m_queue_configs = configs; + + mgr.register_child("queues", m_opmon_link); + m_configured = true; } diff --git a/test/apps/iomanager_stress_test.cxx b/test/apps/iomanager_stress_test.cxx index 97429f9..fd7ad68 100644 --- a/test/apps/iomanager_stress_test.cxx +++ b/test/apps/iomanager_stress_test.cxx @@ -121,8 +121,13 @@ struct TestConfig conn_addr, ConnectionType::kPubSub }); } - IOManager::get()->configure( - queues, connections, use_connectivity_service, std::chrono::milliseconds(publish_interval)); + + dunedaq::opmonlib::OpMonManager op_manager(nullptr); + IOManager::get()->configure( queues, + connections, + use_connectivity_service, + std::chrono::milliseconds(publish_interval), + op_manager); } }; struct ReceiverTest @@ -551,4 +556,4 @@ main(int argc, char* argv[]) << ")"; } } -}; \ No newline at end of file +}; diff --git a/test/apps/iomanager_stress_test_pubsub.cxx b/test/apps/iomanager_stress_test_pubsub.cxx index ae61c0d..ad4370c 100644 --- a/test/apps/iomanager_stress_test_pubsub.cxx +++ b/test/apps/iomanager_stress_test_pubsub.cxx @@ -162,8 +162,12 @@ struct TestConfig } } - IOManager::get()->configure( - queues, connections, use_connectivity_service, std::chrono::milliseconds(publish_interval)); + dunedaq::opmonlib::OpMonManager op_manager(nullptr); + IOManager::get()->configure( queues, + connections, + use_connectivity_service, + std::chrono::milliseconds(publish_interval), + op_manager); } }; struct SubscriberTest @@ -667,4 +671,4 @@ main(int argc, char* argv[]) << ")"; } } -}; \ No newline at end of file +}; diff --git a/test/apps/queues_vs_threads_iomanager.cxx b/test/apps/queues_vs_threads_iomanager.cxx index 4f5e5d2..9bbb976 100644 --- a/test/apps/queues_vs_threads_iomanager.cxx +++ b/test/apps/queues_vs_threads_iomanager.cxx @@ -82,7 +82,9 @@ int main(int , char** ) for (int i = 0; i < max_n_threads; ++i) { queues.emplace_back(dunedaq::iomanager::QueueConfig{ {"bar" + std::to_string(i),"int_t"}, dunedaq::iomanager::QueueType::kFollyMPMCQueue, 1000 }); } - dunedaq::get_iomanager()->configure(queues, connections, false, 1000ms); + + dunedaq::opmonlib::OpMonManager opmgr(nullptr); + dunedaq::get_iomanager()->configure(queues, connections, false, 1000ms, opmgr); // Create all of the queues up front so the iomanager output is all in one place and not interspersed with the results for (int i = 0; i < max_n_threads; ++i) { diff --git a/test/apps/reconnection_test.cxx b/test/apps/reconnection_test.cxx index c427cd7..26f135e 100755 --- a/test/apps/reconnection_test.cxx +++ b/test/apps/reconnection_test.cxx @@ -84,7 +84,8 @@ struct TestConfig ConnectionType::kSendRecv }; connections.push_back(recv_conn); - IOManager::get()->configure(queues, connections, true, std::chrono::milliseconds(publish_interval)); + dunedaq::opmonlib::OpMonManager opmgr(nullptr); + IOManager::get()->configure(queues, connections, true, std::chrono::milliseconds(publish_interval), opmgr); } void send_message(uint8_t msg_idx) diff --git a/unittest/IOManager_test.cxx b/unittest/IOManager_test.cxx index dddbedb..6de6492 100755 --- a/unittest/IOManager_test.cxx +++ b/unittest/IOManager_test.cxx @@ -194,7 +194,9 @@ struct ConfigurationTestFixture connections.emplace_back(Connection{ pub1_id, "inproc://bar", ConnectionType::kPubSub }); connections.emplace_back(Connection{ pub2_id, "inproc://baz", ConnectionType::kPubSub }); connections.emplace_back(Connection{ pub3_id, "inproc://qui", ConnectionType::kPubSub }); - IOManager::get()->configure(queues, connections, false, 1000ms); // Not using connectivity service + + dunedaq::opmonlib::OpMonManager opmgr(nullptr); + IOManager::get()->configure(queues, connections, false, 1000ms, opmgr); // Not using connectivity service } ~ConfigurationTestFixture() { IOManager::get()->reset(); } diff --git a/unittest/QueueRegistry_test.cxx b/unittest/QueueRegistry_test.cxx index 65b5970..16a57c3 100755 --- a/unittest/QueueRegistry_test.cxx +++ b/unittest/QueueRegistry_test.cxx @@ -41,9 +41,10 @@ BOOST_AUTO_TEST_CASE(Configure) qc.id.uid = "test_queue_fmpmc"; queue_registry_config.push_back(qc); - QueueRegistry::get().configure(queue_registry_config); + dunedaq::opmonlib::OpMonManager opmgr(nullptr); + QueueRegistry::get().configure(queue_registry_config, opmgr); - BOOST_REQUIRE_EXCEPTION(QueueRegistry::get().configure(queue_registry_config), + BOOST_REQUIRE_EXCEPTION(QueueRegistry::get().configure(queue_registry_config, opmgr), QueueRegistryConfigured, [&](QueueRegistryConfigured const&) { return true; }); } diff --git a/unittest/performance_test.cxx b/unittest/performance_test.cxx index 3bac716..ad4dfdb 100644 --- a/unittest/performance_test.cxx +++ b/unittest/performance_test.cxx @@ -55,7 +55,9 @@ struct ConfigurationTestFixture dunedaq::iomanager::Connections_t connections; connections.emplace_back(Connection{ network_id, "inproc://foo", ConnectionType::kSendRecv }); - IOManager::get()->configure(queues, connections, false, 0ms); // Not using connectivity service + + dunedaq::opmonlib::OpMonManager opmgr(nullptr); + IOManager::get()->configure(queues, connections, false, 0ms, opmgr); // Not using connectivity service } ~ConfigurationTestFixture() { IOManager::get()->reset(); } From 42609ac8ca836323a5b096c43778ed6d84207bf4 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 30 Jul 2024 12:10:47 +0200 Subject: [PATCH 05/18] Propagate manager to connections --- include/iomanager/network/NetworkManager.hpp | 6 +++-- src/IOManager.cpp | 2 +- src/network/NetworkManager.cpp | 5 ++++- unittest/NetworkManager_test.cxx | 23 ++++++++++++++------ 4 files changed, 25 insertions(+), 11 deletions(-) diff --git a/include/iomanager/network/NetworkManager.hpp b/include/iomanager/network/NetworkManager.hpp index 2edeb2d..dd86395 100644 --- a/include/iomanager/network/NetworkManager.hpp +++ b/include/iomanager/network/NetworkManager.hpp @@ -17,7 +17,7 @@ #include "ipm/Receiver.hpp" #include "ipm/Sender.hpp" #include "ipm/Subscriber.hpp" -#include "opmonlib/MonitorableObject.hpp" +#include "opmonlib/OpMonManager.hpp" #include #include @@ -41,7 +41,8 @@ class NetworkManager static NetworkManager& get(); ~NetworkManager() { reset(); } - void configure(const Connections_t& connections, bool use_config_client, std::chrono::milliseconds config_client_interval); + void configure(const Connections_t& connections, bool use_config_client, std::chrono::milliseconds config_client_interval, + dunedaq::opmonlib::OpMonManager &); void reset(); std::shared_ptr get_receiver(ConnectionId const& conn_id); @@ -74,6 +75,7 @@ class NetworkManager std::unordered_map m_preconfigured_connections; std::unordered_map> m_receiver_plugins; std::unordered_map> m_sender_plugins; + std::shared_ptr m_opmon_link{ std::make_shared() }; std::unordered_map> m_subscriber_plugins; std::unique_ptr m_subscriber_update_thread; diff --git a/src/IOManager.cpp b/src/IOManager.cpp index 999162e..c7b87c4 100755 --- a/src/IOManager.cpp +++ b/src/IOManager.cpp @@ -39,7 +39,7 @@ dunedaq::iomanager::IOManager::configure(Queues_t queues, } QueueRegistry::get().configure(qCfg, opmgr); - NetworkManager::get().configure(nwCfg, use_config_client, config_client_interval); + NetworkManager::get().configure(nwCfg, use_config_client, config_client_interval, opmgr); } void diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index 863c97c..3b88b2a 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -35,7 +35,8 @@ NetworkManager::get() void NetworkManager::configure(const Connections_t& connections, bool use_config_client, - std::chrono::milliseconds config_client_interval) + std::chrono::milliseconds config_client_interval, + dunedaq::opmonlib::OpMonManager & opmgr) { if (!m_preconfigured_connections.empty()) { throw AlreadyConfigured(ERS_HERE); @@ -69,6 +70,8 @@ NetworkManager::configure(const Connections_t& connections, m_config_client = std::make_unique(connectionServer, connectionPort, config_client_interval); } m_config_client_interval = config_client_interval; + + opmgr.register_child( "connections", m_opmon_link); } void diff --git a/unittest/NetworkManager_test.cxx b/unittest/NetworkManager_test.cxx index b9b8443..747ee1a 100644 --- a/unittest/NetworkManager_test.cxx +++ b/unittest/NetworkManager_test.cxx @@ -63,7 +63,9 @@ struct NetworkManagerTestFixture testConn.id.data_type = "words"; testConn.uri = "inproc:/oof"; testConfig.push_back(testConn); - NetworkManager::get().configure(testConfig, false, 0ms); // Not using ConfigClient + + dunedaq::opmonlib::OpMonManager opmgr(nullptr); + NetworkManager::get().configure(testConfig, false, 0ms, opmgr); // Not using ConfigClient } ~NetworkManagerTestFixture() { NetworkManager::get().reset(); } @@ -143,12 +145,16 @@ BOOST_FIXTURE_TEST_CASE(FakeConfigure, NetworkManagerTestFixture) testConn.uri = "inproc://rab"; testConn.connection_type = ConnectionType::kSendRecv; testConfig.push_back(testConn); - BOOST_REQUIRE_EXCEPTION( - NetworkManager::get().configure(testConfig, false, 0ms), AlreadyConfigured, [&](AlreadyConfigured const&) { return true; }); + dunedaq::opmonlib::OpMonManager opmgr(nullptr); + BOOST_REQUIRE_EXCEPTION( NetworkManager::get().configure(testConfig, + false, + 0ms, + opmgr), + AlreadyConfigured, [&](AlreadyConfigured const&) { return true; }); NetworkManager::get().reset(); - NetworkManager::get().configure(testConfig, false, 0ms); + NetworkManager::get().configure(testConfig, false, 0ms, opmgr); conn_res = NetworkManager::get().get_preconfigured_connections(id_notfound); BOOST_REQUIRE_EQUAL(conn_res.connections.size(), 1); conn_res = NetworkManager::get().get_preconfigured_connections(sendRecvConnId); @@ -161,8 +167,11 @@ BOOST_FIXTURE_TEST_CASE(NameCollisionInConfiguration, NetworkManagerTestFixture) Connections_t testConfig; testConfig.emplace_back(Connection{ sendRecvConnId, "inproc://foo", ConnectionType::kSendRecv }); testConfig.emplace_back(Connection{ sendRecvConnId, "inproc://bar", ConnectionType::kSendRecv }); - BOOST_REQUIRE_EXCEPTION( - NetworkManager::get().configure(testConfig, false, 0ms), NameCollision, [&](NameCollision const&) { return true; }); + dunedaq::opmonlib::OpMonManager opmgr(nullptr); + BOOST_REQUIRE_EXCEPTION(NetworkManager::get().configure(testConfig, + false, + 0ms, opmgr), + NameCollision, [&](NameCollision const&) { return true; }); } BOOST_FIXTURE_TEST_CASE(GetDatatypes, NetworkManagerTestFixture) @@ -184,4 +193,4 @@ BOOST_AUTO_TEST_CASE(MakeIPMPlugins) {} BOOST_AUTO_TEST_SUITE_END() -#pragma GCC diagnostic pop \ No newline at end of file +#pragma GCC diagnostic pop From c618ef62cb7350db743509ab2b86a63632bced9b Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 30 Jul 2024 13:24:04 +0200 Subject: [PATCH 06/18] Register connections --- src/network/NetworkManager.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index 3b88b2a..d7f56c6 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -299,6 +299,8 @@ NetworkManager::create_receiver(std::vector connections, Connect m_config_client->publish(connections[0]); } + m_opmon_link->register_child(conn_id.uid, plugin); + TLOG_DEBUG(12) << "END"; return plugin; } @@ -340,6 +342,8 @@ NetworkManager::create_sender(ConnectionInfo connection) m_config_client->publish(connection); } + m_opmon_link->register_child(connection.uid, plugin); + return plugin; } From aabcd29e6193d42aac42c4f2d8ee11c9330216cb Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 30 Jul 2024 16:09:26 +0200 Subject: [PATCH 07/18] Code for discussion with Giovanna tomorrow --- src/network/NetworkManager.cpp | 20 +++++++++++++++++++- unittest/IOManager_test.cxx | 5 +++-- unittest/performance_test.cxx | 6 ++++-- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index d7f56c6..73dc4fe 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -299,8 +299,26 @@ NetworkManager::create_receiver(std::vector connections, Connect m_config_client->publish(connections[0]); } - m_opmon_link->register_child(conn_id.uid, plugin); + // MR: we need to check with Eric why we need this abomination + try { + m_opmon_link->register_child(conn_id.uid, plugin); + } catch (const ers::Issue & e) { + ers::error(e); + bool success = false; + size_t counter = 0; + do { + try { + auto name = conn_id.uid + '_' + std::to_string(0); + m_opmon_link->register_child(name, plugin); + success = true; + } catch ( const ers::Issue & err ) { + ers::error(err); + ++counter; + } + } while (! success); + } + TLOG_DEBUG(12) << "END"; return plugin; } diff --git a/unittest/IOManager_test.cxx b/unittest/IOManager_test.cxx index 6de6492..0a00bfb 100755 --- a/unittest/IOManager_test.cxx +++ b/unittest/IOManager_test.cxx @@ -195,10 +195,9 @@ struct ConfigurationTestFixture connections.emplace_back(Connection{ pub2_id, "inproc://baz", ConnectionType::kPubSub }); connections.emplace_back(Connection{ pub3_id, "inproc://qui", ConnectionType::kPubSub }); - dunedaq::opmonlib::OpMonManager opmgr(nullptr); IOManager::get()->configure(queues, connections, false, 1000ms, opmgr); // Not using connectivity service } - ~ConfigurationTestFixture() { IOManager::get()->reset(); } + ~ConfigurationTestFixture() { IOManager::get()->reset(); opmgr.reset(); } ConfigurationTestFixture(ConfigurationTestFixture const&) = default; ConfigurationTestFixture(ConfigurationTestFixture&&) = default; @@ -215,6 +214,8 @@ struct ConfigurationTestFixture ConnectionId sub1_id; ConnectionId sub2_id; ConnectionId sub3_id; + + dunedaq::opmonlib::OpMonManager opmgr{nullptr}; }; BOOST_AUTO_TEST_CASE(CopyAndMoveSemantics) diff --git a/unittest/performance_test.cxx b/unittest/performance_test.cxx index ad4dfdb..85510e4 100644 --- a/unittest/performance_test.cxx +++ b/unittest/performance_test.cxx @@ -56,7 +56,6 @@ struct ConfigurationTestFixture dunedaq::iomanager::Connections_t connections; connections.emplace_back(Connection{ network_id, "inproc://foo", ConnectionType::kSendRecv }); - dunedaq::opmonlib::OpMonManager opmgr(nullptr); IOManager::get()->configure(queues, connections, false, 0ms, opmgr); // Not using connectivity service } ~ConfigurationTestFixture() { IOManager::get()->reset(); } @@ -66,6 +65,8 @@ struct ConfigurationTestFixture ConfigurationTestFixture& operator=(ConfigurationTestFixture const&) = delete; ConfigurationTestFixture& operator=(ConfigurationTestFixture&&) = delete; + dunedaq::opmonlib::OpMonManager opmgr{nullptr}; + dunedaq::iomanager::ConnectionId network_id; dunedaq::iomanager::ConnectionId queue_id; const size_t n_sends = 10000; @@ -82,7 +83,8 @@ BOOST_FIXTURE_TEST_CASE(CallbackRegistrationNetwork, ConfigurationTestFixture) auto start_time = std::chrono::steady_clock::now(); for (unsigned int i = 0; i < n_sends; ++i) { dunedaq::data_t temp(message_size, i % 200); - net_sender->send(std::move(temp), Sender::s_no_block); + // net_sender->send(std::move(temp), Sender::s_no_block); + net_sender->send(std::move(temp), std::chrono::milliseconds(100000)); } BOOST_TEST_MESSAGE("Messages sent, waiting for receives"); while (received_count < n_sends) { From 4de3cea3e180f66e57d3888c7adb5791aec9e267 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 1 Aug 2024 12:53:38 +0200 Subject: [PATCH 08/18] Fix unittest but we really need a check --- src/network/NetworkManager.cpp | 21 ++++++++++++++++++++- unittest/performance_test.cxx | 3 +-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index 73dc4fe..d0c2fc4 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -360,7 +360,26 @@ NetworkManager::create_sender(ConnectionInfo connection) m_config_client->publish(connection); } - m_opmon_link->register_child(connection.uid, plugin); + // MR: we need to check with Eric why we need this abomination + try { + m_opmon_link->register_child(connection.uid, plugin); + } catch (const ers::Issue & e) { + ers::error(e); + bool success = false; + size_t counter = 0; + do { + try { + auto name = connection.uid + '_' + std::to_string(0); + m_opmon_link->register_child(name, plugin); + success = true; + } catch ( const ers::Issue & err ) { + ers::error(err); + ++counter; + } + } while (! success); + } + + // m_opmon_link->register_child(connection.uid, plugin); return plugin; } diff --git a/unittest/performance_test.cxx b/unittest/performance_test.cxx index 85510e4..5d8598a 100644 --- a/unittest/performance_test.cxx +++ b/unittest/performance_test.cxx @@ -83,8 +83,7 @@ BOOST_FIXTURE_TEST_CASE(CallbackRegistrationNetwork, ConfigurationTestFixture) auto start_time = std::chrono::steady_clock::now(); for (unsigned int i = 0; i < n_sends; ++i) { dunedaq::data_t temp(message_size, i % 200); - // net_sender->send(std::move(temp), Sender::s_no_block); - net_sender->send(std::move(temp), std::chrono::milliseconds(100000)); + net_sender->send(std::move(temp), Sender::s_no_block); } BOOST_TEST_MESSAGE("Messages sent, waiting for receives"); while (received_count < n_sends) { From 95f4a20dea58805790b9ab4a0495ef1dbcab2533 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 1 Aug 2024 16:29:40 +0200 Subject: [PATCH 09/18] Remove reset --- unittest/IOManager_test.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unittest/IOManager_test.cxx b/unittest/IOManager_test.cxx index 0a00bfb..3b0dd1a 100755 --- a/unittest/IOManager_test.cxx +++ b/unittest/IOManager_test.cxx @@ -197,7 +197,7 @@ struct ConfigurationTestFixture IOManager::get()->configure(queues, connections, false, 1000ms, opmgr); // Not using connectivity service } - ~ConfigurationTestFixture() { IOManager::get()->reset(); opmgr.reset(); } + ~ConfigurationTestFixture() { IOManager::get()->reset(); } ConfigurationTestFixture(ConfigurationTestFixture const&) = default; ConfigurationTestFixture(ConfigurationTestFixture&&) = default; From 18127b70b0bb55ada14f0e2a68bfb480388b5ff2 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 2 Aug 2024 11:09:48 +0200 Subject: [PATCH 10/18] Final proposal on my side for network monitoring, Eric approval pending --- CMakeLists.txt | 4 +- include/iomanager/network/NetworkManager.hpp | 3 +- src/network/NetworkManager.cpp | 39 +++++++++----------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5a95f4f..1c6ed2c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,9 +11,11 @@ find_package(opmonlib REQUIRED) find_package(folly REQUIRED) find_package(serialization REQUIRED) find_package(utilities REQUIRED) +find_package(fmt REQUIRED) + ############################################################################## -set(IOMANAGER_DEPENDENCIES serialization::serialization Folly::folly utilities::utilities opmonlib::opmonlib ipm::ipm) +set(IOMANAGER_DEPENDENCIES serialization::serialization Folly::folly utilities::utilities opmonlib::opmonlib ipm::ipm fmt::fmt) daq_codegen(connection.jsonnet TEMPLATES Structs.hpp.j2 Nljs.hpp.j2 ) daq_protobuf_codegen( info/*.proto ) diff --git a/include/iomanager/network/NetworkManager.hpp b/include/iomanager/network/NetworkManager.hpp index dd86395..3f7c0e2 100644 --- a/include/iomanager/network/NetworkManager.hpp +++ b/include/iomanager/network/NetworkManager.hpp @@ -75,7 +75,8 @@ class NetworkManager std::unordered_map m_preconfigured_connections; std::unordered_map> m_receiver_plugins; std::unordered_map> m_sender_plugins; - std::shared_ptr m_opmon_link{ std::make_shared() }; + std::shared_ptr m_sender_opmon_link{ std::make_shared() }; + std::shared_ptr m_receiver_opmon_link{ std::make_shared() }; std::unordered_map> m_subscriber_plugins; std::unique_ptr m_subscriber_update_thread; diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index d0c2fc4..2b7486d 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -18,6 +18,8 @@ #include #include +#include + namespace dunedaq::iomanager { std::unique_ptr NetworkManager::s_instance = nullptr; @@ -71,7 +73,8 @@ NetworkManager::configure(const Connections_t& connections, } m_config_client_interval = config_client_interval; - opmgr.register_child( "connections", m_opmon_link); + opmgr.register_child( "senders", m_sender_opmon_link); + opmgr.register_child( "receivers", m_receiver_opmon_link); } void @@ -299,24 +302,21 @@ NetworkManager::create_receiver(std::vector connections, Connect m_config_client->publish(connections[0]); } - - // MR: we need to check with Eric why we need this abomination - try { - m_opmon_link->register_child(conn_id.uid, plugin); - } catch (const ers::Issue & e) { - ers::error(e); + if ( is_pubsub ) { bool success = false; size_t counter = 0; do { + auto name = fmt::format("{}--{}", conn_id.uid, counter); try { - auto name = conn_id.uid + '_' + std::to_string(0); - m_opmon_link->register_child(name, plugin); + m_receiver_opmon_link->register_child(name, plugin); success = true; } catch ( const ers::Issue & err ) { - ers::error(err); ++counter; } - } while (! success); + } while( ! success ); + } + else { + m_receiver_opmon_link->register_child(conn_id.uid, plugin); } TLOG_DEBUG(12) << "END"; @@ -360,26 +360,23 @@ NetworkManager::create_sender(ConnectionInfo connection) m_config_client->publish(connection); } - // MR: we need to check with Eric why we need this abomination - try { - m_opmon_link->register_child(connection.uid, plugin); - } catch (const ers::Issue & e) { - ers::error(e); + if ( is_pubsub ) { bool success = false; size_t counter = 0; do { + auto name = fmt::format("{}--{}", connection.uid, counter); try { - auto name = connection.uid + '_' + std::to_string(0); - m_opmon_link->register_child(name, plugin); + m_sender_opmon_link->register_child(name, plugin); success = true; } catch ( const ers::Issue & err ) { - ers::error(err); ++counter; } - } while (! success); + } while( ! success ); + } + else { + m_sender_opmon_link->register_child(connection.uid, plugin); } - // m_opmon_link->register_child(connection.uid, plugin); return plugin; } From 2937f75009255b6a034cc9349f75ffdc7bcd976e Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 2 Aug 2024 12:00:03 +0200 Subject: [PATCH 11/18] Better exception to catch --- src/network/NetworkManager.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index 2b7486d..83907a4 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -310,7 +310,7 @@ NetworkManager::create_receiver(std::vector connections, Connect try { m_receiver_opmon_link->register_child(name, plugin); success = true; - } catch ( const ers::Issue & err ) { + } catch ( const opmonlib::NonUniqueChildName & err ) { ++counter; } } while( ! success ); @@ -368,7 +368,7 @@ NetworkManager::create_sender(ConnectionInfo connection) try { m_sender_opmon_link->register_child(name, plugin); success = true; - } catch ( const ers::Issue & err ) { + } catch ( const opmonlib::NonUniqueChildName & err ) { ++counter; } } while( ! success ); From bceaa4304ec2edff96728270c9e3d505bc027433 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 2 Aug 2024 13:58:09 +0200 Subject: [PATCH 12/18] Better code organisation --- include/iomanager/network/NetworkManager.hpp | 3 + src/network/NetworkManager.cpp | 61 ++++++++++---------- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/include/iomanager/network/NetworkManager.hpp b/include/iomanager/network/NetworkManager.hpp index 3f7c0e2..ad24abd 100644 --- a/include/iomanager/network/NetworkManager.hpp +++ b/include/iomanager/network/NetworkManager.hpp @@ -77,6 +77,9 @@ class NetworkManager std::unordered_map> m_sender_plugins; std::shared_ptr m_sender_opmon_link{ std::make_shared() }; std::shared_ptr m_receiver_opmon_link{ std::make_shared() }; + static void register_monitorable_node( std::shared_ptr conn, + std::shared_ptr link, + const std::string & name, bool is_pubsub ); std::unordered_map> m_subscriber_plugins; std::unique_ptr m_subscriber_update_thread; diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index 83907a4..605e3c1 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -106,6 +106,9 @@ NetworkManager::reset() } } m_config_client.reset(nullptr); + + m_sender_opmon_link = std::make_shared(); + m_receiver_opmon_link = std::make_shared(); } std::shared_ptr @@ -302,22 +305,7 @@ NetworkManager::create_receiver(std::vector connections, Connect m_config_client->publish(connections[0]); } - if ( is_pubsub ) { - bool success = false; - size_t counter = 0; - do { - auto name = fmt::format("{}--{}", conn_id.uid, counter); - try { - m_receiver_opmon_link->register_child(name, plugin); - success = true; - } catch ( const opmonlib::NonUniqueChildName & err ) { - ++counter; - } - } while( ! success ); - } - else { - m_receiver_opmon_link->register_child(conn_id.uid, plugin); - } + register_monitorable_node(plugin, m_receiver_opmon_link, conn_id.uid, is_pubsub); TLOG_DEBUG(12) << "END"; return plugin; @@ -360,23 +348,8 @@ NetworkManager::create_sender(ConnectionInfo connection) m_config_client->publish(connection); } - if ( is_pubsub ) { - bool success = false; - size_t counter = 0; - do { - auto name = fmt::format("{}--{}", connection.uid, counter); - try { - m_sender_opmon_link->register_child(name, plugin); - success = true; - } catch ( const opmonlib::NonUniqueChildName & err ) { - ++counter; - } - } while( ! success ); - } - else { - m_sender_opmon_link->register_child(connection.uid, plugin); - } + register_monitorable_node(plugin, m_sender_opmon_link, connection.uid, is_pubsub); return plugin; } @@ -406,4 +379,28 @@ NetworkManager::update_subscribers() } } +void +NetworkManager::register_monitorable_node( std::shared_ptr conn, + std::shared_ptr link, + const std::string & name, bool is_pubsub ) { + if ( is_pubsub ) { + bool success = false; + size_t counter = 0; + do { + auto fname = fmt::format("{}--{}", name, counter); + try { + link->register_child(fname, conn); + success = true; + } catch ( const opmonlib::NonUniqueChildName & err ) { + ++counter; + } + } while( ! success ); + } + else { + link->register_child(name, conn); + } + +} + + } // namespace dunedaq::iomanager From 44a2cd78c04b772a2ef4da0e9baedb8dd340b6ec Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 6 Aug 2024 15:39:15 +0200 Subject: [PATCH 13/18] Proper opmon structure in schema directory --- CMakeLists.txt | 2 +- schema/iomanager/{info => opmon}/QueueInfo.proto | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename schema/iomanager/{info => opmon}/QueueInfo.proto (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1c6ed2c..5f481ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,7 @@ find_package(fmt REQUIRED) set(IOMANAGER_DEPENDENCIES serialization::serialization Folly::folly utilities::utilities opmonlib::opmonlib ipm::ipm fmt::fmt) daq_codegen(connection.jsonnet TEMPLATES Structs.hpp.j2 Nljs.hpp.j2 ) -daq_protobuf_codegen( info/*.proto ) +daq_protobuf_codegen( opmon/*.proto ) daq_add_library(IOManager.cpp queue/QueueRegistry.cpp network/NetworkManager.cpp network/ConfigClient.cpp LINK_LIBRARIES ${IOMANAGER_DEPENDENCIES} ) diff --git a/schema/iomanager/info/QueueInfo.proto b/schema/iomanager/opmon/QueueInfo.proto similarity index 100% rename from schema/iomanager/info/QueueInfo.proto rename to schema/iomanager/opmon/QueueInfo.proto From abc0a2f6a4baeeb31f0cd8664ed3b98eb889b96e Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 6 Aug 2024 16:04:23 +0200 Subject: [PATCH 14/18] Change interface to register_node --- include/iomanager/queue/detail/QueueRegistry.hxx | 2 +- src/network/NetworkManager.cpp | 10 +++++----- src/queue/QueueRegistry.cpp | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/include/iomanager/queue/detail/QueueRegistry.hxx b/include/iomanager/queue/detail/QueueRegistry.hxx index 8848042..910ea63 100644 --- a/include/iomanager/queue/detail/QueueRegistry.hxx +++ b/include/iomanager/queue/detail/QueueRegistry.hxx @@ -67,7 +67,7 @@ QueueRegistry::create_queue(const QueueConfig& config) throw QueueTypeUnknown(ERS_HERE, str(config.queue_type)); } - m_opmon_link->register_child(config.id.uid, queue); + m_opmon_link->register_node(config.id.uid, queue); return queue; } diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index 605e3c1..ef880b5 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -73,8 +73,8 @@ NetworkManager::configure(const Connections_t& connections, } m_config_client_interval = config_client_interval; - opmgr.register_child( "senders", m_sender_opmon_link); - opmgr.register_child( "receivers", m_receiver_opmon_link); + opmgr.register_node( "senders", m_sender_opmon_link); + opmgr.register_node( "receivers", m_receiver_opmon_link); } void @@ -389,15 +389,15 @@ NetworkManager::register_monitorable_node( std::shared_ptrregister_child(fname, conn); + link->register_node(fname, conn); success = true; - } catch ( const opmonlib::NonUniqueChildName & err ) { + } catch ( const opmonlib::NonUniqueNodeName & err ) { ++counter; } } while( ! success ); } else { - link->register_child(name, conn); + link->register_node(name, conn); } } diff --git a/src/queue/QueueRegistry.cpp b/src/queue/QueueRegistry.cpp index 56de1d5..5347e85 100644 --- a/src/queue/QueueRegistry.cpp +++ b/src/queue/QueueRegistry.cpp @@ -36,7 +36,7 @@ QueueRegistry::configure(const std::vector& configs, opmonlib::OpMo m_queue_configs = configs; - mgr.register_child("queues", m_opmon_link); + mgr.register_node("queues", m_opmon_link); m_configured = true; } From 8af6fc7791a42cf61fd527cb8df58c2442e1b752 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 6 Aug 2024 16:55:19 +0200 Subject: [PATCH 15/18] Style names --- include/iomanager/queue/QueueBase.hpp | 2 +- schema/iomanager/opmon/{QueueInfo.proto => queue.proto} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename schema/iomanager/opmon/{QueueInfo.proto => queue.proto} (100%) diff --git a/include/iomanager/queue/QueueBase.hpp b/include/iomanager/queue/QueueBase.hpp index c76c344..122ae75 100644 --- a/include/iomanager/queue/QueueBase.hpp +++ b/include/iomanager/queue/QueueBase.hpp @@ -18,7 +18,7 @@ #include "utilities/NamedObject.hpp" #include "opmonlib/MonitorableObject.hpp" -#include "iomanager/info/QueueInfo.pb.h" +#include "iomanager/opmon/queue.pb.h" #include "ers/Issue.hpp" diff --git a/schema/iomanager/opmon/QueueInfo.proto b/schema/iomanager/opmon/queue.proto similarity index 100% rename from schema/iomanager/opmon/QueueInfo.proto rename to schema/iomanager/opmon/queue.proto From 794afd6866a9c5e8807058fbe094bfd449016aa6 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 9 Aug 2024 15:27:14 +0200 Subject: [PATCH 16/18] Final agreement on monitoring name --- src/network/NetworkManager.cpp | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/network/NetworkManager.cpp b/src/network/NetworkManager.cpp index ef880b5..7abf4f5 100755 --- a/src/network/NetworkManager.cpp +++ b/src/network/NetworkManager.cpp @@ -382,10 +382,13 @@ NetworkManager::update_subscribers() void NetworkManager::register_monitorable_node( std::shared_ptr conn, std::shared_ptr link, - const std::string & name, bool is_pubsub ) { - if ( is_pubsub ) { + const std::string & name, bool /*is_pubsub*/ ) { + + try { + link->register_node(name, conn); + } catch (const opmonlib::NonUniqueNodeName & err ) { bool success = false; - size_t counter = 0; + size_t counter = 1; do { auto fname = fmt::format("{}--{}", name, counter); try { @@ -396,10 +399,6 @@ NetworkManager::register_monitorable_node( std::shared_ptrregister_node(name, conn); - } - } From cf2e29a8bfd032d77ed719146dcf7bb8ba007377 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Sat, 10 Aug 2024 09:59:51 +0200 Subject: [PATCH 17/18] Restore compilation --- test/apps/iomanager_stress_test.cxx | 4 +++- test/apps/queues_vs_threads_iomanager.cxx | 3 ++- test/apps/reconnection_test.cxx | 3 ++- unittest/IOManager_test.cxx | 3 ++- unittest/NetworkManager_test.cxx | 7 ++++--- unittest/QueueRegistry_test.cxx | 4 ++-- unittest/performance_test.cxx | 3 ++- 7 files changed, 17 insertions(+), 10 deletions(-) diff --git a/test/apps/iomanager_stress_test.cxx b/test/apps/iomanager_stress_test.cxx index fd7ad68..3911eb9 100644 --- a/test/apps/iomanager_stress_test.cxx +++ b/test/apps/iomanager_stress_test.cxx @@ -11,6 +11,8 @@ #include "boost/program_options.hpp" +#include "opmonlib/TestOpMonManager.hpp" + #include #include #include @@ -122,7 +124,7 @@ struct TestConfig ConnectionType::kPubSub }); } - dunedaq::opmonlib::OpMonManager op_manager(nullptr); + dunedaq::opmonlib::TestOpMonManager op_manager; IOManager::get()->configure( queues, connections, use_connectivity_service, diff --git a/test/apps/queues_vs_threads_iomanager.cxx b/test/apps/queues_vs_threads_iomanager.cxx index 9bbb976..a3d1fa8 100644 --- a/test/apps/queues_vs_threads_iomanager.cxx +++ b/test/apps/queues_vs_threads_iomanager.cxx @@ -5,6 +5,7 @@ #include "iomanager/IOManager.hpp" #include "iomanager/Sender.hpp" +#include "opmonlib/TestOpMonManager.hpp" using namespace std::chrono; @@ -83,7 +84,7 @@ int main(int , char** ) queues.emplace_back(dunedaq::iomanager::QueueConfig{ {"bar" + std::to_string(i),"int_t"}, dunedaq::iomanager::QueueType::kFollyMPMCQueue, 1000 }); } - dunedaq::opmonlib::OpMonManager opmgr(nullptr); + dunedaq::opmonlib::TestOpMonManager opmgr; dunedaq::get_iomanager()->configure(queues, connections, false, 1000ms, opmgr); // Create all of the queues up front so the iomanager output is all in one place and not interspersed with the results diff --git a/test/apps/reconnection_test.cxx b/test/apps/reconnection_test.cxx index 26f135e..c395e0b 100755 --- a/test/apps/reconnection_test.cxx +++ b/test/apps/reconnection_test.cxx @@ -8,6 +8,7 @@ #include "iomanager/IOManager.hpp" #include "logging/Logging.hpp" +#include "opmonlib/TestOpMonManager.hpp" #include "boost/program_options.hpp" @@ -84,7 +85,7 @@ struct TestConfig ConnectionType::kSendRecv }; connections.push_back(recv_conn); - dunedaq::opmonlib::OpMonManager opmgr(nullptr); + dunedaq::opmonlib::TestOpMonManager opmgr; IOManager::get()->configure(queues, connections, true, std::chrono::milliseconds(publish_interval), opmgr); } diff --git a/unittest/IOManager_test.cxx b/unittest/IOManager_test.cxx index 3b0dd1a..f052a73 100755 --- a/unittest/IOManager_test.cxx +++ b/unittest/IOManager_test.cxx @@ -9,6 +9,7 @@ #include "iomanager/IOManager.hpp" #include "serialization/Serialization.hpp" +#include "opmonlib/TestOpMonManager.hpp" #define BOOST_TEST_MODULE IOManager_test // NOLINT @@ -215,7 +216,7 @@ struct ConfigurationTestFixture ConnectionId sub2_id; ConnectionId sub3_id; - dunedaq::opmonlib::OpMonManager opmgr{nullptr}; + dunedaq::opmonlib::TestOpMonManager opmgr; }; BOOST_AUTO_TEST_CASE(CopyAndMoveSemantics) diff --git a/unittest/NetworkManager_test.cxx b/unittest/NetworkManager_test.cxx index 747ee1a..5477702 100644 --- a/unittest/NetworkManager_test.cxx +++ b/unittest/NetworkManager_test.cxx @@ -8,6 +8,7 @@ #include "iomanager/connection/Structs.hpp" #include "iomanager/network/NetworkManager.hpp" +#include "opmonlib/TestOpMonManager.hpp" #include "logging/Logging.hpp" @@ -64,7 +65,7 @@ struct NetworkManagerTestFixture testConn.uri = "inproc:/oof"; testConfig.push_back(testConn); - dunedaq::opmonlib::OpMonManager opmgr(nullptr); + dunedaq::opmonlib::TestOpMonManager opmgr; NetworkManager::get().configure(testConfig, false, 0ms, opmgr); // Not using ConfigClient } ~NetworkManagerTestFixture() { NetworkManager::get().reset(); } @@ -145,7 +146,7 @@ BOOST_FIXTURE_TEST_CASE(FakeConfigure, NetworkManagerTestFixture) testConn.uri = "inproc://rab"; testConn.connection_type = ConnectionType::kSendRecv; testConfig.push_back(testConn); - dunedaq::opmonlib::OpMonManager opmgr(nullptr); + dunedaq::opmonlib::TestOpMonManager opmgr; BOOST_REQUIRE_EXCEPTION( NetworkManager::get().configure(testConfig, false, 0ms, @@ -167,7 +168,7 @@ BOOST_FIXTURE_TEST_CASE(NameCollisionInConfiguration, NetworkManagerTestFixture) Connections_t testConfig; testConfig.emplace_back(Connection{ sendRecvConnId, "inproc://foo", ConnectionType::kSendRecv }); testConfig.emplace_back(Connection{ sendRecvConnId, "inproc://bar", ConnectionType::kSendRecv }); - dunedaq::opmonlib::OpMonManager opmgr(nullptr); + dunedaq::opmonlib::TestOpMonManager opmgr; BOOST_REQUIRE_EXCEPTION(NetworkManager::get().configure(testConfig, false, 0ms, opmgr), diff --git a/unittest/QueueRegistry_test.cxx b/unittest/QueueRegistry_test.cxx index 16a57c3..3b4f352 100755 --- a/unittest/QueueRegistry_test.cxx +++ b/unittest/QueueRegistry_test.cxx @@ -7,7 +7,7 @@ */ #include "iomanager/queue/QueueRegistry.hpp" - +#include "opmonlib/TestOpMonManager.hpp" #define BOOST_TEST_MODULE QueueRegistry_test // NOLINT #include "boost/test/unit_test.hpp" @@ -41,7 +41,7 @@ BOOST_AUTO_TEST_CASE(Configure) qc.id.uid = "test_queue_fmpmc"; queue_registry_config.push_back(qc); - dunedaq::opmonlib::OpMonManager opmgr(nullptr); + dunedaq::opmonlib::TestOpMonManager opmgr; QueueRegistry::get().configure(queue_registry_config, opmgr); BOOST_REQUIRE_EXCEPTION(QueueRegistry::get().configure(queue_registry_config, opmgr), diff --git a/unittest/performance_test.cxx b/unittest/performance_test.cxx index 5d8598a..6e99f6b 100644 --- a/unittest/performance_test.cxx +++ b/unittest/performance_test.cxx @@ -12,6 +12,7 @@ #define BOOST_TEST_MODULE performance_test // NOLINT +#include "opmonlib/TestOpMonManager.hpp" #include "boost/test/unit_test.hpp" #include @@ -65,7 +66,7 @@ struct ConfigurationTestFixture ConfigurationTestFixture& operator=(ConfigurationTestFixture const&) = delete; ConfigurationTestFixture& operator=(ConfigurationTestFixture&&) = delete; - dunedaq::opmonlib::OpMonManager opmgr{nullptr}; + dunedaq::opmonlib::TestOpMonManager opmgr; dunedaq::iomanager::ConnectionId network_id; dunedaq::iomanager::ConnectionId queue_id; From 1a098246dec7e0df8d973a5893de752114d223b0 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Sat, 10 Aug 2024 10:03:38 +0200 Subject: [PATCH 18/18] restore test --- test/apps/iomanager_stress_test_pubsub.cxx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/apps/iomanager_stress_test_pubsub.cxx b/test/apps/iomanager_stress_test_pubsub.cxx index ad4370c..617b33b 100644 --- a/test/apps/iomanager_stress_test_pubsub.cxx +++ b/test/apps/iomanager_stress_test_pubsub.cxx @@ -8,6 +8,7 @@ #include "iomanager/IOManager.hpp" #include "logging/Logging.hpp" +#include "opmonlib/TestOpMonManager.hpp" #include "boost/program_options.hpp" @@ -162,7 +163,7 @@ struct TestConfig } } - dunedaq::opmonlib::OpMonManager op_manager(nullptr); + dunedaq::opmonlib::TestOpMonManager op_manager; IOManager::get()->configure( queues, connections, use_connectivity_service,