LCOV - code coverage report
Current view: top level - artdaq/RoutingPolicies - RoutingManagerPolicy.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 98.8 % 85 84
Test Date: 2025-09-04 00:45:34 Functions: 100.0 % 11 11

            Line data    Source code
       1              : #define TRACE_NAME "RoutingManagerPolicy"
       2              : #include "TRACE/tracemf.h"
       3              : 
       4              : #include "artdaq/DAQdata/Globals.hh"
       5              : #include "artdaq/RoutingPolicies/RoutingManagerPolicy.hh"
       6              : 
       7              : #include "fhiclcpp/ParameterSet.h"
       8              : 
       9           21 : artdaq::RoutingManagerPolicy::RoutingManagerPolicy(const fhicl::ParameterSet& ps)
      10           21 :     : tokens_used_since_last_update_(0)
      11           21 :     , next_sequence_id_(1)
      12           42 :     , max_token_count_(0)
      13              : {
      14           63 :         routing_mode_ = detail::RoutingManagerModeConverter::stringToRoutingManagerMode(ps.get<std::string>("routing_manager_mode", "EventBuilding"));
      15           42 :         routing_cache_max_size_ = ps.get<size_t>("routing_cache_size", 1000);
      16           21 : }
      17              : 
      18           48 : artdaq::detail::RoutingPacket artdaq::RoutingManagerPolicy::GetCurrentTable()
      19              : {
      20           48 :         auto table = detail::RoutingPacket();
      21              : 
      22           48 :         if (routing_mode_ == detail::RoutingManagerMode::EventBuilding)
      23              :         {
      24           36 :                 std::lock_guard<std::mutex> lk(tokens_mutex_);
      25           36 :                 CreateRoutingTable(table);
      26           36 :                 UpdateCache(table);
      27           36 :                 CreateRoutingTableFromCache(table);
      28           36 :         }
      29           48 :         if (routing_mode_ == detail::RoutingManagerMode::RequestBasedEventBuilding)
      30              :         {
      31           12 :                 CreateRoutingTableFromCache(table);
      32              :         }
      33              : 
      34           48 :         return table;
      35            0 : }
      36              : 
      37          138 : void artdaq::RoutingManagerPolicy::AddReceiverToken(int rank, unsigned new_slots_free)
      38              : {
      39          138 :         if (receiver_ranks_.count(rank) == 0u)
      40              :         {
      41          189 :                 TLOG(TLVL_INFO) << "Adding rank " << rank << " to receivers list (initial tokens=" << new_slots_free << ")";
      42           63 :                 receiver_ranks_.insert(rank);
      43              :         }
      44          276 :         TLOG(TLVL_DEBUG + 35) << "AddReceiverToken BEGIN";
      45          138 :         std::lock_guard<std::mutex> lk(tokens_mutex_);
      46          138 :         if (new_slots_free == 1)
      47              :         {
      48          111 :                 tokens_.push_back(rank);
      49              :         }
      50              :         else
      51              :         {
      52              :                 // Randomly distribute multitokens through the token list
      53              :                 // Only used at start run time, so we can take the performance hit
      54          105 :                 for (unsigned i = 0; i < new_slots_free; ++i)
      55              :                 {
      56           78 :                         auto it = tokens_.begin();
      57           78 :                         if (!tokens_.empty())
      58              :                         {
      59           72 :                                 std::advance(it, rand() % tokens_.size());  // NOLINT(cert-msc50-cpp)
      60              :                         }
      61           78 :                         tokens_.insert(it, rank);
      62              :                 }
      63              :         }
      64          138 :         if (tokens_.size() > max_token_count_)
      65              :         {
      66           91 :                 max_token_count_ = tokens_.size();
      67              :         }
      68          276 :         TLOG(TLVL_DEBUG + 35) << "AddReceiverToken END";
      69          138 : }
      70              : 
      71           18 : void artdaq::RoutingManagerPolicy::Reset()
      72              : {
      73           18 :         next_sequence_id_ = 1;
      74           18 :         std::unique_lock<std::mutex> lk(tokens_mutex_);
      75           18 :         tokens_.clear();
      76           18 :         receiver_ranks_.clear();
      77           18 : }
      78              : 
      79           73 : artdaq::detail::RoutingPacketEntry artdaq::RoutingManagerPolicy::GetRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int requesting_rank)
      80              : {
      81           73 :         if (routing_mode_ != detail::RoutingManagerMode::DataFlow)
      82              :         {
      83           43 :                 std::lock_guard<std::mutex> lk(routing_cache_mutex_);
      84           43 :                 if (routing_cache_.count(seq))
      85              :                 {
      86           16 :                         return detail::RoutingPacketEntry(seq, routing_cache_[seq][0].destination_rank);
      87              :                 }
      88              :                 else
      89              :                 {
      90           27 :                         std::lock_guard<std::mutex> tlk(tokens_mutex_);
      91           27 :                         auto entry = CreateRouteForSequenceID(seq, requesting_rank);
      92           27 :                         if (entry.sequence_id == seq)
      93              :                         {
      94           23 :                                 routing_cache_[seq].emplace_back(seq, entry.destination_rank, requesting_rank);
      95              :                         }
      96           27 :                         return entry;
      97           27 :                 }
      98           43 :         }
      99              :         else
     100              :         {
     101           30 :                 std::lock_guard<std::mutex> lk(routing_cache_mutex_);
     102           30 :                 if (routing_cache_.count(seq))
     103              :                 {
     104           40 :                         for (auto& entry : routing_cache_[seq])
     105              :                         {
     106           27 :                                 if (entry.requesting_rank == requesting_rank)
     107              :                                 {
     108            5 :                                         return detail::RoutingPacketEntry(seq, entry.destination_rank);
     109              :                                 }
     110              :                         }
     111              :                 }
     112              : 
     113           25 :                 std::lock_guard<std::mutex> tlk(tokens_mutex_);
     114           25 :                 auto entry = CreateRouteForSequenceID(seq, requesting_rank);
     115           25 :                 if (entry.sequence_id == seq)
     116              :                 {
     117           24 :                         routing_cache_[seq].emplace_back(seq, entry.destination_rank, requesting_rank);
     118              :                 }
     119              : 
     120           25 :                 TrimRoutingCache();
     121           25 :                 return entry;
     122           30 :         }
     123              :         return detail::RoutingPacketEntry();
     124              : }
     125              : 
     126           37 : void artdaq::RoutingManagerPolicy::TrimRoutingCache()
     127              : {
     128           52 :         while (routing_cache_.size() > routing_cache_max_size_)
     129              :         {
     130           15 :                 routing_cache_.erase(routing_cache_.begin());
     131              :         }
     132           37 : }
     133              : 
     134           36 : void artdaq::RoutingManagerPolicy::UpdateCache(detail::RoutingPacket& table)
     135              : {
     136           36 :         std::lock_guard<std::mutex> lk(routing_cache_mutex_);
     137              : 
     138          163 :         for (auto& entry : table)
     139              :         {
     140          127 :                 if (!routing_cache_.count(entry.sequence_id))
     141              :                 {
     142          126 :                         routing_cache_[entry.sequence_id].emplace_back(entry.sequence_id, entry.destination_rank, my_rank);
     143              :                 }
     144              :         }
     145           36 : }
     146              : 
     147           48 : void artdaq::RoutingManagerPolicy::CreateRoutingTableFromCache(detail::RoutingPacket& table)
     148              : {
     149           48 :         std::lock_guard<std::mutex> lk(routing_cache_mutex_);
     150              : 
     151           48 :         if (routing_mode_ == detail::RoutingManagerMode::RequestBasedEventBuilding)
     152              :         {
     153           47 :                 for (auto& cache_entry : routing_cache_)
     154              :                 {
     155           35 :                         if (!cache_entry.second[0].included_in_table)
     156              :                         {
     157           23 :                                 table.push_back(artdaq::detail::RoutingPacketEntry(cache_entry.second[0].sequence_id, cache_entry.second[0].destination_rank));
     158           23 :                                 cache_entry.second[0].included_in_table = true;
     159              :                         }
     160              :                 }
     161              : 
     162           12 :                 TrimRoutingCache();
     163              :         }
     164           48 : }
        

Generated by: LCOV version 2.0-1