LCOV - code coverage report
Current view: top level - artdaq/Application - RoutingManagerCore.hh (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 1 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 1 0

            Line data    Source code
       1              : #ifndef artdaq_Application_MPI2_RoutingManagerCore_hh
       2              : #define artdaq_Application_MPI2_RoutingManagerCore_hh
       3              : 
       4              : #include "artdaq/DAQrate/StatisticsHelper.hh"
       5              : #include "artdaq/DAQrate/detail/RoutingPacket.hh"
       6              : #include "artdaq/DAQrate/detail/TokenReceiver.hh"
       7              : #include "artdaq/RoutingPolicies/RoutingManagerPolicy.hh"
       8              : 
       9              : #include "canvas/Persistency/Provenance/RunID.h"
      10              : #include "fhiclcpp/ParameterSet.h"
      11              : 
      12              : #include <boost/thread.hpp>
      13              : 
      14              : // Socket Includes
      15              : #include <netinet/in.h>
      16              : #include <sys/epoll.h>
      17              : 
      18              : #include <atomic>
      19              : #include <map>
      20              : #include <memory>
      21              : #include <mutex>
      22              : #include <set>
      23              : #include <string>
      24              : 
      25              : namespace artdaq {
      26              : class RoutingManagerCore;
      27              : }
      28              : 
      29              : /**
      30              :  * \brief RoutingManagerCore implements the state machine for the RoutingManager artdaq application.
      31              :  * RoutingManagerCore collects tokens from receivers, and at regular intervals uses these tokens to build
      32              :  * Routing Tables that are sent to the senders.
      33              :  */
      34              : class artdaq::RoutingManagerCore
      35              : {
      36              : public:
       37              :         static const std::string TABLE_UPDATES_STAT_KEY;           ///< Key for Table Update count MonitoredQuantity
      38              :         static const std::string TOKENS_RECEIVED_STAT_KEY;         ///< Key for the Tokens Received MonitoredQuantity
      39              :         static const std::string CURRENT_TABLE_INTERVAL_STAT_KEY;  ///< Key for the Current Table Interval MonitoredQuantity
      40              : 
      41              :         /**
      42              :          * \brief RoutingManagerCore Constructor.
      43              :          */
      44              :         RoutingManagerCore();
      45              : 
      46              :         /**
      47              :          * \brief Copy Constructor is deleted
      48              :          */
      49              :         RoutingManagerCore(RoutingManagerCore const&) = delete;
      50              : 
      51              :         /**
      52              :          * Destructor.
      53              :          */
      54              :         ~RoutingManagerCore();
      55              : 
      56              :         /**
      57              :          * \brief Copy Assignment operator is deleted
       58              :          * \return Reference to this RoutingManagerCore (the operator is deleted, so it can never be called)
      59              :          */
      60              :         RoutingManagerCore& operator=(RoutingManagerCore const&) = delete;
      61              : 
      62              :         RoutingManagerCore(RoutingManagerCore&&) = delete;             ///< Move Constructor is deleted
      63              :         RoutingManagerCore& operator=(RoutingManagerCore&&) = delete;  ///< Move Assignment Operator is deleted
      64              : 
      65              :         /**
      66              :          * \brief Processes the initialize request.
      67              :          * \param pset ParameterSet used to configure the RoutingManagerCore
      68              :          * \return Whether the initialize attempt succeeded
      69              :          *
      70              :          * \verbatim
      71              :          * RoutingManagerCore accepts the following Parameters:
      72              :          * "daq" (REQUIRED): FHiCL table containing DAQ configuration
      73              :          *   "policy" (REQUIRED): FHiCL table containing the RoutingManagerPolicy configuration
      74              :          *     "policy" (Default: ""): Name of the RoutingManagerPolicy plugin to load
      75              :          *   "rt_priority" (Default: 0): Unix process priority to assign to RoutingManagerCore
      76              :          *   "table_update_interval_ms" (Default: 1000): Maximum amount of time between table updates
      77              :          *   "table_update_interval_high_frac" (Default: 0.75): Fraction of the maximum seen table size at which the interval should be reduced
      78              :          *   "table_update_interval_low_frac" (Default: 0.5): Fraction of the maximum seen table size at which the interval should be increased
       79              :          *   "senders_send_by_send_count" (Default: false): If true, senders will use the current send count to look up routing information in the table, instead of the sequence ID.
       80              :          *   "table_ack_retry_count" (Default: 5): The number of times the table will be resent while waiting for acknowledgements
      81              :          *   "table_update_port" (Default: 35556): The port on which to send table updates
      82              :          *   "table_acknowledge_port" (Default: 35557): The port on which to listen for RoutingAckPacket datagrams
      83              :          *   "table_update_address" (Default: "227.128.12.28"): Multicast address to send table updates to
      84              :          *   "routing_manager_hostname" (Default: "localhost"): Hostname to send table updates from
      85              :          *   "metrics": FHiCL table containing configuration for MetricManager
      86              :          * \endverbatim
      87              :          */
      88              :         bool initialize(fhicl::ParameterSet const& pset, uint64_t, uint64_t);
      89              : 
      90              :         /**
      91              :          * \brief Start the RoutingManagerCore
      92              :          * \param id Run ID of the current run
      93              :          * \return True if no exception
      94              :          */
      95              :         bool start(art::RunID id, uint64_t, uint64_t);
      96              : 
      97              :         /**
      98              :          * \brief Stops the RoutingManagerCore
      99              :          * \return True if no exception
     100              :          */
     101              :         bool stop(uint64_t, uint64_t);
     102              : 
     103              :         /**
     104              :          * \brief Pauses the RoutingManagerCore
     105              :          * \return True if no exception
     106              :          */
     107              :         bool pause(uint64_t, uint64_t);
     108              : 
     109              :         /**
     110              :          * \brief Resumes the RoutingManagerCore
     111              :          * \return True if no exception
     112              :          */
     113              :         bool resume(uint64_t, uint64_t);
     114              : 
     115              :         /**
     116              :          * \brief Shuts Down the RoutingManagerCore
     117              :          * \return If the shutdown was successful
     118              :          */
     119              :         bool shutdown(uint64_t);
     120              : 
     121              :         /**
     122              :          * \brief Soft-Initializes the RoutingManagerCore.
     123              :          * \param pset ParameterSet for configuring RoutingManagerCore
     124              :          * \param timeout Timeout for transition
     125              :          * \param timestamp Timestamp of transition
     126              :          * \return Returns initialize status
     127              :          */
     128              :         bool soft_initialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp);
     129              : 
     130              :         /**
     131              :          * \brief Reinitializes the RoutingManagerCore.
     132              :          * \param pset ParameterSet for configuring RoutingManagerCore
     133              :          * \param timeout Timeout for transition
     134              :          * \param timestamp Timestamp of transition
     135              :          * \return Returns initialize status
     136              :          */
     137              :         bool reinitialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp);
     138              : 
     139              :         /**
     140              :          * \brief Main loop of the RoutingManagerCore. Determines when to send the next table update,
     141              :          * asks the RoutingManagerPolicy for the table to send, and sends it.
     142              :          */
     143              :         void process_event_table();
     144              : 
     145              :         /**
     146              :          * \brief Sends a detail::RoutingPacket to the table receivers
     147              :          * \param packet The detail::RoutingPacket to send
     148              :          *
     149              :          * send_event_table checks the table update socket and the acknowledge socket before
     150              :          * sending the table update the first time. It then enters a loop where it sends the table
     151              :          * update, then waits for acknowledgement packets. It keeps track of which senders have sent
     152              :          * their acknowledgement packets, and discards duplicate acks. It leaves this loop once all
     153              :          * senders have sent a valid acknowledgement packet.
     154              :          */
     155              :         void send_event_table(detail::RoutingPacket packet);
     156              : 
     157              :         /**
     158              :          * \brief Send a report on the current status of the RoutingManagerCore
     159              :          * \return A string containing the report on the current status of the RoutingManagerCore
     160              :          *
     161              :          */
     162              :         std::string report(std::string const&) const;
     163              : 
     164              :         /**
     165              :          * \brief Get the number of table updates sent by this RoutingManager this run
     166              :          * \return The number of table updates sent by this RoutingManager this run
     167              :          */
     168            0 :         size_t get_update_count() const { return table_update_count_; }
     169              : 
     170              : private:
     171              :         art::RunID run_id_;
     172              : 
     173              :         fhicl::ParameterSet policy_pset_;
     174              :         fhicl::ParameterSet token_receiver_pset_;
     175              :         int rt_priority_;
     176              : 
     177              :         size_t max_table_update_interval_ms_;
     178              :         std::atomic<size_t> current_table_interval_ms_;
     179              :         double table_update_high_fraction_;
     180              :         double table_update_low_fraction_;
     181              :         std::atomic<size_t> table_update_count_;
     182              : 
     183              :         std::shared_ptr<RoutingManagerPolicy> policy_;
     184              :         std::unique_ptr<TokenReceiver> token_receiver_;
     185              : 
     186              :         std::atomic<bool> shutdown_requested_;
     187              :         std::atomic<bool> stop_requested_;
     188              :         std::atomic<bool> pause_requested_;
     189              : 
     190              :         std::unique_ptr<boost::thread> listen_thread_;
     191              :         int table_listen_port_;
     192              : 
     193              :         std::map<int, std::set<int>> connected_fds_;
     194              :         int epoll_fd_{-1};
     195              :         mutable std::mutex fd_mutex_;
     196              : 
     197              :         // attributes and methods for statistics gathering & reporting
     198              :         std::shared_ptr<artdaq::StatisticsHelper> statsHelperPtr_;
     199              : 
     200              :         std::string buildStatisticsString_() const;
     201              : 
     202              :         void sendMetrics_();
     203              : 
     204              :         void listen_();
     205              :         void receive_();
     206              :         int find_fd_(int fd) const;
     207              : };
     208              : 
     209              : #endif /* artdaq_Application_MPI2_RoutingManagerCore_hh */
        

Generated by: LCOV version 2.0-1