From 532c163e9952ee0f48f309dd55e5555e1db8bc0a Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 29 Jul 2024 11:41:25 +0200 Subject: [PATCH 1/6] remove old stuff --- CMakeLists.txt | 2 - plugins/kafkaOpmonService.cpp | 151 ---------------------------------- 2 files changed, 153 deletions(-) delete mode 100644 plugins/kafkaOpmonService.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b7af903..176dfb7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,9 +20,7 @@ daq_add_library(*.cpp LINK_LIBRARIES ${Boost_LIBRARIES} opmonlib::opmonlib RdKaf ############################################################################## # Plugins -daq_add_plugin(kafkaOpmonService duneOpmonService LINK_LIBRARIES opmonlib::opmonlib RdKafka::rdkafka RdKafka::rdkafka++ ers::ers kafkaopmon) daq_add_plugin(streamOpMonFacility duneOpMonFacility LINK_LIBRARIES kafkaopmon) -target_include_directories(kafkaopmon_kafkaOpmonService_duneOpmonService PUBLIC) ## The following application is deprecated ## The correct consumer to feed influxdb is contained in microservice diff --git a/plugins/kafkaOpmonService.cpp b/plugins/kafkaOpmonService.cpp deleted file mode 100644 index 5b2aaee..0000000 --- a/plugins/kafkaOpmonService.cpp +++ /dev/null @@ -1,151 +0,0 @@ -/** - * @file kafkaOpmonService.cpp kafkaopmon class implementation - * - * This is part of the DUNE DAQ software, copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ - -#include "JsonFlattener.hpp" - -#include "opmonlib/OpmonService.hpp" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using json = nlohmann::json; - -namespace dunedaq { // namespace dunedaq - -ERS_DECLARE_ISSUE(kafkaopmon, CannotProduce, "Cannot produce to kafka " << error, ((std::string)error)) - -ERS_DECLARE_ISSUE(kafkaopmon, WrongURI, "Incorrect URI" << uri, ((std::string)uri)) - -} // namespace dunedaq - -namespace dunedaq::kafkaopmon { // namespace dunedaq - -class kafkaOpmonService : public dunedaq::opmonlib::OpmonService -{ -public: - explicit kafkaOpmonService(std::string uri) - : dunedaq::opmonlib::OpmonService(uri) - { - - // Regex rescription: - //"([a-zA-Z]+):\/\/([^:\/?#\s]+)+(?::(\d+))?(\/[^?#\s]+)?(?:\?(?:db=([^?#\s]+)))" - //* 1st Capturing Group `([a-zA-Z])`: Matches protocol - //* 2nd Capturing Group `([^:\/?#\s])+`: Matches hostname - //* 3rd Capturing Group `(\d)`: Matches port - //* 4th Capturing Group `([^\/?#]+)?`: Matches kafka topic - - std::regex uri_re(R"(([a-zA-Z]+):\/\/([^:\/?#\s]+):(\d+)\/([^:\/?#\s]+))"); - //* 1st Capturing Group `([a-zA-Z])`: Matches protocol - //* 2nd Capturing Group `([^:\/?#\s])+`: Matches hostname - //* 3rd Capturing Group `(\d)`: Matches port - //* 4th Capturing Group `([^\/?#]+)?`: Matches kafka topic - - std::smatch uri_match; - if (!std::regex_match(uri, uri_match, uri_re)) { - ers::fatal(WrongURI(ERS_HERE, " Invalid URI syntax: " + uri)); - } - - m_host = uri_match[2]; - m_port = uri_match[3]; - m_topic = uri_match[4]; - // Kafka server settings - std::string brokers = m_host + ":" + m_port; - std::string errstr; - - RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - conf->set("bootstrap.servers", brokers, errstr); - if (errstr != "") { - CannotProduce(ERS_HERE, "Bootstrap server error : " + errstr); - } - if (const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME")) - conf->set("client.id", env_p, errstr); - else - conf->set("client.id", "erskafkaOpmonproducerdefault", errstr); - - if (errstr != "") { - CannotProduce(ERS_HERE, "Producer configuration error : " + errstr); - } - // Create producer instance - m_producer = RdKafka::Producer::create(conf, errstr); - - if (errstr != "") { - CannotProduce(ERS_HERE, "Producer creation error : " + errstr); - } - } - - void publish(nlohmann::json j) override - { - - std::vector infos; - try { - JsonFlattener jf(j); - infos = jf.get(); - } catch (const ers::Issue& i) { - ers::error(i); - } - - for (const auto& report : infos) { - - auto partition = report["tags"]["partition_id"].get(); - - try { - // serialize it to BSON - RdKafka::ErrorCode err = m_producer->produce(m_topic, - RdKafka::Topic::PARTITION_UA, - RdKafka::Producer::RK_MSG_COPY, - const_cast(report.dump().c_str()), - report.dump().size(), - &partition, - 0, - 0, - nullptr); - - if (err != RdKafka::ERR_NO_ERROR) { - CannotProduce(ERS_HERE, "% Failed to produce " + RdKafka::err2str(err)); - } - } catch (const std::exception& e) { - std::string s = e.what(); - ers::error(CannotProduce(ERS_HERE, "Error [" + s + "] message(s) were not delivered")); - } - } - } - -protected: - typedef OpmonService inherited; - -private: - RdKafka::Producer* m_producer; - - std::string m_host; - std::string m_port; - std::string m_topic; - - std::vector m_inserts; - - std::string m_query; -}; - -} // namespace dunedaq::kafkaopmon - -extern "C" -{ - std::shared_ptr make(std::string service) - { // namespace dunedaq::kafkaopmon - return std::shared_ptr(new dunedaq::kafkaopmon::kafkaOpmonService(service)); - } -} From 63c1a74431ebc66bf873b1283b6de244af85ad44 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 6 Aug 2024 15:56:32 +0200 Subject: [PATCH 2/6] Change to register_node --- test/apps/opmon_publish_to_kafka_test.cxx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/apps/opmon_publish_to_kafka_test.cxx b/test/apps/opmon_publish_to_kafka_test.cxx index 4b2661a..e5941d8 100644 --- a/test/apps/opmon_publish_to_kafka_test.cxx +++ b/test/apps/opmon_publish_to_kafka_test.cxx @@ -20,7 +20,7 @@ using namespace dunedaq::opmonlib; class TestObject : public MonitorableObject { public: - using MonitorableObject::register_child; + using MonitorableObject::register_node; using MonitorableObject::publish; TestObject() : MonitorableObject() {;} }; @@ -60,7 +60,7 @@ main(int argc, char const* argv[]) std::vector> objs(n); for ( size_t i = 0; i < n; ++i ) { auto p = objs[i] = std::make_shared(); - man.register_child( "element_" + std::to_string(i), p ); + man.register_node( "element_" + std::to_string(i), p ); } auto pub_func = [&](int i, std::shared_ptr p){ From 3ba7207712244bc5df37e19d4722ddc79519afd4 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 6 Aug 2024 17:00:58 +0200 Subject: [PATCH 3/6] Style names --- test/apps/opmon_publish_to_kafka_test.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/apps/opmon_publish_to_kafka_test.cxx b/test/apps/opmon_publish_to_kafka_test.cxx index e5941d8..8314e3c 100644 --- a/test/apps/opmon_publish_to_kafka_test.cxx +++ b/test/apps/opmon_publish_to_kafka_test.cxx @@ -6,7 +6,7 @@ #include #include -#include "opmonlib/info/test.pb.h" +#include "opmonlib/opmon/test.pb.h" #include From fa6f639be831759e7311b11340c45d5a75235256 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 6 Aug 2024 17:07:15 +0200 Subject: [PATCH 4/6] Style names --- test/apps/opmon_publisher_test.cxx | 2 +- unittest/stream_OpMonFacility_test.cxx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/apps/opmon_publisher_test.cxx b/test/apps/opmon_publisher_test.cxx index 4ad1309..42f64b8 100644 --- a/test/apps/opmon_publisher_test.cxx +++ b/test/apps/opmon_publisher_test.cxx @@ -8,7 +8,7 @@ #include #include "opmonlib/Utils.hpp" -#include "opmonlib/info/test.pb.h" +#include "opmonlib/opmon/test.pb.h" #include diff --git a/unittest/stream_OpMonFacility_test.cxx b/unittest/stream_OpMonFacility_test.cxx index 83e672a..f8831eb 100644 --- a/unittest/stream_OpMonFacility_test.cxx +++ b/unittest/stream_OpMonFacility_test.cxx @@ -9,7 +9,7 @@ #include "opmonlib/OpMonFacility.hpp" #include "opmonlib/Utils.hpp" -#include "opmonlib/info/test.pb.h" +#include "opmonlib/opmon/test.pb.h" #define BOOST_TEST_MODULE stream_opmon_facility_test // NOLINT From d3ac5ea9ed601de00ad82728e74d11f367942f89 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 9 Aug 2024 11:46:04 +0200 Subject: [PATCH 5/6] Getting rid of old debries --- src/JsonFlattener.cpp | 100 ------------------------------------------ src/JsonFlattener.hpp | 64 --------------------------- 2 files changed, 164 deletions(-) delete mode 100644 src/JsonFlattener.cpp delete mode 100644 src/JsonFlattener.hpp diff --git a/src/JsonFlattener.cpp b/src/JsonFlattener.cpp deleted file mode 100644 index 6931e33..0000000 --- a/src/JsonFlattener.cpp +++ /dev/null @@ -1,100 +0,0 @@ -/** - * @file JsonFlattener.cpp JsonFlattener class implementation - * - * This is part of the DUNE DAQ Software Suite, copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ -#include - -#include "JsonFlattener.hpp" - -#include "opmonlib/JSONTags.hpp" - -namespace dunedaq { - -using opmonlib::JSONTags; - -namespace kafkaopmon { - -JsonFlattener::JsonFlattener(const nlohmann::json& j) -{ - if (j.size() != 2 || j.count(JSONTags::parent) != 1 || j.count(JSONTags::tags) != 1) { - throw OpmonJSONValidationError(ERS_HERE, "Root key '" + std::string(JSONTags::parent) + "' not found"); - } - - if (j[JSONTags::parent].size() != 1) { - throw OpmonJSONValidationError(ERS_HERE, - "Expected 1 top-level entry, found " + std::to_string(j[JSONTags::parent].size())); - } - - // even easier with structured bindings (C++17) - for (auto& [key, value] : j[JSONTags::tags].items()) { - m_tags[key] = value; - } - - parse_json("", j[JSONTags::parent]); -} - -void -JsonFlattener::parse_json(std::string path, const nlohmann::json& j) -{ - - for (auto& [key, obj] : j.items()) { - - auto objpath = (path.size() ? path + m_separator + key : key); - - // validate meta filelds, i.e. - if (!obj.is_object()) - throw OpmonJSONValidationError(ERS_HERE, key + " is not an object"); - - // if properties are here, process them - if (obj.count(JSONTags::properties)) { - // Loop over property structures - for (auto& pstruct : obj[JSONTags::properties].items()) { - - // Make sure that this is a json object - if (!pstruct.value().is_object()) - throw OpmonJSONValidationError(ERS_HERE, pstruct.key() + " is not an object"); - - // And that it contains the required fields - if (pstruct.value().count(JSONTags::time) == 0 || pstruct.value().count(JSONTags::data) == 0) - throw OpmonJSONValidationError( - ERS_HERE, pstruct.key() + " has no " + JSONTags::time + " or " + JSONTags::data + " tag"); - - // Check for presence of stubstructures - std::vector sub_structs; - for (auto& pobj : pstruct.value().at(JSONTags::data).items()) { - if (pobj.value().is_object()) { - sub_structs.push_back(pobj.key()); - } - } - if (sub_structs.size()) { - throw OpmonJSONValidationError(ERS_HERE, pstruct.key() + " contains substructures"); - } - - nlohmann::json entry; - auto& temp_tags = entry["tags"] = m_tags; - temp_tags["source_id"] = objpath; - entry["measurement"] = pstruct.key(); - entry["fields"] = pstruct.value().at(JSONTags::data); - auto seconds = pstruct.value().at(JSONTags::time).get(); - auto gmt = gmtime(&seconds); - std::array time_c_string; - strftime(time_c_string.data(), 80, "%Y-%m-%dT%H:%M:%SZ", gmt); - std::string time_string(time_c_string.data()); - entry["time"] = time_string; - m_components.push_back(entry); - } - } - - // and then go through children - if (obj.count(JSONTags::children)) { - // Recurse over children - this->parse_json(objpath, obj[JSONTags::children]); - } - } -} - -} // namespace kafkaopmon -} // namespace dunedaq diff --git a/src/JsonFlattener.hpp b/src/JsonFlattener.hpp deleted file mode 100644 index 17c13a2..0000000 --- a/src/JsonFlattener.hpp +++ /dev/null @@ -1,64 +0,0 @@ -/** - * @file JsonFlattener.hpp unitlity class - * - * This class takes the nested structure produced by opmon - * and creates a number of simpler json with no chieldren blocks - * - * This is part of the DUNE DAQ Application Framework, copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ - -#ifndef KAFKAOPMON_SRC_JSONFLATTENER_HPP_ -#define KAFKAOPMON_SRC_JSONFLATTENER_HPP_ - -#include "logging/Logging.hpp" - -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -using json = nlohmann::json; - -namespace dunedaq { -ERS_DECLARE_ISSUE(kafkaopmon, OpmonJSONValidationError, "JSON input incorrect" << error, ((std::string)error)) - -ERS_DECLARE_ISSUE(kafkaopmon, IncorrectJSON, "JSON input incorrect" << Warning, ((std::string)Warning)) - -ERS_DECLARE_ISSUE(kafkaopmon, ErrorJSON, "JSON input error" << Error, ((std::string)Error)) - -namespace kafkaopmon { -class JsonFlattener -{ - -public: - JsonFlattener() = delete; - explicit JsonFlattener(const nlohmann::json& j); - /** - * Convert a nlohmann::json wiht nested metrics into - * a vector of simple json that are similar to the logcal structure - * accpeted by influx DB - */ - const std::vector& get() const { return m_components; } - - inline static std::string m_source_id_tag = "source_id"; // NOLINT - static constexpr char m_separator = '.'; - -protected: - void parse_json(std::string path, const nlohmann::json& j); - -private: - std::vector m_components; - nlohmann::json m_tags; -}; -} // namespace kafkaopmon -} // namespace dunedaq - -#endif // KAFKAOPMON_SRC_JSONFLATTENER_HPP_ From 9409faf8e5f951ac6312fc4185a2491c92dd1c1d Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 9 Aug 2024 11:47:11 +0200 Subject: [PATCH 6/6] more cleanup --- CMakeLists.txt | 1 - test/apps/test_flattener.cxx | 33 --------------------------------- 2 files changed, 34 deletions(-) delete mode 100644 test/apps/test_flattener.cxx diff --git a/CMakeLists.txt b/CMakeLists.txt index 176dfb7..1506236 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,7 +41,6 @@ daq_add_unit_test( stream_OpMonFacility_test LINK_LIBRARIES kafkaopmon ) ############################################################################## # test application -daq_add_application( test_flattener test_flattener.cxx TEST LINK_LIBRARIES kafkaopmon) daq_add_application( opmon_publisher_test opmon_publisher_test.cxx TEST LINK_LIBRARIES kafkaopmon ) daq_add_application( opmon_publish_to_kafka_test opmon_publish_to_kafka_test.cxx TEST LINK_LIBRARIES kafkaopmon ) diff --git a/test/apps/test_flattener.cxx b/test/apps/test_flattener.cxx deleted file mode 100644 index bfab376..0000000 --- a/test/apps/test_flattener.cxx +++ /dev/null @@ -1,33 +0,0 @@ -/** - * @brief Using namespace for convenience - */ -#include "JsonFlattener.hpp" - -#include - -using json = nlohmann::json; - -int -main(int argc, char const* argv[]) -{ - - if (argc != 2) { - std::cout << "Usage: " << argv[0] << " .json" << std::endl; - exit(-1); - } - - std::ifstream file((argv[1])); - json j = json::parse(file); - - std::cout << j << std::endl; - - std::cout << "------" << std::endl; - auto iqb = dunedaq::kafkaopmon::JsonFlattener(j); - - for (auto item : iqb.get()) { - std::cout << item << std::endl; - } - - /* code */ - return 0; -}