Skip to content

Commit

Permalink
Merge pull request #81 from DUNE-DAQ/feature/opmon
Browse files Browse the repository at this point in the history
Feature/opmon
  • Loading branch information
glehmannmiotto authored Aug 14, 2024
2 parents 65872f7 + 1a09824 commit 55fcd9c
Show file tree
Hide file tree
Showing 21 changed files with 150 additions and 135 deletions.
6 changes: 4 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ find_package(opmonlib REQUIRED)
find_package(folly REQUIRED)
find_package(serialization REQUIRED)
find_package(utilities REQUIRED)
find_package(fmt REQUIRED)


##############################################################################
set(IOMANAGER_DEPENDENCIES serialization::serialization Folly::folly utilities::utilities opmonlib::opmonlib ipm::ipm)
set(IOMANAGER_DEPENDENCIES serialization::serialization Folly::folly utilities::utilities opmonlib::opmonlib ipm::ipm fmt::fmt)

daq_codegen(connection.jsonnet TEMPLATES Structs.hpp.j2 Nljs.hpp.j2 )
daq_codegen(connectioninfo.jsonnet queueinfo.jsonnet DEP_PKGS opmonlib TEMPLATES opmonlib/InfoStructs.hpp.j2 opmonlib/InfoNljs.hpp.j2 )
daq_protobuf_codegen( opmon/*.proto )

daq_add_library(IOManager.cpp queue/QueueRegistry.cpp network/NetworkManager.cpp network/ConfigClient.cpp LINK_LIBRARIES ${IOMANAGER_DEPENDENCIES} )

Expand Down
4 changes: 3 additions & 1 deletion include/iomanager/IOManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "iomanager/Receiver.hpp"
#include "iomanager/Sender.hpp"
#include "iomanager/connection/Structs.hpp"
#include "opmonlib/OpMonManager.hpp"

#include <chrono>
#include <functional>
Expand Down Expand Up @@ -50,7 +51,8 @@ class IOManager
void configure(Queues_t queues,
Connections_t connections,
bool use_config_client,
std::chrono::milliseconds config_client_interval);
std::chrono::milliseconds config_client_interval,
opmonlib::OpMonManager &);

void reset();

Expand Down
11 changes: 8 additions & 3 deletions include/iomanager/network/NetworkManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "ipm/Receiver.hpp"
#include "ipm/Sender.hpp"
#include "ipm/Subscriber.hpp"
#include "opmonlib/InfoCollector.hpp"
#include "opmonlib/OpMonManager.hpp"

#include <atomic>
#include <chrono>
Expand All @@ -41,8 +41,8 @@ class NetworkManager
static NetworkManager& get();
~NetworkManager() { reset(); }

void gather_stats(opmonlib::InfoCollector& ci, int /*level*/);
void configure(const Connections_t& connections, bool use_config_client, std::chrono::milliseconds config_client_interval);
void configure(const Connections_t& connections, bool use_config_client, std::chrono::milliseconds config_client_interval,
dunedaq::opmonlib::OpMonManager &);
void reset();

std::shared_ptr<ipm::Receiver> get_receiver(ConnectionId const& conn_id);
Expand Down Expand Up @@ -75,6 +75,11 @@ class NetworkManager
std::unordered_map<ConnectionId, Connection> m_preconfigured_connections;
std::unordered_map<ConnectionId, std::shared_ptr<ipm::Receiver>> m_receiver_plugins;
std::unordered_map<ConnectionId, std::shared_ptr<ipm::Sender>> m_sender_plugins;
std::shared_ptr<dunedaq::opmonlib::OpMonLink> m_sender_opmon_link{ std::make_shared<dunedaq::opmonlib::OpMonLink>() };
std::shared_ptr<dunedaq::opmonlib::OpMonLink> m_receiver_opmon_link{ std::make_shared<dunedaq::opmonlib::OpMonLink>() };
static void register_monitorable_node( std::shared_ptr<opmonlib::MonitorableObject> conn,
std::shared_ptr<opmonlib::OpMonLink> link,
const std::string & name, bool is_pubsub );

std::unordered_map<ConnectionId, std::shared_ptr<ipm::Subscriber>> m_subscriber_plugins;
std::unique_ptr<std::thread> m_subscriber_update_thread;
Expand Down
2 changes: 0 additions & 2 deletions include/iomanager/queue/Queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include "iomanager/queue/QueueBase.hpp"

#include "opmonlib/InfoCollector.hpp"

#include "ers/Issue.hpp"

