Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mroda/protobuf #287

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ daq_oks_codegen(appfwk.schema.xml TEST NAMESPACE dunedaq::appfwk::dal
DALDIR dal DEP_PKGS confmodel)

daq_codegen( app.jsonnet cmd.jsonnet DEP_PKGS iomanager rcif cmdlib TEMPLATES Structs.hpp.j2 Nljs.hpp.j2 )
daq_codegen( appinfo.jsonnet DEP_PKGS opmonlib TEMPLATES opmonlib/InfoStructs.hpp.j2 opmonlib/InfoNljs.hpp.j2 )
daq_protobuf_codegen( *.proto )

##############################################################################
# Main library
Expand Down
7 changes: 2 additions & 5 deletions include/appfwk/DAQModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
#include "appfwk/ModuleConfiguration.hpp"

#include "utilities/NamedObject.hpp"

#include "opmonlib/InfoCollector.hpp"
#include "opmonlib/MonitorableObject.hpp"

#include "cetlib/BasicPluginFactory.h"
#include "cetlib/compiler_macros.h"
Expand Down Expand Up @@ -160,7 +159,7 @@ namespace appfwk {
* This header also contains the definitions of the Issues that can be
* thrown by the DAQModule.
*/
class DAQModule : public utilities::NamedObject
class DAQModule : public utilities::NamedObject, public opmonlib::MonitorableObject
{
public:
using data_t = nlohmann::json;
Expand Down Expand Up @@ -204,8 +203,6 @@ class DAQModule : public utilities::NamedObject

bool has_command(const std::string& name, const std::string& state) const;

virtual void get_info(opmonlib::InfoCollector& /*ci*/, int /*level*/) { return; }

protected:
/**
* @brief Registers a mdoule command under the name `cmd`.
Expand Down
13 changes: 13 additions & 0 deletions schema/appfwk/AppInfo.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

package dunedaq.appfwk.opmon;

message AppInfo {

string state = 1;
string host = 2;

bool busy = 5;
bool error = 6;

}
24 changes: 0 additions & 24 deletions schema/appfwk/appinfo.jsonnet

This file was deleted.

22 changes: 8 additions & 14 deletions src/Application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#define APPFWK_INCLUDE_APPFWK_APPLICATION_HPP_

#include "appfwk/cmd/Structs.hpp"
#include "rcif/runinfo/InfoStructs.hpp"
#include "rcif/RunInfo.pb.h"
#include "utilities/NamedObject.hpp"

#include "cmdlib/CommandFacility.hpp"
Expand All @@ -19,8 +19,7 @@
#include "DAQModuleManager.hpp"
#include "appfwk/ConfFacility.hpp"

#include "opmonlib/InfoManager.hpp"
#include "opmonlib/InfoProvider.hpp"
#include "opmonlib/OpMonManager.hpp"

#include "ers/Issue.hpp"
#include "nlohmann/json.hpp"
Expand Down Expand Up @@ -58,14 +57,14 @@ namespace appfwk {

class Application
: public cmdlib::CommandedObject
, public opmonlib::InfoProvider
, public opmonlib::OpMonManager
, public utilities::NamedObject
{
public:
using dataobj_t = nlohmann::json;

Application(std::string app_name,
std::string partition_name,
std::string session_name,
std::string cmdlibimpl,
std::string opmonlibimpl,
std::string confimpl);
Expand All @@ -79,15 +78,13 @@ class Application
// Execute a properly structured command
void execute(const dataobj_t& cmd_data);

// Gether the opmon information

void gather_stats(opmonlib::InfoCollector& ic, int level);

// Check whether the command can be accepted
bool is_cmd_valid(const dataobj_t& cmd_data);

// hook for metric generation
void generate_opmon_data() override;

// State synch getter & setter

void set_state(std::string s)
{
const std::lock_guard<std::mutex> lock(m_mutex);
Expand All @@ -101,15 +98,12 @@ class Application

private:
std::mutex m_mutex;
std::string m_partition;
opmonlib::InfoManager m_info_mgr;
std::string m_state;
std::atomic<bool> m_busy;
std::atomic<bool> m_error;
bool m_initialized;
std::chrono::time_point<std::chrono::steady_clock> m_run_start_time;
dunedaq::rcif::runinfo::Info m_runinfo;
std::string m_fully_qualified_name;
dunedaq::rcif::opmon::RunInfo m_runinfo;
DAQModuleManager m_mod_mgr;
std::shared_ptr<cmdlib::CommandFacility> m_cmd_fac;
std::shared_ptr<ConfigurationManager> m_config_mgr;
Expand Down
9 changes: 3 additions & 6 deletions src/DAQModuleManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#include "ers/Issue.hpp"
#include "nlohmann/json.hpp"
#include "opmonlib/InfoCollector.hpp"

#include "appfwk/ConfigurationManager.hpp"
#include "appfwk/ModuleConfiguration.hpp"
Expand All @@ -20,6 +19,7 @@

#include "appfwk/app/Structs.hpp"
#include "cmdlib/cmd/Structs.hpp"
#include "opmonlib/OpMonManager.hpp"

#include <map>
#include <memory>
Expand Down Expand Up @@ -79,20 +79,17 @@ class DAQModuleManager

DAQModuleManager();

void initialize(std::shared_ptr<ConfigurationManager> mgr);
void initialize(std::shared_ptr<ConfigurationManager> mgr, opmonlib::OpMonManager & );
bool initialized() const { return m_initialized; }
void cleanup();

// Execute a properly structured command
void execute(const std::string& state, const std::string& cmd, const dataobj_t& cmd_data);

// Gather statistics from modules
void gather_stats(opmonlib::InfoCollector& ic, int level);

private:
typedef std::map<std::string, std::shared_ptr<DAQModule>> DAQModuleMap_t; ///< DAQModules indexed by name

void init_modules(const std::vector<const dunedaq::confmodel::DaqModule*>& modules);
void init_modules(const std::vector<const dunedaq::confmodel::DaqModule*>& modules, opmonlib::OpMonManager & );
void dispatch_one_match_only(cmdlib::cmd::CmdId id, const std::string& state, const dataobj_t& data);
void dispatch_after_merge(cmdlib::cmd::CmdId id, const std::string& state, const dataobj_t& data);

Expand Down
97 changes: 41 additions & 56 deletions src/detail/Application.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
*/

#include "appfwk/Issues.hpp"
#include "appfwk/appinfo/InfoNljs.hpp"
#include "appfwk/AppInfo.pb.h"
#include "appfwk/cmd/Nljs.hpp"
#include "rcif/cmd/Nljs.hpp"
#include "rcif/runinfo/InfoNljs.hpp"

#include "logging/Logging.hpp"

Expand All @@ -21,24 +20,22 @@ namespace dunedaq {
namespace appfwk {

Application::Application(std::string appname,
std::string partition,
std::string session,
std::string cmdlibimpl,
std::string opmonlibimpl,
std::string confimpl)
: NamedObject(appname)
, m_partition(partition)
, m_info_mgr(opmonlibimpl)
: OpMonManager(session, appname, opmonlibimpl)
, NamedObject(appname)
, m_state("NONE")
, m_busy(false)
, m_error(false)
, m_initialized(false)
, m_config_mgr(std::make_shared<ConfigurationManager>(confimpl, appname, partition))
, m_config_mgr(std::make_shared<ConfigurationManager>(confimpl, appname, session))
{
m_runinfo.running = false;
m_runinfo.runno = 0;
m_runinfo.runtime = 0;
m_runinfo.set_running(false);
m_runinfo.set_run_number(0);
m_runinfo.set_run_time(0);

m_fully_qualified_name = partition + "." + appname;
m_cmd_fac = cmdlib::make_command_facility(cmdlibimpl);

TLOG() << "confimpl=<" << confimpl << ">\n";
Expand All @@ -48,11 +45,7 @@ void
Application::init()
{
m_cmd_fac->set_commanded(*this, get_name());
m_info_mgr.set_provider(*this);
// Add partition id as tag
m_info_mgr.set_tags({ { "partition_id", m_partition } });

m_mod_mgr.initialize(m_config_mgr);
m_mod_mgr.initialize(m_config_mgr, *this);
set_state("INITIAL");
m_initialized = true;
}
Expand All @@ -70,20 +63,22 @@ Application::run(std::atomic<bool>& end_marker)
std::stringstream s1(getenv("DUNEDAQ_OPMON_INTERVAL"));
std::stringstream s2(getenv("DUNEDAQ_OPMON_LEVEL"));
uint32_t interval = 0; // NOLINT(build/unsigned)
uint32_t level = 0; // NOLINT(build/unsigned)
opmonlib::OpMonLevel level = 0;
s1 >> interval;
s2 >> level;

m_info_mgr.start(interval, level);
set_opmon_level(level);

start_monitoring(std::chrono::seconds(interval));
m_cmd_fac->run(end_marker);
m_info_mgr.stop();

stop_monitoring();
m_mod_mgr.cleanup();
}

void
Application::execute(const dataobj_t& cmd_data)
{

auto rc_cmd = cmd_data.get<rcif::cmd::RCCommand>();
std::string cmdname = rc_cmd.id;
if (!is_cmd_valid(cmd_data)) {
Expand All @@ -94,24 +89,25 @@ Application::execute(const dataobj_t& cmd_data)

if (cmdname == "start") {
auto cmd_obj = rc_cmd.data.get<cmd::CmdObj>();

for (const auto& addressed : cmd_obj.modules) {
dataobj_t startpars = addressed.data;
auto rc_startpars = startpars.get<rcif::cmd::StartParams>();
m_runinfo.runno = rc_startpars.run;
m_runinfo.set_run_number(rc_startpars.run);
break;
}

m_run_start_time = std::chrono::steady_clock::now();
;
m_runinfo.running = true;
m_runinfo.runtime = 0;
} else if (cmdname == "stop") {
m_runinfo.running = false;
m_runinfo.runno = 0;
m_runinfo.runtime = 0;
m_runinfo.set_running(true);
m_runinfo.set_run_time(0);
}

else if (cmdname == "stop") {
m_run_start_time = std::chrono::steady_clock::time_point();
m_runinfo.set_running(false);
m_runinfo.set_run_number(0);
m_runinfo.set_run_time(0);
}

try {
m_mod_mgr.execute(get_state(), cmdname, rc_cmd.data);
m_busy.store(false);
Expand All @@ -125,40 +121,29 @@ Application::execute(const dataobj_t& cmd_data)
}

void
Application::gather_stats(opmonlib::InfoCollector& ci, int level)
Application::generate_opmon_data()
{
appinfo::Info ai;
ai.state = get_state();
ai.busy = m_busy.load();
ai.error = m_error.load();
opmon::AppInfo ai;
ai.set_state(get_state());
ai.set_busy(m_busy.load());
ai.set_error(m_error.load());

char hostname[256];
auto res = gethostname(hostname, 256);
if (res < 0)
ai.host = "Unknown";
ai.set_host("Unknown");
else
ai.host = std::string(hostname);

opmonlib::InfoCollector tmp_ci;

tmp_ci.add(ai);
ai.set_host (std::string(hostname));

if (ai.state == "RUNNING" || ai.state == "READY") {
publish(std::move(ai), {}, opmonlib::to_level(opmonlib::EntryOpMonLevel::kTopPriority));

if ( m_run_start_time.time_since_epoch().count() == 0 ) {
auto now = std::chrono::steady_clock::now();
m_runinfo.runtime = std::chrono::duration_cast<std::chrono::seconds>(now - m_run_start_time).count();
m_runinfo.set_run_time(std::chrono::duration_cast<std::chrono::seconds>(now - m_run_start_time).count() );
}
tmp_ci.add(m_runinfo);

if (level == 0) {
// give only generic application info
} else if (ai.state != "NONE" && ai.state != "INITIAL") {
try {
m_mod_mgr.gather_stats(tmp_ci, level);
} catch (ers::Issue& ex) {
ers::error(ex);
}
}
ci.add(m_fully_qualified_name, tmp_ci);


publish( decltype(m_runinfo)(m_runinfo) );
}

bool
Expand Down
Loading
Loading