diff --git a/CMakeLists.txt b/CMakeLists.txt index 224ebe5..b5636d6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,7 +33,7 @@ daq_protobuf_codegen( opmon/*.proto ) ############################################################################## # Main library -daq_add_library(DAQModule*.cpp ConfigurationManager.cpp ModuleConfiguration.cpp +daq_add_library(Application.cpp DAQModule.cpp DAQModuleManager.cpp ConfigurationManager.cpp ModuleConfiguration.cpp LINK_LIBRARIES ${APPFWK_DEPENDENCIES}) ############################################################################## diff --git a/docs/ActionPlans.md b/docs/ActionPlans.md new file mode 100755 index 0000000..7b70f86 --- /dev/null +++ b/docs/ActionPlans.md @@ -0,0 +1,72 @@ +# Action Plans + +## Overview + +An ActionPlan defines a series of ActionSteps, each one consisting of a set of DAQModule classes to run the command on. Each ActionPlan is associated with a FSMCommand object, and is run by the appliction when it recieves the corresponding command. If a command is received and no ActionPlan is defined, the application currently runs a "dummy" ActionPlan consisting of a single step where registered Actions matching the command name are all run in parallel. + +## Defining an ActionPlan + +ActionPlans are defined in configuration using these objects: + +```XML + + + + + + + + +``` + +1. ActionPlan relates a set of ActionSteps to a FSMCommand instance. +1. ActionStep lists the module types to run in parallel at a given point in the sequence + +ActionPlans are validated by the application to ensure that every module type has registered methods corresponding to the command linked to the ActionPlan. + +### Example test/config/appfwk.data.xml + +The DAQModuleManager_test unit test defines several ActionPlans used within the test. For example, the "do_stuff" action: + +```XML + + + + + + + + + + + + + + + + + + + +``` + +The ActionPlans are associated with the Application instance as follows: + +```XML + + + + + + + + + + + +``` + +## Notes + +* DAQModules register their action methods in the same way as before, however the specification of valid states for an action has been removed +* ActionSteps target module types, with the assumption that if multiple modules of the same class are present within an application, they will all run their defined action methods in unison (or at least within the same parallel processing step) diff --git a/docs/README.md b/docs/README.md index 18f31cf..457a215 100644 --- a/docs/README.md +++ b/docs/README.md @@ -10,7 +10,7 @@ appfwk consists of a generic DAQ application (`daq_application`) which can be co appfwk provides the scaffolding on which all DUNE DAQ software processes can be developed. The running DAQ typically consists of multiple distinct processes assigned various tasks: filtering data, requesting it, saving it to storage, etc. There are many different types of process, some of which may not even have been conceived of yet, and it would be cumbersome to recompile multiple different types of process across many packages every time one wanted to change the behavior of the DAQ. To solve this problem, the approach that's been taken is to have a standard DUNE DAQ software process [`daq_application`](Daq-Application.md) which can be configured at runtime by Run Control in order to perform some particular function in the DAQ. -`daq_application` is designed as a flexible container of "DAQ modules" (units of code designed to perform specific tasks) and "connections" (designed to move data between DAQ modules that can be in the same or in different DAQ applications). These specific tasks can vary widely; they include [producing fake data for testing purposes](https://github.com/DUNE-DAQ/readoutmodules/blob/develop/plugins/FakeCardReader.hpp), [putting data into long term storage](https://github.com/DUNE-DAQ/dfmodules/blob/develop/plugins/DataWriterModule.hpp), and so forth. DAQ modules will typically execute user-defined functions when receiving standard transitions from Run Control: "conf", "start", etc. appfwk provides the `DAQModule` base class which users should derive their DAQ module class from in their own packages. +`daq_application` is designed as a flexible container of "DAQ modules" (units of code designed to perform specific tasks) and "connections" (designed to move data between DAQ modules that can be in the same or in different DAQ applications). These specific tasks can vary widely; they include [producing fake data for testing purposes](https://github.com/DUNE-DAQ/readoutmodules/blob/develop/plugins/FakeCardReader.hpp), [putting data into long term storage](https://github.com/DUNE-DAQ/dfmodules/blob/develop/plugins/DataWriterModule.hpp), and so forth. DAQ modules will typically execute user-defined functions when receiving standard transitions from Run Control: "conf", "start", etc. appfwk provides the `DAQModule` base class which users should derive their DAQ module class from in their own packages. Read more about ActionPlans [here](ActionPlans.md). ![daq_application](https://github.com/DUNE-DAQ/appfwk/raw/develop/docs/Application.png) diff --git a/include/appfwk/DAQModule.hpp b/include/appfwk/DAQModule.hpp index f80d10a..e89d7f0 100644 --- a/include/appfwk/DAQModule.hpp +++ b/include/appfwk/DAQModule.hpp @@ -137,12 +137,13 @@ ERS_DECLARE_ISSUE_BASE(appfwk, ///< Namespace /** * @brief The MissingConnection DAQModule ERS Issue */ -ERS_DECLARE_ISSUE_BASE(appfwk, ///< Namespace - MissingConnection, ///< Type of the Issue - appfwk::GeneralDAQModuleIssue, ///< Base class of the Issue - "Required Connection Not Found. Type: " << type << ", direction: " << direction, ///< Log Message from the issue - ((std::string)name), ///< Base class attributes - ((std::string)type)((std::string)direction) ///< Attribute of this class +ERS_DECLARE_ISSUE_BASE(appfwk, ///< Namespace + MissingConnection, ///< Type of the Issue + appfwk::GeneralDAQModuleIssue, ///< Base class of the Issue + "Required Connection Not Found. Type: " << type << ", direction: " + << direction, ///< Log Message from the issue + ((std::string)name), ///< Base class attributes + ((std::string)type)((std::string)direction) ///< Attribute of this class ) // Re-enable coverage collection LCOV_EXCL_STOP @@ -197,11 +198,11 @@ namespace appfwk { * Non-accepted commands or failure should return an ERS exception * indicating this result. */ - void execute_command(const std::string& name, const std::string& state, const data_t& data = {}); + void execute_command(const std::string& name, const data_t& data = {}); std::vector get_commands() const; - bool has_command(const std::string& name, const std::string& state) const; + bool has_command(const std::string& name) const; protected: /** @@ -209,9 +210,7 @@ namespace appfwk { * Returns whether the command was inserted (false meaning that command `cmd` already exists) */ template - void register_command(const std::string& name, - void (Child::*f)(const data_t&), - const std::set& valid_states = std::set{ "ANY" }); + void register_command(const std::string& name, void (Child::*f)(const data_t&)); DAQModule(DAQModule const&) = delete; DAQModule(DAQModule&&) = delete; @@ -219,7 +218,7 @@ namespace appfwk { DAQModule& operator=(DAQModule&&) = delete; private: - using CommandMap_t = std::map, std::function>>; + using CommandMap_t = std::map>; CommandMap_t m_commands; }; diff --git a/include/appfwk/ModuleConfiguration.hpp b/include/appfwk/ModuleConfiguration.hpp index ecef18d..b2f6c20 100644 --- a/include/appfwk/ModuleConfiguration.hpp +++ b/include/appfwk/ModuleConfiguration.hpp @@ -12,6 +12,7 @@ #define APPFWK_INCLUDE_MODULECONFIGURATION_HPP_ #include "appfwk/ConfigurationManager.hpp" +#include "confmodel/ActionPlan.hpp" #include "confmodel/DaqModule.hpp" #include "iomanager/IOManager.hpp" #include "conffwk/Configuration.hpp" @@ -26,10 +27,10 @@ ERS_DECLARE_ISSUE(appfwk, "Application contains a resource " << res << " that is not a DaqModule", ///< Message ((std::string)res) ///< Message parameters ) -ERS_DECLARE_ISSUE(appfwk, ///< Namespace - NotADaqApplication, ///< Issue class name +ERS_DECLARE_ISSUE(appfwk, ///< Namespace + NotADaqApplication, ///< Issue class name "Application " << app << " is neither a DaqApplication nor a SmartDaqApplication ", ///< Message - ((std::string)app) ///< Message parameters + ((std::string)app) ///< Message parameters ) namespace confmodel { @@ -41,10 +42,11 @@ namespace appfwk { class ModuleConfiguration { + std::shared_ptr m_config_mgr; + std::unordered_map m_action_plans; std::vector m_modules; iomanager::Queues_t m_queues; iomanager::Connections_t m_networkconnections; - std::shared_ptr m_config_mgr; public: explicit ModuleConfiguration(std::shared_ptr mgr); @@ -53,6 +55,9 @@ class ModuleConfiguration const iomanager::Connections_t& networkconnections() { return m_networkconnections; } const std::vector& modules() { return m_modules; } + const std::unordered_map& action_plans() { return m_action_plans; } + const dunedaq::confmodel::ActionPlan* action_plan(std::string cmd) const; + std::shared_ptr configuration_manager() { return m_config_mgr; } template diff --git a/include/appfwk/detail/DAQModule.hxx b/include/appfwk/detail/DAQModule.hxx index 482d413..3b29522 100644 --- a/include/appfwk/detail/DAQModule.hxx +++ b/include/appfwk/detail/DAQModule.hxx @@ -3,12 +3,11 @@ namespace dunedaq::appfwk { template void DAQModule::register_command(const std::string& cmd_name, - void (Child::*f)(const data_t&), - const std::set& states) + void (Child::*f)(const data_t&)) { using namespace std::placeholders; - bool done = m_commands.emplace(cmd_name, std::make_pair(states, std::bind(f, dynamic_cast(this), _1))).second; + bool done = m_commands.emplace(cmd_name, std::bind(f, dynamic_cast(this), _1)).second; if (!done) { // Throw here throw CommandRegistrationFailed(ERS_HERE, get_name(), cmd_name); diff --git a/src/detail/Application.hxx b/src/Application.cpp similarity index 98% rename from src/detail/Application.hxx rename to src/Application.cpp index d13d2ea..dd8e92d 100644 --- a/src/detail/Application.hxx +++ b/src/Application.cpp @@ -6,6 +6,8 @@ * received with this code. */ +#include "Application.hpp" + #include "appfwk/Issues.hpp" #include "appfwk/opmon/application.pb.h" #include "appfwk/cmd/Nljs.hpp" @@ -109,7 +111,7 @@ Application::execute(const dataobj_t& cmd_data) } try { - m_mod_mgr.execute(get_state(), cmdname, rc_cmd.data); + m_mod_mgr.execute(cmdname, rc_cmd.data); m_busy.store(false); if (rc_cmd.exit_state != "ANY") set_state(rc_cmd.exit_state); diff --git a/src/Application.hpp b/src/Application.hpp index 954da31..7ac44f9 100644 --- a/src/Application.hpp +++ b/src/Application.hpp @@ -112,6 +112,4 @@ class Application } // namespace appfwk } // namespace dunedaq -#include "detail/Application.hxx" - #endif // APPFWK_INCLUDE_APPFWK_APPLICATION_HPP_ diff --git a/src/DAQModule.cpp b/src/DAQModule.cpp index f92707e..807467d 100644 --- a/src/DAQModule.cpp +++ b/src/DAQModule.cpp @@ -15,15 +15,11 @@ namespace dunedaq::appfwk { void -DAQModule::execute_command(const std::string& cmd_name, const std::string& state, const data_t& data) +DAQModule::execute_command(const std::string& cmd_name, const data_t& data) { if (auto cmd = m_commands.find(cmd_name); cmd != m_commands.end()) { - if (cmd->second.first.find("ANY") != cmd->second.first.end() || - (cmd->second.first.find(state) != cmd->second.first.end())) { - std::invoke(cmd->second.second, data); + std::invoke(cmd->second, data); return; - } - throw InvalidState(ERS_HERE, get_name(), cmd_name, state); } throw UnknownCommand(ERS_HERE, get_name(), cmd_name); } @@ -38,14 +34,10 @@ DAQModule::get_commands() const } bool -DAQModule::has_command(const std::string& cmd_name, const std::string& state) const +DAQModule::has_command(const std::string& cmd_name) const { if (auto cmd = m_commands.find(cmd_name); cmd != m_commands.end()) { - if (cmd->second.first.find("ANY") != cmd->second.first.end() || - (cmd->second.first.find(state) != cmd->second.first.end())) { return true; - } - ers::warning(InvalidState(ERS_HERE, get_name(), cmd_name, state)); } return false; } diff --git a/src/DAQModuleManager.cpp b/src/DAQModuleManager.cpp new file mode 100644 index 0000000..8c8f121 --- /dev/null +++ b/src/DAQModuleManager.cpp @@ -0,0 +1,286 @@ +/** + * @file DAQModuleManager.cpp DAQModuleManager implementataion + * + * This is part of the DUNE DAQ Application Framework, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "DAQModuleManager.hpp" + +#include "cmdlib/cmd/Nljs.hpp" + +#include "appfwk/Issues.hpp" +#include "appfwk/app/Nljs.hpp" +#include "appfwk/cmd/Nljs.hpp" + +#include "appfwk/DAQModule.hpp" + +#include "confmodel/ActionStep.hpp" +#include "confmodel/Session.hpp" + +#include "iomanager/IOManager.hpp" + +#include "logging/Logging.hpp" + +#include +#include +#include +#include +#include +#include + +namespace dunedaq { +namespace appfwk { + +DAQModuleManager::DAQModuleManager() + : m_initialized(false) +{ +} + +void +DAQModuleManager::initialize(std::shared_ptr cfgMgr, opmonlib::OpMonManager& opm) +{ + auto csInterval = cfgMgr->session()->get_connectivity_service_interval_ms(); + m_module_configuration = std::make_shared(cfgMgr); + get_iomanager()->configure(m_module_configuration->queues(), + m_module_configuration->networkconnections(), + true, + std::chrono::milliseconds(csInterval), + opm); + init_modules(m_module_configuration->modules(), opm); + + for (auto& plan_pair : m_module_configuration->action_plans()) { + auto cmd = plan_pair.first; + + for (auto& step : plan_pair.second->get_steps()) { + for (auto& mod_type : step->get_modules()) { + if (!m_modules_by_type.count(mod_type) || m_modules_by_type[mod_type].size() == 0) { + throw ActionPlanValidationFailed(ERS_HERE, cmd, mod_type, "Module does not exist"); + } + auto module_test = m_module_map[m_modules_by_type[mod_type][0]]; + if (!module_test->has_command(cmd)) { + throw ActionPlanValidationFailed(ERS_HERE, cmd, mod_type, "Module does not have method " + cmd); + } + } + } + } + + this->m_initialized = true; +} + +void +DAQModuleManager::init_modules(const std::vector& modules, + opmonlib::OpMonManager& opm) +{ + for (const auto mod : modules) { + TLOG_DEBUG(0) << "construct: " << mod->class_name() << " : " << mod->UID(); + auto mptr = make_module(mod->class_name(), mod->UID()); + m_module_map.emplace(mod->UID(), mptr); + + if (!m_modules_by_type.count(mod->class_name())) { + m_modules_by_type[mod->class_name()] = std::vector(); + } + m_modules_by_type[mod->class_name()].emplace_back(mod->UID()); + + opm.register_node(mod->UID(), mptr); + mptr->init(m_module_configuration); + } +} + +void +DAQModuleManager::cleanup() +{ + get_iomanager()->reset(); + this->m_initialized = false; +} + +DAQModuleManager::dataobj_t +DAQModuleManager::get_dataobj_for_module(const std::string& mod_name, const dataobj_t& cmd_data) +{ + auto cmd_obj = cmd_data.get(); + const dataobj_t dummy{}; + + if (!cmd_obj.modules.empty()) { + for (const auto& addressed : cmd_obj.modules) { + + // First exception: empty = `all` + if (addressed.match.empty()) { + return addressed.data; + } else { + // match module name with regex + if (std::regex_match(mod_name, std::regex(addressed.match))) { + return addressed.data; + } + } + } + } + // No matches + return dummy; +} + +bool +DAQModuleManager::execute_action(const std::string& module_name, const std::string& action, const dataobj_t& data_obj) +{ + try { + TLOG_DEBUG(2) << "Executing " << module_name << " -> " << action; + m_module_map[module_name]->execute_command(action, data_obj); + } catch (ers::Issue& ex) { + ers::error(ex); + return false; + } + return true; +} + +void +DAQModuleManager::execute_action_plan_step(std::string const& cmd, + const confmodel::ActionStep* step, + const dataobj_t& cmd_data) +{ + std::string failed_mod_names(""); + std::unordered_map> futures; + + for (auto& mod_class : step->get_modules()) { + auto modules = m_modules_by_type[mod_class]; + for (auto& mod_name : modules) { + + auto data_obj = get_dataobj_for_module(mod_name, cmd_data); + TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod_class << ")"; + futures[mod_name] = + std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, data_obj); + } + } + + for (auto& future : futures) { + future.second.wait(); + auto ret = future.second.get(); + if (!ret) { + failed_mod_names.append(future.first); + failed_mod_names.append(", "); + } + } + // Throw if any dispatching failed + if (!failed_mod_names.empty()) { + throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names); + } +} + +std::vector +DAQModuleManager::get_modnames_by_cmdid(cmdlib::cmd::CmdId id) +{ + // Make a convenience array with module names that have the requested command + std::vector mod_names; + for (const auto& [mod_name, mod_ptr] : m_module_map) { + if (mod_ptr->has_command(id)) + mod_names.push_back(mod_name); + } + + return mod_names; +} + +void +DAQModuleManager::check_cmd_data(const std::string& id, const dataobj_t& cmd_data) +{ + // This method ensures that each module is only matched once per command. + // If multiple matches are found, an ers::Issue is thrown + // Disclaimenr for the occasional reader: this is the first implementation of the + // multiple-matches detection logic. The author is painfully aware that it can be + // vastly improved, in style if not in performance. + + auto cmd_obj = cmd_data.get(); + const dataobj_t dummy{}; + + // Make a convenience array with module names that have the requested command + std::vector cmd_mod_names = get_modnames_by_cmdid(id); + + // containers for error tracking + std::map> mod_to_re; + + if (!cmd_obj.modules.empty()) { + for (const auto& addressed : cmd_obj.modules) { + if (!addressed.match.empty()) { + // Find module names matching the regex + for (const std::string& mod_name : cmd_mod_names) { + // match module name with regex + if (std::regex_match(mod_name, std::regex(addressed.match))) { + mod_to_re[mod_name].push_back(addressed.match); + } + } + } + } + + // Select modules with multiple matches + for (auto i = mod_to_re.begin(), last = mod_to_re.end(); i != last;) { + if (i->second.size() == 1) { + i = mod_to_re.erase(i); + } else { + ++i; + } + } + + // Catch cases + if (mod_to_re.size() > 0) { + std::string mod_names; + for (const auto& [mod_name, matched_re] : mod_to_re) { + mod_names += mod_name + ", "; + } + throw ConflictingCommandMatching(ERS_HERE, id, mod_names); + } + } +} + +void +DAQModuleManager::execute(const std::string& cmd, const dataobj_t& cmd_data) +{ + + TLOG_DEBUG(1) << "Command id:" << cmd; + + if (!m_initialized) { + throw DAQModuleManagerNotInitialized(ERS_HERE, cmd); + } + + check_cmd_data(cmd, cmd_data); + + auto action_plan = m_module_configuration->action_plan(cmd); + if (action_plan == nullptr) { +#if 0 + throw ActionPlanNotFound(ERS_HERE, cmd, "Throwing exception"); +#elif 0 + ers::warning(ActionPlanNotFound(ERS_HERE, cmd, "Returning without executing actions")); + return; +#else + // Emulate old behavior + ers::info(ActionPlanNotFound(ERS_HERE, cmd, "Executing action on all modules in parallel")); + std::string failed_mod_names(""); + std::unordered_map> futures; + + auto mods = get_modnames_by_cmdid(cmd); + for (auto& mod : mods) { + TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod; + auto data_obj = get_dataobj_for_module(mod, cmd_data); + futures[mod] = std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod, cmd, data_obj); + } + + for (auto& future : futures) { + future.second.wait(); + auto ret = future.second.get(); + if (!ret) { + failed_mod_names.append(future.first); + failed_mod_names.append(", "); + } + } + // Throw if any dispatching failed + if (!failed_mod_names.empty()) { + throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names); + } +#endif + } else { + // We validated the action plans already + for (auto& step : action_plan->get_steps()) { + execute_action_plan_step(cmd, step, cmd_data); + } + } +} + +} // namespace appfwk +} // namespace dunedaq diff --git a/src/DAQModuleManager.hpp b/src/DAQModuleManager.hpp index 1d47758..dd55a60 100644 --- a/src/DAQModuleManager.hpp +++ b/src/DAQModuleManager.hpp @@ -14,6 +14,7 @@ #include "appfwk/ConfigurationManager.hpp" #include "appfwk/ModuleConfiguration.hpp" +#include "confmodel/ActionStep.hpp" #include "confmodel/DaqModule.hpp" #include "conffwk/Configuration.hpp" @@ -53,19 +54,29 @@ ERS_DECLARE_ISSUE(appfwk, ((std::string)modules) ///< Message parameters ) -ERS_DECLARE_ISSUE(appfwk, ///< Namespace - FailedInfoGathering, ///< Issue class name - "Info gathering failed for module: " << module, ///< Message - ((std::string)module) ///< Message parameters +ERS_DECLARE_ISSUE(appfwk, ///< Namespace + FailedInfoGathering, ///< Issue class name + "Info gathering failed for module: " << module, ///< Message + ((std::string)module) ///< Message parameters ) -ERS_DECLARE_ISSUE_BASE(appfwk, ///< Namespace - ExceptionWhileInfoGathering, ///< Issue class name - FailedInfoGathering, ///< Base Issue class name - module << " threw exception while info gathering: " << message, ///< Message - ((std::string)module), ///< Base Issue params - ((std::string)message) ///< This class params +ERS_DECLARE_ISSUE_BASE(appfwk, ///< Namespace + ExceptionWhileInfoGathering, ///< Issue class name + FailedInfoGathering, ///< Base Issue class name + module << " threw exception while info gathering: " << message, ///< Message + ((std::string)module), ///< Base Issue params + ((std::string)message) ///< This class params ) + +ERS_DECLARE_ISSUE(appfwk, + ActionPlanNotFound, + "No action plan found for command " << cmd << ", taking the following action: " << message, + ((std::string)cmd)((std::string)message)) + +ERS_DECLARE_ISSUE(appfwk, + ActionPlanValidationFailed, + "Error validating action plan " << cmd << ", module " << module << ": " << message, + ((std::string)cmd)((std::string)module)((std::string)message)) // Re-enable coverage collection LCOV_EXCL_STOP namespace appfwk { @@ -84,26 +95,28 @@ class DAQModuleManager void cleanup(); // Execute a properly structured command - void execute(const std::string& state, const std::string& cmd, const dataobj_t& cmd_data); + void execute(const std::string& cmd, const dataobj_t& cmd_data); private: typedef std::map> DAQModuleMap_t; ///< DAQModules indexed by name void init_modules(const std::vector& modules, opmonlib::OpMonManager & ); - void dispatch_one_match_only(cmdlib::cmd::CmdId id, const std::string& state, const dataobj_t& data); - void dispatch_after_merge(cmdlib::cmd::CmdId id, const std::string& state, const dataobj_t& data); - std::vector get_modnames_by_cmdid(cmdlib::cmd::CmdId id, const std::string& state); + void check_cmd_data(const std::string& id, const dataobj_t& cmd_data); + dataobj_t get_dataobj_for_module(const std::string& mod_name, const dataobj_t& cmd_data); + bool execute_action(const std::string& mod_name, const std::string& action, const dataobj_t& data_obj); + void execute_action_plan_step(const std::string& cmd, const confmodel::ActionStep* step, const dataobj_t& cmd_data); + + std::vector get_modnames_by_cmdid(cmdlib::cmd::CmdId id); std::shared_ptr m_module_configuration; bool m_initialized; DAQModuleMap_t m_module_map; + std::map> m_modules_by_type; }; } // namespace appfwk } // namespace dunedaq -#include "detail/DAQModuleManager.hxx" - #endif // APPFWK_INCLUDE_APPFWK_DAQMODULEMANAGER_HPP_ diff --git a/src/ModuleConfiguration.cpp b/src/ModuleConfiguration.cpp index 687c843..bb06cb1 100644 --- a/src/ModuleConfiguration.cpp +++ b/src/ModuleConfiguration.cpp @@ -9,14 +9,15 @@ #include "appfwk/ModuleConfiguration.hpp" #include "appmodel/SmartDaqApplication.hpp" +#include "conffwk/Configuration.hpp" #include "confmodel/DaqApplication.hpp" #include "confmodel/DaqModule.hpp" +#include "confmodel/FSMCommand.hpp" #include "confmodel/NetworkConnection.hpp" #include "confmodel/Queue.hpp" #include "confmodel/ResourceSet.hpp" #include "confmodel/Service.hpp" #include "confmodel/Session.hpp" -#include "conffwk/Configuration.hpp" #include #include @@ -26,6 +27,7 @@ using namespace dunedaq::appfwk; ModuleConfiguration::ModuleConfiguration(std::shared_ptr cfMgr) : m_config_mgr(cfMgr) + , m_action_plans() { auto session = cfMgr->session(); auto application = cfMgr->application(); @@ -37,14 +39,22 @@ ModuleConfiguration::ModuleConfiguration(std::shared_ptr c auto cpos = cfMgr->m_oks_config_spec.find(":") + 1; std::string oksFile = cfMgr->m_oks_config_spec.substr(cpos); // Strip off "oksconflibs:" m_modules = smartDaqApp->generate_modules(confdb.get(), oksFile, session); - } - else { + + for (auto& plan : smartDaqApp->get_action_plans()) { + TLOG_DBG(6) << "Registering action plan " << plan->UID() << " for cmd " << plan->get_fsm()->get_cmd(); + m_action_plans[plan->get_fsm()->get_cmd()] = plan; + } + } else { auto daqApp = application->cast(); if (daqApp) { m_modules = daqApp->get_modules(); - } - else { - throw (NotADaqApplication(ERS_HERE, application->UID())); + + for (auto& plan : daqApp->get_action_plans()) { + TLOG_DBG(6) << "Registering action plan " << plan->UID() << " for cmd " << plan->get_fsm()->get_cmd(); + m_action_plans[plan->get_fsm()->get_cmd()] = plan; + } + } else { + throw(NotADaqApplication(ERS_HERE, application->UID())); } } @@ -110,3 +120,12 @@ ModuleConfiguration::ModuleConfiguration(std::shared_ptr c } } } + +const dunedaq::confmodel::ActionPlan* +dunedaq::appfwk::ModuleConfiguration::action_plan(std::string cmd) const +{ + if (m_action_plans.count(cmd)) { + return m_action_plans.at(cmd); + } + return nullptr; +} diff --git a/src/detail/DAQModuleManager.hxx b/src/detail/DAQModuleManager.hxx deleted file mode 100644 index 6948804..0000000 --- a/src/detail/DAQModuleManager.hxx +++ /dev/null @@ -1,230 +0,0 @@ -/** - * @file DAQModuleManager.cpp DAQModuleManager implementataion - * - * This is part of the DUNE DAQ Application Framework, copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ - -#include "cmdlib/cmd/Nljs.hpp" - -#include "appfwk/Issues.hpp" -#include "appfwk/app/Nljs.hpp" -#include "appfwk/cmd/Nljs.hpp" - -#include "appfwk/DAQModule.hpp" - -#include "confmodel/Session.hpp" - -#include "iomanager/IOManager.hpp" - -#include "logging/Logging.hpp" - -#include -#include -#include -#include -#include - -namespace dunedaq { -namespace appfwk { - -DAQModuleManager::DAQModuleManager() - : m_initialized(false) -{ -} - -void -DAQModuleManager::initialize(std::shared_ptr cfgMgr, opmonlib::OpMonManager & opm) -{ - auto csInterval = cfgMgr->session()->get_connectivity_service_interval_ms(); - m_module_configuration = std::make_shared(cfgMgr); - get_iomanager()->configure(m_module_configuration->queues(), - m_module_configuration->networkconnections(), - true, - std::chrono::milliseconds(csInterval), - opm); - init_modules(m_module_configuration->modules(), opm); - this->m_initialized = true; -} - -void -DAQModuleManager::init_modules(const std::vector& modules, opmonlib::OpMonManager & opm) -{ - for (const auto mod : modules) { - TLOG_DEBUG(0) << "construct: " << mod->class_name() << " : " << mod->UID(); - auto mptr = make_module(mod->class_name(), mod->UID()); - m_module_map.emplace(mod->UID(), mptr); - opm.register_node( mod->UID(), mptr); - mptr->init(m_module_configuration); - } -} - -void -DAQModuleManager::cleanup() -{ - get_iomanager()->reset(); - this->m_initialized = false; -} - -void -DAQModuleManager::dispatch_after_merge(cmdlib::cmd::CmdId id, const std::string& state, const dataobj_t& data) -{ - // The command dispatching: commands and parameters are distributed to all modules that - // have registered a method corresponding to the command. If no parameters are found, an - // empty dataobj_t is passed. - std::string bad_mod_names(""); - auto cmd_obj = data.get(); - for (const auto& [mod_name, mod_ptr] : m_module_map) { - if (mod_ptr->has_command(id, state)) { - dataobj_t params; - for (const auto& addressed : cmd_obj.modules) { - if (addressed.match.empty() || std::regex_match(mod_name.c_str(), std::regex(addressed.match.c_str()))) { - for (nlohmann::json::const_iterator it = addressed.data.begin(); it != addressed.data.end(); ++it) { - params[it.key()] = it.value(); - } - } - } - TLOG_DEBUG(2) << "Dispatch \"" << id << "\" to \"" << mod_ptr->get_name() << "\":\n" << params.dump(4); - try { - mod_ptr->execute_command(id, state, params); - } catch (ers::Issue& ex) { - ers::error(ex); - bad_mod_names.append(mod_name); - bad_mod_names.append(", "); - } - } - } - if (!bad_mod_names.empty()) { - throw CommandDispatchingFailed(ERS_HERE, id, bad_mod_names); - } -} - -std::vector -DAQModuleManager::get_modnames_by_cmdid(cmdlib::cmd::CmdId id, const std::string& state) -{ - // Make a convenience array with module names that have the requested command - std::vector mod_names; - for (const auto& [mod_name, mod_ptr] : m_module_map) { - if (mod_ptr->has_command(id, state)) - mod_names.push_back(mod_name); - } - - return mod_names; -} - -void -DAQModuleManager::dispatch_one_match_only(cmdlib::cmd::CmdId id, const std::string& state, const dataobj_t& data) -{ - // This method ensures that each module is only matched once per command. - // If multiple matches are found, an ers::Issue is thrown - // Disclaimenr for the occasional reader: this is the first implementation of the - // multiple-matches detection logic. The author is painfully aware that it can be - // vastly improved, in style if not in performance. - - auto cmd_obj = data.get(); - const dataobj_t dummy{}; - - // Make a convenience array with module names that have the requested command - std::vector cmd_mod_names = get_modnames_by_cmdid(id, state); - - // containers for error tracking - std::vector unmatched_addr; - std::map> mod_to_re; - - std::vector, const dataobj_t*>> mod_seq; - - if (!cmd_obj.modules.empty()) { - for (const auto& addressed : cmd_obj.modules) { - - // Module names matching the 'match' regex - std::vector matches; - - // First exception: empty = `all` - if (addressed.match.empty()) { - matches = cmd_mod_names; - } else { - // Find module names matching the regex - for (const std::string& mod_name : cmd_mod_names) { - // match module name with regex - if (std::regex_match(mod_name, std::regex(addressed.match))) { - matches.push_back(mod_name); - mod_to_re[mod_name].push_back(addressed.match); - } - } - - // Keep track of unmatched expressions - if (matches.empty()) { - unmatched_addr.push_back(addressed.match); - continue; - } - } - mod_seq.emplace_back(matches, &addressed.data); - } - - if (!unmatched_addr.empty()) { - // say something! - // or not? - } - - // Select modules with multiple matches - for (auto i = mod_to_re.begin(), last = mod_to_re.end(); i != last;) { - if (i->second.size() == 1) { - i = mod_to_re.erase(i); - } else { - ++i; - } - } - - // Catch cases - if (mod_to_re.size() > 0) { - std::string mod_names; - for (const auto& [mod_name, matched_re] : mod_to_re) { - mod_names += mod_name + ", "; - } - throw ConflictingCommandMatching(ERS_HERE, id, mod_names); - } - - } else { - mod_seq.emplace_back(cmd_mod_names, &dummy); - } - - std::string failed_mod_names(""); - - // All sorted, execute! - for (auto& [mod_names, data_ptr] : mod_seq) { - for (auto& mod_name : mod_names) { - try { - TLOG_DEBUG(2) << "Executing " << id << " -> " << mod_name; - m_module_map[mod_name]->execute_command(id, state, *data_ptr); - } catch (ers::Issue& ex) { - ers::error(ex); - failed_mod_names.append(mod_name); - failed_mod_names.append(", "); - } - } - } - - // Throw if any dispatching failed - if (!failed_mod_names.empty()) { - throw CommandDispatchingFailed(ERS_HERE, id, failed_mod_names); - } -} - -void -DAQModuleManager::execute(const std::string& state, const std::string& cmd, const dataobj_t& cmd_data) -{ - - TLOG_DEBUG(1) << "Command id:" << cmd; - - if (!m_initialized) { - throw DAQModuleManagerNotInitialized(ERS_HERE, cmd); - } - - dispatch_one_match_only(cmd, state, cmd_data); - - // dispatch(cmd.id, cmd.data); -} - -} // namespace appfwk -} // namespace dunedaq diff --git a/test/config/appSession.data.xml b/test/config/appSession.data.xml index ec2431a..4156794 100755 --- a/test/config/appSession.data.xml +++ b/test/config/appSession.data.xml @@ -284,7 +284,7 @@ - + diff --git a/test/config/appfwk.data.xml b/test/config/appfwk.data.xml index 6fdefce..ea804d8 100755 --- a/test/config/appfwk.data.xml +++ b/test/config/appfwk.data.xml @@ -63,30 +63,110 @@ - + - - - - - - + + + + + + + + + + + + + + - - - - - - + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/plugins/DummyModule.hpp b/test/plugins/DummyModule.hpp index 50d3eec..eb806ea 100644 --- a/test/plugins/DummyModule.hpp +++ b/test/plugins/DummyModule.hpp @@ -37,8 +37,7 @@ class DummyParentModule : public DAQModule explicit DummyParentModule(const std::string& name) : DAQModule(name) { - std::set valid_states{ "INITIAL", "RUNNING" }; - register_command("stuff", &DummyParentModule::do_stuff, valid_states); + register_command("stuff", &DummyParentModule::do_stuff); } void init(std::shared_ptr) final {} diff --git a/unittest/DAQModuleManager_test.cxx b/unittest/DAQModuleManager_test.cxx index 9dfbe2a..cd44863 100644 --- a/unittest/DAQModuleManager_test.cxx +++ b/unittest/DAQModuleManager_test.cxx @@ -39,7 +39,7 @@ make_config_mgr() { std::string oksConfig = "oksconflibs:test/config/appSession.data.xml"; std::string appName = "TestApp"; - std::string sessionName = "partition_name"; + std::string sessionName = "test-session"; return std::make_shared(oksConfig, appName, sessionName); } @@ -72,7 +72,7 @@ BOOST_AUTO_TEST_CASE(NotInitialized) nlohmann::json cmd_data; // to_json(cmd_data, cmd); - BOOST_REQUIRE_EXCEPTION(mgr.execute("CONFIGURED", "start", cmd_data), + BOOST_REQUIRE_EXCEPTION(mgr.execute("start", cmd_data), DAQModuleManagerNotInitialized, [&](DAQModuleManagerNotInitialized) { return true; }); } @@ -91,6 +91,47 @@ BOOST_AUTO_TEST_CASE(InitializeModules) BOOST_REQUIRE_EQUAL(mgr.initialized(), true); } +#if 0 +BOOST_AUTO_TEST_CASE(NoActionPlan) +{ + + dunedaq::get_iomanager()->reset(); + auto mgr = DAQModuleManager(); + BOOST_REQUIRE_EQUAL(mgr.initialized(), false); + + dunedaq::opmonlib::TestOpMonManager opmgr; + auto cfgMgr = make_config_mgr(); + mgr.initialize(cfgMgr, opmgr); + + BOOST_REQUIRE_EQUAL(mgr.initialized(), true); + nlohmann::json cmd_data; + BOOST_REQUIRE_EXCEPTION( + mgr.execute("unknown_cmd", cmd_data), ActionPlanNotFound, [&](ActionPlanNotFound) { return true; }); +} +#endif + +BOOST_AUTO_TEST_CASE(InvalidActionPlan) +{ + + dunedaq::get_iomanager()->reset(); + auto mgr = DAQModuleManager(); + BOOST_REQUIRE_EQUAL(mgr.initialized(), false); + + std::string oksConfig = "oksconflibs:test/config/appSession.data.xml"; + std::string appName = "MissingModuleApp"; + std::string sessionName = "test-session"; + dunedaq::opmonlib::TestOpMonManager opmgr; + auto cfgMgr = std::make_shared(oksConfig, appName, sessionName); + BOOST_REQUIRE_EXCEPTION( + mgr.initialize(cfgMgr, opmgr), ActionPlanValidationFailed, [&](ActionPlanValidationFailed) { return true; }); + + dunedaq::get_iomanager()->reset(); + appName = "MissingMethodApp"; + cfgMgr = std::make_shared(oksConfig, appName, sessionName); + BOOST_REQUIRE_EXCEPTION( + mgr.initialize(cfgMgr, opmgr), ActionPlanValidationFailed, [&](ActionPlanValidationFailed) { return true; }); +} + BOOST_AUTO_TEST_CASE(CommandModules) { dunedaq::get_iomanager()->reset(); @@ -103,11 +144,10 @@ BOOST_AUTO_TEST_CASE(CommandModules) BOOST_REQUIRE_EQUAL(mgr.initialized(), true); nlohmann::json cmd_data; - mgr.execute("RUNNING", "stuff", cmd_data); + mgr.execute("stuff", cmd_data); - BOOST_REQUIRE_EXCEPTION(mgr.execute("RUNNING", "bad_stuff", cmd_data), - CommandDispatchingFailed, - [&](CommandDispatchingFailed) { return true; }); + BOOST_REQUIRE_EXCEPTION( + mgr.execute("bad_stuff", cmd_data), CommandDispatchingFailed, [&](CommandDispatchingFailed) { return true; }); } @@ -131,12 +171,12 @@ BOOST_AUTO_TEST_CASE(CommandMatchingModules) addr_cmd.match = "foo"; cmd_obj.modules.push_back(addr_cmd); to_json(cmd_obj_data, cmd_obj); - mgr.execute("RUNNING", "stuff", cmd_obj_data); + mgr.execute("stuff", cmd_obj_data); addr_cmd.match = ".*module.*"; cmd_obj.modules.push_back(addr_cmd); to_json(cmd_obj_data, cmd_obj); - BOOST_REQUIRE_EXCEPTION(mgr.execute("RUNNING", "bad_stuff", cmd_obj_data), + BOOST_REQUIRE_EXCEPTION(mgr.execute("bad_stuff", cmd_obj_data), ConflictingCommandMatching, [&](ConflictingCommandMatching) { return true; }); }