#include <chrono>
Expand Down
31 changes: 17 additions & 14 deletions include/iomanager/queue/QueueBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
#ifndef IOMANAGER_INCLUDE_IOMANAGER_QUEUEBASE_HPP_
#define IOMANAGER_INCLUDE_IOMANAGER_QUEUEBASE_HPP_

#include "iomanager/queueinfo/InfoNljs.hpp"
#include "utilities/NamedObject.hpp"

#include "opmonlib/InfoCollector.hpp"
#include "opmonlib/MonitorableObject.hpp"
#include "iomanager/opmon/queue.pb.h"

#include "ers/Issue.hpp"

Expand All @@ -35,7 +35,7 @@ namespace dunedaq::iomanager {
* @brief The QueueBase class allows to address generic behavior of any Queue implementation
*
*/
class QueueBase : public utilities::NamedObject
class QueueBase : public utilities::NamedObject, public opmonlib::MonitorableObject
{
public:
/**
Expand All @@ -46,17 +46,6 @@ class QueueBase : public utilities::NamedObject
: utilities::NamedObject(name)
{}

/**
* @brief Method to retrieve information (occupancy) from
* queues.
*/
void get_info(opmonlib::InfoCollector& ci, int /*level*/)
{
queueinfo::Info info;
info.capacity = this->get_capacity();
info.number_of_elements = this->get_num_elements();
ci.add(info);
}

/**
* @brief Get the capacity (max size) of the queue
Expand All @@ -66,6 +55,20 @@ class QueueBase : public utilities::NamedObject

virtual size_t get_num_elements() const = 0;


protected:
/**
* @brief Method to retrieve information (occupancy) from
* queues.
*/
void generate_opmon_data() override
{
opmon::QueueInfo info;
info.set_capacity(this->get_capacity());
info.set_number_of_elements(this->get_num_elements());
publish(std::move(info));
}

private:
QueueBase(const QueueBase&) = delete;
QueueBase& operator=(const QueueBase&) = delete;
Expand Down
10 changes: 4 additions & 6 deletions include/iomanager/queue/QueueRegistry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
#include "iomanager/SchemaUtils.hpp"
#include "iomanager/queue/Queue.hpp"
#include "iomanager/queue/QueueIssues.hpp"
#include "opmonlib/OpMonManager.hpp"

#include "ers/Issue.hpp"
#include "opmonlib/InfoCollector.hpp"

#include <map>
#include <memory>
Expand Down Expand Up @@ -56,10 +56,7 @@ class QueueRegistry
* @brief Configure the QueueRegistry
* @param configs Queue configurations
*/
void configure(const std::vector<QueueConfig>& configs);

// Gather statistics from queues
void gather_stats(opmonlib::InfoCollector& ic, int level);
void configure(const std::vector<QueueConfig>& configs, opmonlib::OpMonManager &);

// ONLY TO BE USED FOR TESTING!
static void reset() { s_instance.reset(nullptr); }
Expand All @@ -83,7 +80,8 @@ class QueueRegistry

std::map<std::string, QueueEntry> m_queue_registry;
std::vector<QueueConfig> m_queue_configs;

std::shared_ptr<opmonlib::OpMonLink> m_opmon_link{ std::make_shared<opmonlib::OpMonLink>() };

bool m_configured{ false };

static std::unique_ptr<QueueRegistry> s_instance;
Expand Down
2 changes: 2 additions & 0 deletions include/iomanager/queue/detail/QueueRegistry.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ QueueRegistry::create_queue(const QueueConfig& config)
throw QueueTypeUnknown(ERS_HERE, str(config.queue_type));
}

m_opmon_link->register_node(config.id.uid, queue);

return queue;
}

Expand Down
20 changes: 0 additions & 20 deletions schema/iomanager/connectioninfo.jsonnet

This file was deleted.

10 changes: 10 additions & 0 deletions schema/iomanager/opmon/queue.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
syntax = "proto3";


package dunedaq.iomanager.opmon;

message QueueInfo {

uint64 capacity = 1;
uint64 number_of_elements = 2;
}
19 changes: 0 additions & 19 deletions schema/iomanager/queueinfo.jsonnet

This file was deleted.

