Skip to content

Commit

Permalink
Merge pull request #151 from DUNE-DAQ/feature/tstamp_estimator_move
Browse files Browse the repository at this point in the history
TimeSync and TimestampEstimator are now in utilities
  • Loading branch information
bieryAtFnal authored Aug 29, 2023
2 parents f9a99ab + afd2815 commit e95443b
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 34 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ find_package(detchannelmaps REQUIRED)
find_package(dfmessages REQUIRED)
find_package(iomanager REQUIRED)
find_package(RdKafka REQUIRED)
find_package(timinglibs REQUIRED)
find_package(utilities REQUIRED)
find_package(Boost COMPONENTS unit_test_framework program_options REQUIRED)
find_package(fftw REQUIRED)

Expand All @@ -31,7 +31,7 @@ detdataformats::detdataformats
fddetdataformats::fddetdataformats
detchannelmaps::detchannelmaps
dfmessages::dfmessages
timinglibs::timinglibs
utilities::utilities
)

if(WITH_PYTHON_SUPPORT)
Expand Down
2 changes: 1 addition & 1 deletion cmake/dqmConfig.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ find_dependency(daqdataformats)
find_dependency(detdataformats)
find_dependency(fddetdataformats)
find_dependency(detchannelmaps)
find_dependency(timinglibs)
find_dependency(utilities)
find_dependency(HighFive)


Expand Down
41 changes: 14 additions & 27 deletions plugins/DQMProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,17 @@ DQMProcessor::do_configure(const nlohmann::json& args)
void
DQMProcessor::do_start(const nlohmann::json& args)
{
if (m_mode == "readout") {
m_run_number.store(daqdataformats::run_number_t(args.at("run").get<daqdataformats::run_number_t>()));

m_time_est.reset(new timinglibs::TimestampEstimator(m_clock_frequency));
if (m_mode == "readout") {

m_received_timesync_count.store(0);
m_time_est.reset(new utilities::TimestampEstimator(m_run_number, m_clock_frequency));

// Subscribe to all TimeSync messages
if (m_timesync_receiver) {
m_timesync_receiver->add_callback(std::bind(&DQMProcessor::dispatch_timesync, this, std::placeholders::_1));
m_timesync_receiver->add_callback(std::bind(&utilities::TimestampEstimator::timesync_callback<dfmessages::TimeSync>,
reinterpret_cast<utilities::TimestampEstimator*>(m_time_est.get()),
std::placeholders::_1));
}
}

Expand All @@ -178,24 +180,25 @@ DQMProcessor::do_start(const nlohmann::json& args)

m_dqm_args.run_mark = std::make_shared<std::atomic<bool>>(true);

m_run_number.store(daqdataformats::run_number_t(args.at("run").get<daqdataformats::run_number_t>()));

m_running_thread.reset(new std::thread(&DQMProcessor::do_work, this));
}

void
DQMProcessor::do_drain_dataflow(const data_t&)
{
m_dqm_args.run_mark->store(false);
m_running_thread->join();

// 15-Aug-2023, KAB: operations which involve the TimestampEstimator (m_time_est) need to come *before*
// the worker thread is stopped because the m_time_est pointer is reset at the end of the work function.
if (m_mode == "readout") {
if (m_timesync_receiver) {
m_timesync_receiver->remove_callback();
}
TLOG() << get_name() << ": received " << m_received_timesync_count.load() << " TimeSync messages.";
TLOG() << get_name() << ": received " << m_time_est->get_received_timesync_count() << " TimeSync messages.";
}
else if (m_mode == "df") {

m_dqm_args.run_mark->store(false);
m_running_thread->join();

if (m_mode == "df") {
get_iomanager()->remove_callback<std::unique_ptr<daqdataformats::TriggerRecord>>(m_df2dqm_connection);
}
}
Expand Down Expand Up @@ -636,22 +639,6 @@ DQMProcessor::dfrequest()
get_iom_sender<dfmessages::TRMonRequest>(m_dqm2df_connection)->send(std::move(trmon), m_sink_timeout);
}


void
DQMProcessor::dispatch_timesync(dfmessages::TimeSync& timesyncmsg)
{
++m_received_timesync_count;
TLOG_DEBUG(13) << "Received TimeSync message with DAQ time= " << timesyncmsg.daq_time
<< ", run=" << timesyncmsg.run_number << " (local run number is " << m_run_number << ")";
if (m_time_est.get() != nullptr) {
if (timesyncmsg.run_number == m_run_number) {
m_time_est->add_timestamp_datapoint(timesyncmsg);
} else {
TLOG_DEBUG(0) << "Discarded TimeSync message from run " << timesyncmsg.run_number << " during run " << m_run_number;
}
}
}

} // namespace dqm
} // namespace dunedaq

Expand Down
7 changes: 3 additions & 4 deletions plugins/DQMProcessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
#include "iomanager/Receiver.hpp"
#include "daqdataformats/TriggerRecord.hpp"
#include "dfmessages/TriggerDecision.hpp"
#include "timinglibs/TimestampEstimator.hpp"
#include "utilities/TimestampEstimator.hpp"
#include "dfmessages/TimeSync.hpp"
#include "ipm/Receiver.hpp"

#include "iomanager/queue/FollyQueue.hpp"
Expand Down Expand Up @@ -59,7 +60,6 @@ class DQMProcessor : public dunedaq::appfwk::DAQModule
void do_drain_dataflow(const data_t&);
void do_configure(const data_t&);

void dispatch_timesync(dfmessages::TimeSync& timesyncmsg);
void dispatch_trigger_record(std::unique_ptr<daqdataformats::TriggerRecord>& tr);

void do_work();
Expand Down Expand Up @@ -94,7 +94,7 @@ class DQMProcessor : public dunedaq::appfwk::DAQModule
std::string m_df2dqm_connection;
std::string m_dqm2df_connection;

std::unique_ptr<timinglibs::TimestampEstimator> m_time_est;
std::unique_ptr<utilities::TimestampEstimator> m_time_est;

std::atomic<daqdataformats::run_number_t> m_run_number;

Expand All @@ -110,7 +110,6 @@ class DQMProcessor : public dunedaq::appfwk::DAQModule
std::atomic<int> m_total_request_count{ 0 };
std::atomic<int> m_data_count{ 0 };
std::atomic<int> m_total_data_count{ 0 };
std::atomic<uint64_t> m_received_timesync_count{ 0 }; // NOLINT(build/unsigned)

std::string m_channel_map;

Expand Down

0 comments on commit e95443b

Please sign in to comment.