LCOV - code coverage report
Current view: top level - artdaq/DAQrate - FragmentReceiverManager.hh (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 26 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 9 0

            Line data    Source code
       1              : #ifndef ARTDAQ_DAQRATE_DATATRANSFERMANAGER_HH
       2              : #define ARTDAQ_DAQRATE_DATATRANSFERMANAGER_HH
       3              : 
       4              : #include "TRACE/tracemf.h"  // Pre-empt TRACE/trace.h from Fragment.hh.
       5              : #include "artdaq-core/Data/Fragment.hh"
       6              : 
       7              : #include "artdaq/DAQrate/detail/FragCounter.hh"
       8              : #include "artdaq/TransferPlugins/TransferInterface.hh"
       9              : 
      10              : namespace fhicl {
      11              : class ParameterSet;
      12              : }
      13              : 
      14              : #include <condition_variable>
      15              : #include <map>
      16              : #include <memory>
      17              : #include <set>
      18              : 
      19              : namespace artdaq {
      20              : class FragmentReceiverManager;
      21              : class FragmentStoreElement;
      22              : }  // namespace artdaq
      23              : 
      24              : /**
      25              :  * \brief Receives Fragment objects from one or more DataSenderManager instances using TransferInterface plugins
      26              :  * FragmentReceiverManager runs a reception thread for each source, and can automatically suppress reception from
      27              :  * sources which are going faster than the others.
      28              :  */
      29              : class artdaq::FragmentReceiverManager
      30              : {
      31              : public:
      32              :         /**
      33              :          * \brief FragmentReceiverManager Constructor
      34              :          * \param ps ParameterSet used to configure the FragmentReceiverManager
      35              :          *
      36              :          * \verbatim
      37              :          * FragmentReceiverManager accepts the following Parameters:
      38              :          * "auto_suppression_enabled" (Default: true): Whether to suppress a source that gets too far ahead
      39              :          * "max_receive_difference" (Default: 50): Threshold (in sequence ID) for suppressing a source
      40              :          * "receive_timeout_usec" (Default: 100000): The timeout for receive operations
      41              :          * "enabled_sources" (OPTIONAL): List of sources which are enabled. If not specified, all sources are assumed enabled
      42              :          * "sources" (Default: blank table): FHiCL table containing TransferInterface configurations for each source.
      43              :          *   NOTE: "source_rank" MUST be specified (and unique) for each source!
      44              :          * \endverbatim
      45              :          */
      46              :         explicit FragmentReceiverManager(const fhicl::ParameterSet& ps);
      47              : 
      48              :         /**
      49              :          * \brief FragmentReceiverManager Destructor
      50              :          */
      51              :         virtual ~FragmentReceiverManager();
      52              : 
      53              :         /**
      54              :          * \brief Receive a Fragment
      55              :          * \param[out] rank Rank of sender that sent the Fragment, or RECV_TIMEOUT
      56              :          * \param timeout_usec Timeout (in microseconds) to wait for a Fragment to become ready
      57              :          * \return Pointer to received Fragment. May be nullptr if no Fragments are ready
      58              :          */
      59              :         FragmentPtr recvFragment(int& rank, size_t timeout_usec = 0);
      60              : 
      61              :         /**
      62              :          * \brief Return the count of Fragment objects received by this FragmentReceiverManager
      63              :          * \return The count of Fragment objects received by this FragmentReceiverManager
      64              :          */
      65              :         size_t count() const;
      66              : 
      67              :         /**
      68              :          * \brief Get the count of Fragment objects received by this FragmentReceiverManager from a given source
      69              :          * \param rank Source rank to get count for
      70              :          * \return The count of Fragment objects received by this FragmentReceiverManager from the source
      71              :          */
      72              :         size_t slotCount(size_t rank) const;
      73              : 
      74              :         /**
      75              :          * \brief Get the total size of all data received by this FragmentReceiverManager
      76              :          * \return The total size of all data received by this FragmentReceiverManager
      77              :          */
      78              :         size_t byteCount() const;
      79              : 
      80              :         /**
      81              :          * \brief Start receiver threads for all enabled sources
      82              :          */
      83              :         void start_threads();
      84              : 
      85              :         /**
      86              :          * \brief Get the list of enabled sources
      87              :          * \return std::set containing the ranks of the enabled sources
      88              :          */
      89              :         std::set<int> enabled_sources() const;
      90              : 
      91              :         /**
      92              :          * \brief Get the list of sources which are still receiving data
      93              :          * \return std::set containing ranks of sources which are still receiving data
      94              :          */
      95              :         std::set<int> running_sources() const;
      96              : 
      97              : private:
      98              :         FragmentReceiverManager(FragmentReceiverManager const&) = delete;  // Not copy-constructible
      99              :         FragmentReceiverManager(FragmentReceiverManager&&) = delete;  // Not move-constructible
     100              :         FragmentReceiverManager& operator=(FragmentReceiverManager const&) = delete;  // Not copy-assignable
     101              :         FragmentReceiverManager& operator=(FragmentReceiverManager&&) = delete;  // Not move-assignable
     102              : 
     103              :         void runReceiver_(int);  // NOTE(review): the unnamed int is presumably the source rank to receive from -- confirm in the .cc
     104              : 
     105              :         bool fragments_ready_() const;
     106              : 
     107              :         int get_next_source_() const;
     108              : 
     109              :         std::atomic<bool> stop_requested_;
     110              : 
     111              :         std::map<int, boost::thread> source_threads_;  // NOTE(review): the int keys of the maps below are presumably source ranks -- confirm
     112              :         std::map<int, std::unique_ptr<TransferInterface>> source_plugins_;
     113              :         std::unordered_map<int, std::pair<size_t, double>> source_metric_data_;
     114              :         std::unordered_map<int, std::chrono::steady_clock::time_point> source_metric_send_time_;
     115              :         std::unordered_map<int, std::atomic<bool>> enabled_sources_;
     116              :         std::unordered_map<int, std::atomic<bool>> running_sources_;
     117              : 
     118              :         std::map<int, FragmentStoreElement> fragment_store_;  // One FragmentStoreElement per int key
     119              : 
     120              :         std::mutex input_cv_mutex_;
     121              :         std::condition_variable input_cv_;
     122              :         std::mutex output_cv_mutex_;
     123              :         std::condition_variable output_cv_;
     124              : 
     125              :         detail::FragCounter recv_frag_count_;  // Number of frags received per source.
     126              :         detail::FragCounter recv_frag_size_;   // Number of bytes received per source.
     127              :         detail::FragCounter recv_seq_count_;   // For counting sequence IDs
     128              :         bool suppress_noisy_senders_;
     129              :         size_t suppression_threshold_;
     130              : 
     131              :         size_t receive_timeout_;
     132              :         mutable int last_source_;  // mutable: may be updated from const member functions
     133              : };
     134              : 
     135              : /**
     136              :  * \brief This class contains tracking information for all Fragment objects which have been received from a specific source
     137              :  *
     138              :  * This class was designed so that there could be a mutex for each source, instead of locking all sources whenever a
     139              :  * Fragment had to be retrieved. FragmentStoreElement is itself a container type, sorted by Fragment arrival time. It is a
     140              :  * modified queue, with only the first element accessible, but it allows elements to be added to either end (for rejected Fragments).
     141              :  */
     142              : class artdaq::FragmentStoreElement
     143              : {
     144              : public:
     145              :         /**
     146              :          * \brief FragmentStoreElement Constructor; the new element holds no Fragments and reports empty() == true
     147              :          */
     148            0 :         FragmentStoreElement()
     149            0 :             : frags_()
     150            0 :             , empty_(true)
     151              :         {
     152            0 :                 std::cout << "FragmentStoreElement CONSTRUCTOR" << std::endl;  // NOTE(review): looks like a leftover debug print -- confirm before removing
     153            0 :         }
     154              : 
     155              :         /**
     156              :          * \brief Is this FragmentStoreElement empty? Reads the atomic empty_ flag, so the internal mutex is not taken
     157              :          * \return True if no Fragment objects are currently contained in this FragmentStoreElement
     158              :          */
     159            0 :         bool empty() const
     160              :         {
     161            0 :                 return empty_;
     162              :         }
     163              : 
     164              :         /**
     165              :          * \brief Add a Fragment to the front of the FragmentStoreElement (takes the internal mutex)
     166              :          * \param frag Fragment to add; ownership is moved into the store
     167              :          */
     168              :         void emplace_front(FragmentPtr&& frag)
     169              :         {
     170              :                 std::unique_lock<std::mutex> lk(mutex_);
     171              :                 frags_.emplace_front(std::move(frag));
     172              :                 empty_ = false;
     173              :         }
     174              : 
     175              :         /**
     176              :          * \brief Add a Fragment to the end of the FragmentStoreElement (takes the internal mutex)
     177              :          * \param frag Fragment to add; ownership is moved into the store
     178              :          */
     179            0 :         void emplace_back(FragmentPtr&& frag)
     180              :         {
     181            0 :                 std::unique_lock<std::mutex> lk(mutex_);
     182            0 :                 frags_.emplace_back(std::move(frag));
     183            0 :                 empty_ = false;
     184            0 :         }
     185              : 
     186              :         /**
     187              :          * \brief Remove the first Fragment from the FragmentStoreElement and return it (no emptiness check is made here)
     188              :          * \return The first Fragment in the FragmentStoreElement
     189              :          */
     190            0 :         FragmentPtr front()
     191              :         {
     192            0 :                 std::unique_lock<std::mutex> lk(mutex_);
     193            0 :                 auto current_fragment = std::move(frags_.front());  // NOTE(review): assumes the caller checked empty() first -- confirm
     194            0 :                 frags_.pop_front();
     195            0 :                 empty_ = frags_.empty();  // Refresh the lock-free flag read by empty()
     196            0 :                 return current_fragment;
     197            0 :         }
     198              : 
     199              :         /**
     200              :          * \brief Set the End-Of-Data marker value for this Receiver
     201              :          * \param eod Number of Receives expected for this receiver
     202              :          */
     203            0 :         void SetEndOfData(size_t eod) { eod_marker_ = eod; }
     204              :         /**
     205              :          * \brief Get the value of the End-Of-Data marker for this Receiver
     206              :          * \return The value of the End-Of-Data marker. Returns -1 (0xFFFFFFFFFFFFFFFF) if no EndOfData Fragments received
     207              :          */
     208            0 :         size_t GetEndOfData() const { return eod_marker_; }
     209              : 
     210              :         /**
     211              :          * \brief Get the number of Fragments stored in this FragmentStoreElement (takes the internal mutex, like the mutators)
     212              :          * \return The number of Fragments stored in this FragmentStoreElement
     213              :          */
     214            0 :         size_t size() const { std::unique_lock<std::mutex> lk(mutex_); return frags_.size(); }
     215              : 
     216              : private:
     217              :         mutable std::mutex mutex_;  // Guards frags_; mutable so that const accessors such as size() can lock it
     218              :         FragmentPtrs frags_;
     219              :         std::atomic<bool> empty_;  // Mirrors frags_.empty(); updated while mutex_ is held, read lock-free by empty()
     220              :         size_t eod_marker_{0xFFFFFFFFFFFFFFFF};  // NOTE(review): read/written without mutex_ -- confirm callers do not race on it
     221              : };
     222              : 
     223              : inline size_t  // Inline definition of FragmentReceiverManager::count()
     224            0 : artdaq::FragmentReceiverManager::
     225              :     count() const
     226              : {
     227            0 :         return recv_frag_count_.count();  // Forwards to the FragCounter that tracks received Fragments
     228              : }
     229              : 
     229              : 
     230              : inline size_t  // Inline definition of FragmentReceiverManager::slotCount()
     231              : artdaq::FragmentReceiverManager::
     232              :     slotCount(size_t rank) const
     233              : {
     234              :         return recv_frag_count_.slotCount(rank);  // rank is passed through unchanged to FragCounter::slotCount
     235              : }
     236              : 
     237              : inline size_t
     238            0 : artdaq::FragmentReceiverManager::
     239              :     byteCount() const
     240              : {
     241            0 :         return recv_frag_size_.count();
     242              : }
     243              : #endif  // ARTDAQ_DAQRATE_DATATRANSFERMANAGER_HH
        

Generated by: LCOV version 2.0-1