7 changes: 4 additions & 3 deletions src/IOManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ void
dunedaq::iomanager::IOManager::configure(Queues_t queues,
Connections_t connections,
bool use_config_client,
std::chrono::milliseconds config_client_interval)
std::chrono::milliseconds config_client_interval,
dunedaq::opmonlib::OpMonManager & opmgr)
{
char* session = getenv("DUNEDAQ_SESSION");
if (session) {
Expand All @@ -37,8 +38,8 @@ dunedaq::iomanager::IOManager::configure(Queues_t queues,
nwCfg.push_back(connection);
}

QueueRegistry::get().configure(qCfg);
NetworkManager::get().configure(nwCfg, use_config_client, config_client_interval);
QueueRegistry::get().configure(qCfg, opmgr);
NetworkManager::get().configure(nwCfg, use_config_client, config_client_interval, opmgr);
}

void
Expand Down
59 changes: 38 additions & 21 deletions src/network/NetworkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
#include "iomanager/network/NetworkManager.hpp"
#include "iomanager/SchemaUtils.hpp"

#include "iomanager/connectioninfo/InfoNljs.hpp"

#include "ipm/PluginInfo.hpp"
#include "logging/Logging.hpp"
#include "utilities/Resolver.hpp"
Expand All @@ -20,6 +18,8 @@
#include <string>
#include <vector>

#include <fmt/format.h>

namespace dunedaq::iomanager {

std::unique_ptr<NetworkManager> NetworkManager::s_instance = nullptr;
Expand All @@ -33,29 +33,12 @@ NetworkManager::get()
return *s_instance;
}

void
NetworkManager::gather_stats(opmonlib::InfoCollector& ci, int level)
{

for (auto& sender : m_sender_plugins) {
opmonlib::InfoCollector tmp_ic;
if(sender.second == nullptr) continue;
sender.second->get_info(tmp_ic, level);
ci.add(sender.first.uid, tmp_ic);
}

for (auto& receiver : m_receiver_plugins) {
opmonlib::InfoCollector tmp_ic;
if(receiver.second == nullptr) continue;
receiver.second->get_info(tmp_ic, level);
ci.add(receiver.first.uid, tmp_ic);
}
}

void
NetworkManager::configure(const Connections_t& connections,
bool use_config_client,
std::chrono::milliseconds config_client_interval)
std::chrono::milliseconds config_client_interval,
dunedaq::opmonlib::OpMonManager & opmgr)
{
if (!m_preconfigured_connections.empty()) {
throw AlreadyConfigured(ERS_HERE);
Expand Down Expand Up @@ -89,6 +72,9 @@ NetworkManager::configure(const Connections_t& connections,
m_config_client = std::make_unique<ConfigClient>(connectionServer, connectionPort, config_client_interval);
}
m_config_client_interval = config_client_interval;

opmgr.register_node( "senders", m_sender_opmon_link);
opmgr.register_node( "receivers", m_receiver_opmon_link);
}

void
Expand Down Expand Up @@ -120,6 +106,9 @@ NetworkManager::reset()
}
}
m_config_client.reset(nullptr);

m_sender_opmon_link = std::make_shared<dunedaq::opmonlib::OpMonLink>();
m_receiver_opmon_link = std::make_shared<dunedaq::opmonlib::OpMonLink>();
}

std::shared_ptr<ipm::Receiver>
Expand Down Expand Up @@ -316,6 +305,8 @@ NetworkManager::create_receiver(std::vector<ConnectionInfo> connections, Connect
m_config_client->publish(connections[0]);
}

register_monitorable_node(plugin, m_receiver_opmon_link, conn_id.uid, is_pubsub);

TLOG_DEBUG(12) << "END";
return plugin;
}
Expand Down Expand Up @@ -357,6 +348,9 @@ NetworkManager::create_sender(ConnectionInfo connection)
m_config_client->publish(connection);
}


register_monitorable_node(plugin, m_sender_opmon_link, connection.uid, is_pubsub);

return plugin;
}

Expand Down Expand Up @@ -385,4 +379,27 @@ NetworkManager::update_subscribers()
}
}

void
NetworkManager::register_monitorable_node( std::shared_ptr<opmonlib::MonitorableObject> conn,
std::shared_ptr<opmonlib::OpMonLink> link,
const std::string & name, bool /*is_pubsub*/ ) {

try {
link->register_node(name, conn);
} catch (const opmonlib::NonUniqueNodeName & err ) {
bool success = false;
size_t counter = 1;
do {
auto fname = fmt::format("{}--{}", name, counter);
try {
link->register_node(fname, conn);
success = true;
} catch ( const opmonlib::NonUniqueNodeName & err ) {
++counter;
}
} while( ! success );
}
}


} // namespace dunedaq::iomanager
Loading

0 comments on commit 55fcd9c

Please sign in to comment.