LCOV - code coverage report
Current view: top level - artdaq/DAQrate/detail - TableReceiver.hh (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 1 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 1 0

            Line data    Source code
       1              : #ifndef ARTDAQ_DAQRATE_DETAIL_TABLERECEIVER_HH
       2              : #define ARTDAQ_DAQRATE_DETAIL_TABLERECEIVER_HH
       3              : 
       4              : #include "TRACE/tracemf.h"  // Pre-empt TRACE/trace.h from Fragment.hh.
       5              : #include "artdaq-core/Data/Fragment.hh"
       6              : 
       7              : namespace fhicl {
       8              : class ParameterSet;
       9              : }
      10              : 
      11              : #include "fhiclcpp/types/Atom.h"
      12              : #include "fhiclcpp/types/Comment.h"
      13              : #include "fhiclcpp/types/ConfigurationTable.h"
      14              : #include "fhiclcpp/types/Name.h"
      15              : #include "fhiclcpp/types/OptionalTable.h"
      16              : #include "fhiclcpp/types/TableFragment.h"
      17              : 
      18              : #include "boost/thread.hpp"
      19              : 
      20              : #include <netinet/in.h>
      21              : #include <condition_variable>
      22              : #include <map>
      23              : #include <memory>
      24              : #include <set>
      25              : 
      26              : namespace artdaq {
      27              : class TableReceiver;
      28              : }
      29              : 
      30              : /**
      31              :  * \brief Sends Fragment objects using TransferInterface plugins. Uses Routing Tables if confgiured,
      32              :  * otherwise will Round-Robin Fragments to the destinations.
      33              :  */
      34              : class artdaq::TableReceiver
      35              : {
      36              : public:
      37              :         /// <summary>
      38              :         /// Configuration for Routing table reception
      39              :         ///
      40              :         /// This configuration should be the same for all processes receiving routing tables from a given RoutingManager.
      41              :         /// </summary>
      42              :         struct Config
      43              :         {
      44              :                 ///   "use_routing_manager" (Default: false): True if using the Routing Manager
      45              :                 fhicl::Atom<bool> use_routing_manager{fhicl::Name{"use_routing_manager"}, fhicl::Comment{"True if using the Routing Manager"}, false};
      46              :                 ///   "route_on_request_mode" (Default: false): True if a request for routing information should be sent to the RoutingManager (versus RoutingManager pushing table updates).
      47              :                 fhicl::Atom<bool> route_on_request_mode{fhicl::Name{"route_on_request_mode"}, fhicl::Comment{"True if a request for routing information should be sent to the RoutingManager (versus RoutingManager pushing table updates)."}, false};
      48              :                 ///   "use_routing_table_thread" (Default: true): True if a thread should be run to receive routing updates. Required if route_on_request_mode is false.
      49              :                 fhicl::Atom<bool> use_routing_table_thread{fhicl::Name{"use_routing_table_thread"}, fhicl::Comment{"True if a thread should be run to receive routing updates. Required if route_on_request_mode is false."}, true};
      50              :                 ///   "table_update_port" (Default: 35556): Port to connect to for receiving table updates
      51              :                 fhicl::Atom<int> table_port{fhicl::Name{"table_update_port"}, fhicl::Comment{"Port to connect to for receiving table updates"}, 35556};
      52              :                 ///   "routing_manager_hostname" (Default: "localhost"): RoutingManager hostname for Table connection
      53              :                 fhicl::Atom<std::string> routing_manager_hostname{fhicl::Name{"routing_manager_hostname"}, fhicl::Comment{"outingManager hostname for Table connection"}, "localhost"};
      54              :                 ///   "routing_timeout_ms" (Default: 1000): Time to wait for a routing table update
      55              :                 fhicl::Atom<int> routing_timeout_ms{fhicl::Name{"routing_timeout_ms"}, fhicl::Comment{"Time to wait (in ms) for a routing table update"}, 1000};
      56              :                 ///   "routing_table_max_size" (Default: 1000): Maximum number of entries in the routing table
      57              :                 fhicl::Atom<size_t> routing_table_max_size{fhicl::Name{"routing_table_max_size"}, fhicl::Comment{"Maximum number of entries in the routing table"}, 1000};
      58              :         };
      59              :         /// Used for ParameterSet validation (if desired)
      60              :         using Parameters = fhicl::WrappedTable<Config>;
      61              : 
      62              :         using RoutingTable = std::map<artdaq::Fragment::sequence_id_t, int>;  ///< Internal representation of a routing table, relating a sequence ID to a destination rank
      63              : 
      64              :         static constexpr int ROUTING_FAILED = -1111;  ///< Value used to indicate that a route was not properly generated
      65              : 
      66              :         /**
      67              :          * \brief TableReceiver Constructor
      68              :          * \param ps ParameterSet used to configure the TableReceiver. See artdaq::TableReceiver::Config
      69              :          */
      70              :         explicit TableReceiver(const fhicl::ParameterSet& ps);
      71              : 
      72              :         /**
      73              :          * \brief TableReceiver Destructor
      74              :          */
      75              :         virtual ~TableReceiver();
      76              : 
      77              :         /**
      78              :          * @brief Get a copy of the current RoutingTable
      79              :          */
      80              :         RoutingTable GetRoutingTable() const;
      81              : 
      82              :         /**
      83              :          * @brief Get the current RoutingTable and remove all entries
      84              :          */
      85              :         RoutingTable GetAndClearRoutingTable();
      86              : 
      87              :         /**
      88              :          * @brief Get the destination rank for the given sequence ID
      89              :          * @param seqID Sequence ID to query
      90              :          * @return Destination rank for given Sequence ID
      91              :          */
      92              :         int GetRoutingTableEntry(artdaq::Fragment::sequence_id_t seqID);
      93              : 
      94              :         /**
      95              :          * \brief Gets the current size of the Routing Table, in case other parts of the system want to use this information
      96              :          * \return The current size of the Routing Table.
      97              :          */
      98              :         size_t GetRoutingTableEntryCount() const;
      99              : 
     100              :         /**
     101              :          * \brief Gets the number of sends remaining in the routing table, in case other parts of the system want to use this information
     102              :          * \return The number of sends remaining in the routing table
     103              :          */
     104              :         size_t GetRemainingRoutingTableEntries() const;
     105              : 
     106              :         /**
     107              :          * \brief Stop the TableReceiver
     108              :          */
     109              :         void StopTableReceiver() { should_stop_ = true; }
     110              : 
     111              :         /**
     112              :          * \brief Remove the given sequence ID from the routing table and sent_count lists
     113              :          * \param seq Sequence ID to remove
     114              :          */
     115              :         void RemoveRoutingTableEntry(Fragment::sequence_id_t seq);
     116              : 
     117              :         /**
     118              :          * @brief Report metrics to MetricManager
     119              :          */
     120              :         void SendMetrics() const;
     121              : 
     122              :         /**
     123              :          * @brief Whether the TableReceiver will receive tables from the RoutingManager
     124              :          */
     125            0 :         bool RoutingManagerEnabled() const { return use_routing_manager_; }
     126              : 
     127              : private:
     128              :         TableReceiver(TableReceiver const&) = delete;
     129              :         TableReceiver(TableReceiver&&) = delete;
     130              :         TableReceiver& operator=(TableReceiver const&) = delete;
     131              :         TableReceiver& operator=(TableReceiver&&) = delete;
     132              : 
     133              :         void connectToRoutingManager_();
     134              :         void disconnectFromRoutingManager_();
     135              : 
     136              :         void startTableReceiverThread_();
     137              : 
     138              :         bool receiveTableUpdate_();
     139              :         void receiveTableUpdatesLoop_();
     140              : 
     141              :         void sendTableUpdateRequest_(Fragment::sequence_id_t seq);
     142              : 
     143              : private:
     144              :         bool use_routing_manager_;
     145              :         std::atomic<bool> should_stop_;
     146              :         int table_port_;
     147              :         std::string table_address_;
     148              :         int table_socket_;
     149              :         RoutingTable routing_table_;
     150              :         Fragment::sequence_id_t routing_table_last_;
     151              :         size_t routing_table_max_size_;
     152              :         mutable std::mutex routing_mutex_;
     153              :         std::unique_ptr<boost::thread> routing_thread_;
     154              :         mutable std::atomic<size_t> routing_wait_time_;
     155              :         mutable std::atomic<size_t> routing_wait_time_count_;
     156              :         mutable std::condition_variable routing_cv_;
     157              : 
     158              :         size_t routing_timeout_ms_;
     159              : 
     160              :         mutable std::atomic<uint64_t> highest_sequence_id_routed_;
     161              : };
     162              : 
     163              : #endif  // ARTDAQ_DAQRATE_DETAIL_TABLERECEIVER_HH
        

Generated by: LCOV version 2.0-1