LCOV - code coverage report
Current view: top level - artdaq/DAQrate - DataSenderManager.hh (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 3 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 2 0

            Line data    Source code
       1              : #ifndef ARTDAQ_DAQRATE_DATASENDERMANAGER_HH
       2              : #define ARTDAQ_DAQRATE_DATASENDERMANAGER_HH
       3              : 
       4              : #include "fhiclcpp/types/Sequence.h"  // Must pre-empt fhiclcpp/types/Atom.h
       5              : 
       6              : #include "TRACE/tracemf.h"  // Pre-empt TRACE/trace.h from Fragment.hh.
       7              : #include "artdaq-core/Data/Fragment.hh"
       8              : 
       9              : #include "artdaq/DAQdata/HostMap.hh"
      10              : #include "artdaq/DAQrate/detail/FragCounter.hh"
      11              : #include "artdaq/DAQrate/detail/TableReceiver.hh"
      12              : #include "artdaq/TransferPlugins/TransferInterface.hh"
      13              : 
      14              : #include "fhiclcpp/types/Atom.h"
      15              : #include "fhiclcpp/types/Comment.h"
      16              : #include "fhiclcpp/types/ConfigurationTable.h"
      17              : #include "fhiclcpp/types/Name.h"
      18              : #include "fhiclcpp/types/OptionalTable.h"
      19              : #include "fhiclcpp/types/TableFragment.h"
      20              : 
      21              : #include <netinet/in.h>
      22              : #include <atomic>
      23              : #include <map>
      24              : #include <memory>
      25              : #include <set>
      26              : 
      27              : namespace artdaq {
      28              : class DataSenderManager;
      29              : }
      30              : 
      31              : /**
      32              :  * \brief Sends Fragment objects using TransferInterface plugins. Uses Routing Tables if configured,
      33              :  * otherwise will Round-Robin Fragments to the destinations.
      34              :  */
      35              : class artdaq::DataSenderManager
      36              : {
      37              : public:
      38              :         /// <summary>
      39              :         /// Configuration for transfers to destinations
      40              :         /// </summary>
      41              :         struct DestinationsConfig
      42              :         {
      43              :                 /// Example Configuration for transfer to destination. See artdaq::TransferInterface::Config
      44              :                 fhicl::OptionalTable<artdaq::TransferInterface::Config> dest{fhicl::Name{"d1"}, fhicl::Comment{"Configuration for transfer to destination"}};
      45              :         };
      46              : 
      47              :         /// <summary>
      48              :         /// Configuration of DataSenderManager. May be used for parameter validation
      49              :         /// </summary>
      50              :         struct Config
      51              :         {
      52              :                 /// "broadcast_sends" (Default: false): Send all Fragments to all destinations
      53              :                 fhicl::Atom<bool> broadcast_sends{fhicl::Name{"broadcast_sends"}, fhicl::Comment{"Send all Fragments to all destinations"}, false};
      54              :                 /// "nonblocking_sends" (Default: false): If true, will use non-reliable mode of TransferInterface plugins
      55              :                 fhicl::Atom<bool> nonblocking_sends{fhicl::Name{"nonblocking_sends"}, fhicl::Comment{"Whether sends should block. Used for DL->DISP connection."}, false};
      56              :                 /// "send_timeout_usec" (Default: 5000000 (5 seconds)): Timeout for sends in non-reliable modes (broadcast and nonblocking)
      57              :                 fhicl::Atom<size_t> send_timeout_us{fhicl::Name{"send_timeout_usec"}, fhicl::Comment{"Timeout for sends in non-reliable modes (broadcast and nonblocking)"}, 5000000};
      58              :                 /// "send_retry_count" (Default: 2): Number of times to retry a send in non-reliable mode
      59              :                 fhicl::Atom<size_t> send_retry_count{fhicl::Name{"send_retry_count"}, fhicl::Comment{"Number of times to retry a send in non-reliable mode"}, 2};
      60              :                 fhicl::OptionalTable<artdaq::TableReceiver::Config> routing_table_config{fhicl::Name{"routing_table_config"}};  ///< Configuration for Routing Table reception. See artdaq::TableReceiver::Config
      61              :                 /// "destinations" (Default: Empty ParameterSet): FHiCL table for TransferInterface configurations for each destination. See artdaq::DataSenderManager::DestinationsConfig
      62              :                 ///   NOTE: "destination_rank" MUST be specified (and unique) for each destination!
      63              :                 fhicl::OptionalTable<DestinationsConfig> destinations{fhicl::Name{"destinations"}};
      64              :                 /// Optional host_map configuration (Can also be specified in each DestinationsConfig entry). See artdaq::HostMap::Config
      65              :                 fhicl::TableFragment<artdaq::HostMap::Config> host_map;
      66              :                 /// "enabled_destinations" (OPTIONAL): If specified, only the destination ranks listed will be enabled. If not specified, all destinations will be enabled.
      67              :                 fhicl::Sequence<size_t> enabled_destinations{fhicl::Name{"enabled_destinations"}, fhicl::Comment{"List of destiantion ranks to activate (must be defined in destinations block)"}, std::vector<size_t>()};
      68              :         };
      69              :         /// Used for ParameterSet validation (if desired)
      70              :         using Parameters = fhicl::WrappedTable<Config>;
      71              : 
      72              :         /**
      73              :          * \brief DataSenderManager Constructor
      74              :          * \param ps ParameterSet used to configure the DataSenderManager. See artdaq::DataSenderManager::Config
      75              :          */
      76              :         explicit DataSenderManager(const fhicl::ParameterSet& ps);
      77              : 
      78              :         /**
      79              :          * \brief DataSenderManager Destructor
      80              :          */
      81              :         virtual ~DataSenderManager();
      82              : 
      83              :         /**
      84              :          * \brief Send the given Fragment. Return the rank of the destination to which the Fragment was sent.
      85              :          * \param frag Fragment to send
      86              :          * \return Pair containing Rank of destination for Fragment and the CopyStatus from the send call
      87              :          */
      88              :         std::pair<int, TransferInterface::CopyStatus> sendFragment(Fragment&& frag);
      89              : 
      90              :         /**
      91              :          * \brief Return the count of Fragment objects sent by this DataSenderManager
      92              :          * \return The count of Fragment objects sent by this DataSenderManager
      93              :          */
      94              :         size_t count() const;
      95              : 
      96              :         /**
      97              :          * \brief Get the count of Fragment objects sent by this DataSenderManager to a given destination
      98              :          * \param rank Destination rank to get count for
      99              :          * \return The count of Fragment objects sent by this DataSenderManager to the destination
     100              :          */
     101              :         size_t slotCount(size_t rank) const;
     102              : 
     103              :         /**
     104              :          * \brief Get the number of configured destinations
     105              :          * \return The number of configured destinations
     106              :          */
     107              :         size_t destinationCount() const { return destinations_.size(); }
     108              : 
     109              :         /**
     110              :          * \brief Get the list of enabled destinations
     111              :          * \return The list of enabled destination ranks
     112              :          */
     113              :         std::set<int> enabled_destinations() const { return enabled_destinations_; }
     114              : 
     115              :         /**
     116              :          * \brief Gets the current size of the Routing Table, in case other parts of the system want to use this information
     117              :          * \return The current size of the Routing Table.
     118              :          */
     119              :         size_t GetRoutingTableEntryCount() const;
     120              : 
     121              :         /**
     122              :          * \brief Gets the number of sends remaining in the routing table, in case other parts of the system want to use this information
     123              :          * \return The number of sends remaining in the routing table
     124              :          */
     125              :         size_t GetRemainingRoutingTableEntries() const;
     126              : 
     127              :         /**
     128              :          * \brief Stop the DataSenderManager, aborting any sends in progress
     129              :          */
     130            0 :         void StopSender() { should_stop_ = true; }
     131              : 
     132              :         /**
     133              :          * \brief Remove the given sequence ID from the routing table and sent_count lists
     134              :          * \param seq Sequence ID to remove
     135              :          */
     136              :         void RemoveRoutingTableEntry(Fragment::sequence_id_t seq);
     137              :         /**
     138              :          * \brief Get the number of Fragments sent with a given Sequence ID
     139              :          * \param seq Sequence ID to query
     140              :          * \return The number of Fragments sent with a given Sequence ID
     141              :          */
     142              :         size_t GetSentSequenceIDCount(Fragment::sequence_id_t seq);
     143              : 
     144              : private:
     145              :         DataSenderManager(DataSenderManager const&) = delete;
     146              :         DataSenderManager(DataSenderManager&&) = delete;
     147              :         DataSenderManager& operator=(DataSenderManager const&) = delete;
     148              :         DataSenderManager& operator=(DataSenderManager&&) = delete;
     149              : 
     150              :         // Calculate where the fragment with this sequenceID should go.
     151              :         int calcDest_(Fragment::sequence_id_t) const;
     152              : 
     153              : private:
     154              :         std::map<int, std::unique_ptr<artdaq::TransferInterface>> destinations_;
     155              :         std::set<int> enabled_destinations_;
     156              : 
     157              :         detail::FragCounter sent_frag_count_;
     158              : 
     159              :         bool broadcast_sends_;
     160              :         bool non_blocking_mode_;
     161              :         size_t send_timeout_us_;
     162              :         size_t send_retry_count_;
     163              : 
     164              :         std::unique_ptr<TableReceiver> table_receiver_;
     165              :         std::atomic<bool> should_stop_;
     166              :         std::map<Fragment::sequence_id_t, size_t> sent_sequence_id_count_;
     167              : 
     168              :         mutable std::mutex sent_sequence_id_mutex_;
     169              : 
     170              :         mutable std::atomic<uint64_t> highest_sequence_id_routed_;
     171              : };
     172              : 
     173              : inline size_t
     174            0 : artdaq::DataSenderManager::
     175              :     count() const
     176              : {
     177            0 :         return sent_frag_count_.count();
     178              : }
     179              : 
     180              : inline size_t
     181              : artdaq::DataSenderManager::
     182              :     slotCount(size_t rank) const
     183              : {
     184              :         return sent_frag_count_.slotCount(rank);
     185              : }
     186              : #endif  // ARTDAQ_DAQRATE_DATASENDERMANAGER_HH
        

Generated by: LCOV version 2.0-1