diff --git a/include/iomanager/IOManager.hpp b/include/iomanager/IOManager.hpp index f0f6d87..f53ce94 100644 --- a/include/iomanager/IOManager.hpp +++ b/include/iomanager/IOManager.hpp @@ -54,6 +54,8 @@ class IOManager void reset(); + bool senders_are_ready(); + template std::shared_ptr> get_sender(ConnectionId id); diff --git a/include/iomanager/Sender.hpp b/include/iomanager/Sender.hpp index a177e63..f260654 100644 --- a/include/iomanager/Sender.hpp +++ b/include/iomanager/Sender.hpp @@ -15,6 +15,8 @@ #include "logging/Logging.hpp" #include "utilities/NamedObject.hpp" +#include + namespace dunedaq::iomanager { // Typeless @@ -33,6 +35,7 @@ class Sender : public utilities::NamedObject virtual ~Sender() = default; ConnectionId id() const { return m_conn; } + virtual bool is_ready_for_sending(timeout_t timeout) = 0; protected: ConnectionId m_conn; @@ -45,11 +48,11 @@ class SenderConcept : public Sender public: explicit SenderConcept(ConnectionId const& conn_id) : Sender(conn_id) - {} - virtual void send(Datatype&& data, Sender::timeout_t timeout) = 0; // NOLINT - virtual bool try_send(Datatype&& data, Sender::timeout_t timeout) = 0; // NOLINT - virtual void send_with_topic(Datatype&& data, Sender::timeout_t timeout, std::string topic) = 0; // NOLINT - virtual bool is_ready_for_sending(Sender::timeout_t timeout) = 0; // NOLINT + { + } + virtual void send(Datatype&& data, Sender::timeout_t timeout) = 0; + virtual bool try_send(Datatype&& data, Sender::timeout_t timeout) = 0; + virtual void send_with_topic(Datatype&& data, Sender::timeout_t timeout, std::string topic) = 0; }; } // namespace dunedaq::iomanager diff --git a/src/IOManager.cpp b/src/IOManager.cpp index ce4afab..4773d0c 100755 --- a/src/IOManager.cpp +++ b/src/IOManager.cpp @@ -9,6 +9,8 @@ #include "iomanager/IOManager.hpp" #include +#include +#include std::shared_ptr dunedaq::iomanager::IOManager::s_instance = nullptr; @@ -51,6 +53,21 @@ dunedaq::iomanager::IOManager::reset() s_instance = nullptr; } +bool +dunedaq::iomanager::IOManager::senders_are_ready() +{ + auto ready = true; + + for (auto& sender_pair : m_senders) { + if (!sender_pair.second->is_ready_for_sending(Sender::timeout_t(1))) { + ready = false; + break; + } + } + + return ready; +} + std::set dunedaq::iomanager::IOManager::get_datatypes(std::string const& uid) { diff --git a/unittest/IOManager_test.cxx b/unittest/IOManager_test.cxx index dddbedb..2553c18 100755 --- a/unittest/IOManager_test.cxx +++ b/unittest/IOManager_test.cxx @@ -251,6 +251,8 @@ BOOST_FIXTURE_TEST_CASE(SimpleSendReceive, ConfigurationTestFixture) auto q_sender = IOManager::get()->get_sender(queue_id); auto q_receiver = IOManager::get()->get_receiver(queue_id); + BOOST_REQUIRE(IOManager::get()->senders_are_ready()); + Data sent_nw(56, 26.5, "test1"); Data sent_q(57, 27.5, "test2"); net_sender->send(std::move(sent_nw), dunedaq::iomanager::Sender::s_no_block); @@ -277,6 +279,8 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture) auto sub2_receiver = IOManager::get()->get_receiver(sub2_id); auto sub3_receiver = IOManager::get()->get_receiver(sub3_id); + BOOST_REQUIRE(IOManager::get()->senders_are_ready()); + // Sub1 is subscribed to all data_t publishers, Sub2 only to pub2, Sub3 to all data2_t Data2 sent_t1(56, 26.5); pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block); @@ -319,6 +323,8 @@ BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture) auto sub1_receiver = IOManager::get()->get_receiver(sub1_id); auto sub2_receiver = IOManager::get()->get_receiver(sub2_id); + BOOST_REQUIRE(IOManager::get()->senders_are_ready()); + sub1_receiver->subscribe("sub1_topic"); sub2_receiver->subscribe("sub2_topic"); @@ -346,7 +352,7 @@ BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture) BOOST_REQUIRE_EXCEPTION( sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; }); - Data2 sent_t3{ 57, 27.5}; + Data2 sent_t3{ 57, 27.5 }; pub1_sender->send_with_topic(std::move(sent_t3), dunedaq::iomanager::Sender::s_no_block, "sub1_topic"); ret1 = sub1_receiver->receive(std::chrono::milliseconds(10)); BOOST_CHECK_EQUAL(ret1.d1, 57); @@ -377,6 +383,8 @@ BOOST_FIXTURE_TEST_CASE(NonSerializableSendReceive, ConfigurationTestFixture) auto q_sender = IOManager::get()->get_sender(queue_id); auto q_receiver = IOManager::get()->get_receiver(queue_id); + BOOST_REQUIRE(IOManager::get()->senders_are_ready()); + NonSerializableData sent_nw(56, 26.5, "test1"); NonSerializableData sent_q(57, 27.5, "test2"); BOOST_REQUIRE_EXCEPTION(net_sender->send(std::move(sent_nw), dunedaq::iomanager::Sender::s_no_block), @@ -403,6 +411,8 @@ BOOST_FIXTURE_TEST_CASE(NonCopyableSendReceive, ConfigurationTestFixture) auto q_sender = IOManager::get()->get_sender(queue_id); auto q_receiver = IOManager::get()->get_receiver(queue_id); + BOOST_REQUIRE(IOManager::get()->senders_are_ready()); + NonCopyableData sent_nw(56, 26.5, "test1"); NonCopyableData sent_q(57, 27.5, "test2"); net_sender->send(std::move(sent_nw), dunedaq::iomanager::Sender::s_no_block); @@ -427,6 +437,8 @@ BOOST_FIXTURE_TEST_CASE(NonSerializableNonCopyableSendReceive, ConfigurationTest auto q_sender = IOManager::get()->get_sender(queue_id); auto q_receiver = IOManager::get()->get_receiver(queue_id); + BOOST_REQUIRE(IOManager::get()->senders_are_ready()); + NonSerializableNonCopyable sent_nw(56, 26.5, "test1"); NonSerializableNonCopyable sent_q(57, 27.5, "test2"); BOOST_REQUIRE_EXCEPTION(net_sender->send(std::move(sent_nw), dunedaq::iomanager::Sender::s_no_block), @@ -451,6 +463,8 @@ BOOST_FIXTURE_TEST_CASE(CallbackRegistration, ConfigurationTestFixture) auto net_sender = IOManager::get()->get_sender(conn_id); auto q_sender = IOManager::get()->get_sender(queue_id); + BOOST_REQUIRE(IOManager::get()->senders_are_ready()); + Data sent_data_nw(56, 26.5, "test1"); Data sent_data_q(57, 27.5, "test2"); Data recv_data; @@ -494,6 +508,8 @@ BOOST_FIXTURE_TEST_CASE(NonCopyableCallbackRegistration, ConfigurationTestFixtur auto net_sender = IOManager::get()->get_sender(conn_id); auto q_sender = IOManager::get()->get_sender(queue_id); + BOOST_REQUIRE(IOManager::get()->senders_are_ready()); + NonCopyableData sent_data_nw(56, 26.5, "test1"); NonCopyableData sent_data_q(57, 27.5, "test2"); NonCopyableData recv_data; @@ -537,6 +553,8 @@ BOOST_FIXTURE_TEST_CASE(NonSerializableCallbackRegistration, ConfigurationTestFi auto net_sender = IOManager::get()->get_sender(conn_id); auto q_sender = IOManager::get()->get_sender(queue_id); + BOOST_REQUIRE(IOManager::get()->senders_are_ready()); + NonSerializableData sent_data_nw(56, 26.5, "test1"); NonSerializableData sent_data_q(57, 27.5, "test2"); NonSerializableData recv_data; @@ -572,6 +590,8 @@ BOOST_FIXTURE_TEST_CASE(NonSerializableNonCopyableCallbackRegistration, Configur auto net_sender = IOManager::get()->get_sender(conn_id); auto q_sender = IOManager::get()->get_sender(queue_id); + BOOST_REQUIRE(IOManager::get()->senders_are_ready()); + NonSerializableNonCopyable sent_data_nw(56, 26.5, "test1"); NonSerializableNonCopyable sent_data_q(57, 27.5, "test2"); NonSerializableNonCopyable recv_data;