LCOV - code coverage report
Current view: top level - artdaq/RoutingPolicies - RoundRobin_policy.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 61 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 16 0

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME (app_name + "_RoundRobin_policy").c_str()
       3              : #include "TRACE/tracemf.h"
       4              : 
       5              : #include "artdaq/RoutingPolicies/PolicyMacros.hh"
       6              : #include "artdaq/RoutingPolicies/RoutingManagerPolicy.hh"
       7              : 
       8              : #include "fhiclcpp/ParameterSet.h"
       9              : 
      10              : namespace artdaq {
      11              : /**
      12              :  * \brief A RoutingManagerPolicy which evenly distributes Sequence IDs to all receivers.
      13              :  * If an uneven number of tokens have been received, extra tokens are stored for the next table update.
      14              :  */
      15              : class RoundRobinPolicy : public RoutingManagerPolicy
      16              : {
      17              : public:
      18              :         /**
      19              :          * \brief RoundRobinPolicy Constructor
      20              :          * \param ps ParameterSet used to configure RoundRobinPolicy
      21              :          *
      22              :          * RoundRobinPolicy accepts the following Parameter:
      23              :          * "minimum_participants" (Default: 0): Minimum number of receivers to distribute between. Use negative number to indicate how many can be missing from total. If the number of allowed missing receivers is greater than the number that exist, then the minimum number of participants will be set to 1.
      24              :          */
      25            0 :         explicit RoundRobinPolicy(const fhicl::ParameterSet& ps)
      26            0 :             : RoutingManagerPolicy(ps)
      27            0 :             , minimum_participants_(ps.get<int>("minimum_participants", 0))
      28              :         {
      29            0 :         }
      30              : 
      31              :         /**
      32              :          * \brief Default virtual Destructor
      33              :          */
      34            0 :         ~RoundRobinPolicy() override = default;
      35              : 
      36              :         /**
      37              :          * @brief Add entries to the given RoutingPacket using currently-held tokens
      38              :          * @param output RoutingPacket to add entries to
      39              :          *
      40              :          * RoundRobinPolicy will go through the list of receivers as many times
      41              :          * as it can, until one or more receivers have no tokens. It always does full
      42              :          * "turns" through the recevier list.
      43              :          */
      44              :         void CreateRoutingTable(detail::RoutingPacket& output) override;
      45              :         /**
      46              :          * @brief Get an artdaq::detail::RoutingPacketEntry for a given sequence ID and rank. Used by RequestBasedEventBuilder and DataFlow RoutingManagerMode
      47              :          * @param seq Sequence Number to get route for
      48              :          * @param requesting_rank Rank to route for
      49              :          * @return artdaq::detail::RoutingPacketEntry connecting sequence ID to destination rank
      50              :          */
      51              :         detail::RoutingPacketEntry CreateRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int requesting_rank) override;
      52              : 
      53              : private:
      54              :         RoundRobinPolicy(RoundRobinPolicy const&) = delete;
      55              :         RoundRobinPolicy(RoundRobinPolicy&&) = delete;
      56              :         RoundRobinPolicy& operator=(RoundRobinPolicy const&) = delete;
      57              :         RoundRobinPolicy& operator=(RoundRobinPolicy&&) = delete;
      58              : 
      59              :         std::map<int, int> sortTokens_();
      60              :         void restoreUnusedTokens_(std::map<int, int> const& sorted_tokens_);
      61              :         int calculateMinimum_();
      62              : 
      63              :         int minimum_participants_;
      64              :         std::set<int> receivers_in_current_round_;
      65              : };
      66              : 
      67            0 : void RoundRobinPolicy::CreateRoutingTable(detail::RoutingPacket& output)
      68              : {
      69            0 :         TLOG(TLVL_DEBUG + 35) << "RoundRobinPolicy::GetCurrentTable token list size is " << tokens_.size();
      70            0 :         auto table = sortTokens_();
      71            0 :         TLOG(TLVL_DEBUG + 36) << "RoundRobinPolicy::GetCurrentTable table size is " << table.size();
      72              : 
      73            0 :         int minimum = calculateMinimum_();
      74            0 :         bool endCondition = table.size() < static_cast<size_t>(minimum);
      75            0 :         TLOG(TLVL_DEBUG + 37) << "RoundRobinPolicy::GetCurrentTable initial endCondition is " << endCondition << ", minimum is " << minimum;
      76              : 
      77            0 :         while (!endCondition)
      78              :         {
      79            0 :                 for (auto it = table.begin(); it != table.end();)
      80              :                 {
      81            0 :                         TLOG(TLVL_DEBUG + 38) << "RoundRobinPolicy::GetCurrentTable assigning sequenceID " << next_sequence_id_ << " to rank " << it->first;
      82            0 :                         output.emplace_back(detail::RoutingPacketEntry(next_sequence_id_++, it->first));
      83            0 :                         table[it->first]--;
      84              : 
      85            0 :                         if (table[it->first] <= 0)
      86              :                         {
      87            0 :                                 it = table.erase(it);
      88              :                         }
      89              :                         else
      90              :                         {
      91            0 :                                 ++it;
      92              :                         }
      93              :                 }
      94            0 :                 endCondition = table.size() < static_cast<size_t>(minimum);
      95              :         }
      96              : 
      97            0 :         restoreUnusedTokens_(table);
      98            0 :         TLOG(TLVL_DEBUG + 36) << "RoundRobinPolicy::GetCurrentTable " << tokens_.size() << " unused tokens will be saved for later";
      99              : 
     100            0 :         TLOG(TLVL_DEBUG + 35) << "RoundRobinPolicy::GetCurrentTable return with table size " << output.size();
     101            0 : }
     102            0 : detail::RoutingPacketEntry RoundRobinPolicy::CreateRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int)
     103              : {
     104            0 :         detail::RoutingPacketEntry output;
     105              : 
     106              :         // Trivial case: We've already started a round
     107            0 :         if (!receivers_in_current_round_.empty())
     108              :         {
     109            0 :                 output = detail::RoutingPacketEntry(seq, *receivers_in_current_round_.begin());
     110            0 :                 receivers_in_current_round_.erase(receivers_in_current_round_.begin());
     111            0 :                 return output;
     112              :         }
     113              : 
     114              :         // We need to set up the next round...
     115            0 :         auto table = sortTokens_();
     116              : 
     117              :         // Can't route anything until a full "turn" is available
     118            0 :         int minimum = calculateMinimum_();
     119            0 :         if (table.size() < static_cast<size_t>(minimum))
     120              :         {
     121            0 :                 TLOG(TLVL_WARNING) << "Do not have tokens from a minimum set of receivers to start a round";
     122              :         }
     123              :         else
     124              :         {
     125            0 :                 for (auto& entry : table)
     126              :                 {
     127            0 :                         receivers_in_current_round_.insert(entry.first);
     128            0 :                         entry.second--;
     129              :                 }
     130            0 :                 output = detail::RoutingPacketEntry(seq, *receivers_in_current_round_.begin());
     131            0 :                 receivers_in_current_round_.erase(receivers_in_current_round_.begin());
     132              :         }
     133              : 
     134            0 :         restoreUnusedTokens_(table);
     135            0 :         return output;
     136            0 : }
     137            0 : std::map<int, int> RoundRobinPolicy::sortTokens_()
     138              : {
     139            0 :         auto output = std::map<int, int>();
     140            0 :         for (auto token : tokens_)
     141              :         {
     142            0 :                 output[token]++;
     143              :         }
     144            0 :         tokens_.clear();
     145            0 :         return output;
     146            0 : }
     147              : 
     148            0 : void RoundRobinPolicy::restoreUnusedTokens_(std::map<int, int> const& sorted_tokens)
     149              : {
     150            0 :         for (auto r : sorted_tokens)
     151              :         {
     152            0 :                 for (auto i = 0; i < r.second; ++i)
     153              :                 {
     154            0 :                         tokens_.push_back(r.first);
     155              :                 }
     156              :         }
     157            0 : }
     158              : 
     159            0 : int RoundRobinPolicy::calculateMinimum_()
     160              : {
     161              :         // If 0 or negative, add minimum_participants_ to GetRecevierCount to ensure that it's correct
     162              :         // 02-Apr-2019, KAB: changed the declared type of "minimum" from 'auto' to 'int' to avoid the
     163              :         // situation in which the compiler chooses a type of 'unsigned int', the minimum_participants_ is
     164              :         // a negative number that is larger (in absolute value) to the receiver count, and "minimum"
     165              :         // ends up with a large positive value.
     166            0 :         int minimum = minimum_participants_ > 0 ? minimum_participants_ : GetReceiverCount() + minimum_participants_;
     167            0 :         if (minimum < 1)
     168              :         {
     169            0 :                 minimum = 1;  // Can't go below 1
     170              :         }
     171            0 :         return minimum;
     172              : }
     173              : 
     174              : }  // namespace artdaq
     175              : 
     176            0 : DEFINE_ARTDAQ_ROUTING_POLICY(artdaq::RoundRobinPolicy)
        

Generated by: LCOV version 2.0-1