LCOV - code coverage report
Current view: top level - artdaq/DAQrate - FragmentReceiverManager.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 200 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 40 0

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME (app_name + "_FragmentReceiverManager").c_str()
       3              : 
       4              : #include <chrono>
       5              : #include <memory>
       6              : 
       7              : #include "artdaq/DAQrate/FragmentReceiverManager.hh"
       8              : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
       9              : #include "cetlib_except/exception.h"
      10              : 
      11            0 : artdaq::FragmentReceiverManager::FragmentReceiverManager(const fhicl::ParameterSet& pset)
      12            0 :     : stop_requested_(false)
      13            0 :     , recv_frag_count_()
      14            0 :     , recv_frag_size_()
      15            0 :     , recv_seq_count_()
      16            0 :     , suppress_noisy_senders_(pset.get<bool>("auto_suppression_enabled", true))
      17            0 :     , suppression_threshold_(pset.get<size_t>("max_receive_difference", 50))
      18            0 :     , receive_timeout_(pset.get<size_t>("receive_timeout_usec", 100000))
      19            0 :     , last_source_(-1)
      20              : {
      21            0 :         TLOG(TLVL_DEBUG + 32) << "Constructor";
      22            0 :         auto enabled_srcs = pset.get<std::vector<int>>("enabled_sources", std::vector<int>());
      23            0 :         auto enabled_srcs_empty = enabled_srcs.empty();
      24            0 :         if (enabled_srcs_empty)
      25              :         {
      26            0 :                 TLOG(TLVL_INFO) << "enabled_sources not specified, assuming all sources enabled.";
      27              :         }
      28              :         else
      29              :         {
      30            0 :                 for (auto& s : enabled_srcs)
      31              :                 {
      32            0 :                         enabled_sources_[s] = true;
      33              :                 }
      34              :         }
      35              : 
      36            0 :         auto srcs = pset.get<fhicl::ParameterSet>("sources", fhicl::ParameterSet());
      37            0 :         for (auto& s : srcs.get_pset_names())
      38              :         {
      39              :                 try
      40              :                 {
      41              :                         auto transfer = std::unique_ptr<TransferInterface>(MakeTransferPlugin(srcs, s,
      42            0 :                                                                                               TransferInterface::Role::kReceive));
      43            0 :                         auto source_rank = transfer->source_rank();
      44            0 :                         if (enabled_srcs_empty)
      45              :                         {
      46            0 :                                 enabled_sources_[source_rank] = true;
      47              :                         }
      48            0 :                         else if (enabled_sources_.count(source_rank) == 0u)
      49              :                         {
      50            0 :                                 enabled_sources_[source_rank] = false;
      51              :                         }
      52            0 :                         running_sources_[source_rank] = false;
      53            0 :                         source_plugins_[source_rank] = std::move(transfer);
      54            0 :                         fragment_store_[source_rank];
      55            0 :                         source_metric_send_time_[source_rank] = std::chrono::steady_clock::now();
      56            0 :                         source_metric_data_[source_rank] = std::pair<size_t, double>();
      57            0 :                 }
      58            0 :                 catch (const cet::exception& ex)
      59              :                 {
      60            0 :                         TLOG(TLVL_WARNING) << "cet::exception caught while setting up source " << s << ": " << ex.what();
      61            0 :                 }
      62            0 :                 catch (const std::exception& ex)
      63              :                 {
      64            0 :                         TLOG(TLVL_WARNING) << "std::exception caught while setting up source " << s << ": " << ex.what();
      65            0 :                 }
      66            0 :                 catch (...)
      67              :                 {
      68            0 :                         TLOG(TLVL_WARNING) << "Non-cet exception caught while setting up source " << s << ".";
      69            0 :                 }
      70            0 :         }
      71            0 :         if (srcs.get_pset_names().empty())
      72              :         {
      73            0 :                 TLOG(TLVL_ERROR) << "No sources configured!";
      74              :         }
      75            0 : }
      76              : 
      77            0 : artdaq::FragmentReceiverManager::~FragmentReceiverManager()
      78              : {
      79            0 :         TLOG(TLVL_DEBUG + 32) << "Destructor";
      80            0 :         TLOG(TLVL_DEBUG + 34) << "~FragmentReceiverManager: BEGIN: Setting stop_requested to true, frags=" << count() << ", bytes=" << byteCount();
      81            0 :         stop_requested_ = true;
      82              : 
      83            0 :         TLOG(TLVL_DEBUG + 34) << "~FragmentReceiverManager: Notifying all threads";
      84            0 :         output_cv_.notify_all();
      85              : 
      86            0 :         TLOG(TLVL_DEBUG + 34) << "~FragmentReceiverManager: Joining all threads";
      87            0 :         for (auto& s : source_threads_)
      88              :         {
      89            0 :                 auto& thread = s.second;
      90              :                 try
      91              :                 {
      92            0 :                         if (thread.joinable())
      93              :                         {
      94            0 :                                 thread.join();
      95              :                         }
      96              :                 }
      97            0 :                 catch (...)
      98              :                 {
      99              :                         // IGNORED
     100            0 :                 }
     101              :         }
     102            0 :         TLOG(TLVL_DEBUG + 34) << "~FragmentReceiverManager: DONE";
     103            0 : }
     104              : 
     105            0 : bool artdaq::FragmentReceiverManager::fragments_ready_() const
     106              : {
     107            0 :         for (auto& it : fragment_store_)
     108              :         {
     109            0 :                 if (enabled_sources_.count(it.first) == 0u)
     110              :                 {
     111            0 :                         continue;
     112              :                 }
     113            0 :                 if (!it.second.empty()) { return true; }
     114              :         }
     115            0 :         return false;
     116              : }
     117              : 
     118            0 : int artdaq::FragmentReceiverManager::get_next_source_() const
     119              : {
     120              :         // std::unique_lock<std::mutex> lck(fragment_store_mutex_);
     121            0 :         std::set<int> ready_sources;
     122            0 :         for (auto& it : fragment_store_)
     123              :         {
     124            0 :                 if (enabled_sources_.count(it.first) == 0u)
     125              :                 {
     126            0 :                         continue;
     127              :                 }
     128            0 :                 if (!it.second.empty())
     129              :                 {
     130            0 :                         ready_sources.insert(it.first);
     131              :                 }
     132              :         }
     133              : 
     134            0 :         if (!ready_sources.empty())
     135              :         {
     136            0 :                 auto iter = ready_sources.find(last_source_);
     137            0 :                 if (iter == ready_sources.end() || ++iter == ready_sources.end())
     138              :                 {
     139            0 :                         TLOG(TLVL_DEBUG + 35) << "get_next_source returning " << *ready_sources.begin();
     140            0 :                         last_source_ = *ready_sources.begin();
     141            0 :                         return *ready_sources.begin();
     142              :                 }
     143              : 
     144            0 :                 TLOG(TLVL_DEBUG + 35) << "get_next_source returning " << *iter;
     145            0 :                 last_source_ = *iter;
     146            0 :                 return *iter;
     147              :         }
     148              : 
     149            0 :         TLOG(TLVL_DEBUG + 35) << "get_next_source returning -1";
     150            0 :         return -1;
     151            0 : }
     152              : 
     153            0 : void artdaq::FragmentReceiverManager::start_threads()
     154              : {
     155            0 :         for (auto& source : source_plugins_)
     156              :         {
     157            0 :                 auto& rank = source.first;
     158            0 :                 if (enabled_sources_.count(rank) != 0u)
     159              :                 {
     160            0 :                         running_sources_[rank] = true;
     161              :                         try
     162              :                         {
     163            0 :                                 source_threads_[rank] = boost::thread(&FragmentReceiverManager::runReceiver_, this, rank);
     164              :                                 char tname[16];                                                    // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
     165            0 :                                 snprintf(tname, sizeof(tname) - 1, "%d-%d FRecv", rank, my_rank);  // NOLINT
     166            0 :                                 tname[sizeof(tname) - 1] = '\0';                                   // assure term. snprintf is not too evil :)
     167            0 :                                 auto handle = source_threads_[rank].native_handle();
     168            0 :                                 pthread_setname_np(handle, tname);
     169              :                         }
     170            0 :                         catch (const boost::exception& e)
     171              :                         {
     172            0 :                                 TLOG(TLVL_ERROR) << "Caught boost::exception starting Receiver " << rank << " thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     173            0 :                                 std::cerr << "Caught boost::exception starting Receiver " << rank << " thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     174            0 :                                 exit(5);
     175            0 :                         }
     176              :                 }
     177              :         }
     178            0 : }
     179              : 
     180            0 : artdaq::FragmentPtr artdaq::FragmentReceiverManager::recvFragment(int& rank, size_t timeout_usec)
     181              : {
     182            0 :         TLOG(TLVL_DEBUG + 34) << "recvFragment entered tmo=" << timeout_usec << " us";
     183              : 
     184            0 :         if (timeout_usec == 0)
     185              :         {
     186            0 :                 timeout_usec = 1000000;
     187              :         }
     188              : 
     189            0 :         auto ready = fragments_ready_();
     190            0 :         size_t waited = 0;
     191            0 :         auto wait_amount = timeout_usec / 1000 > 1000 ? timeout_usec / 1000 : 1000;
     192            0 :         TLOG(TLVL_DEBUG + 34) << "recvFragment fragment_ready_=" << ready << " before wait";
     193            0 :         while (!ready && waited < timeout_usec)
     194              :         {
     195              :                 {
     196            0 :                         std::unique_lock<std::mutex> lck(input_cv_mutex_);
     197            0 :                         input_cv_.wait_for(lck, std::chrono::microseconds(wait_amount));
     198            0 :                 }
     199            0 :                 waited += wait_amount;
     200            0 :                 ready = fragments_ready_();
     201            0 :                 if (running_sources().empty())
     202              :                 {
     203            0 :                         break;
     204              :                 }
     205              :         }
     206            0 :         TLOG(TLVL_DEBUG + 34) << "recvFragment fragment_ready_=" << ready << " after waited=" << waited;
     207            0 :         if (!ready)
     208              :         {
     209            0 :                 TLOG(TLVL_DEBUG + 34) << "recvFragment: No fragments ready, returning empty";
     210            0 :                 rank = TransferInterface::RECV_TIMEOUT;
     211            0 :                 return std::unique_ptr<Fragment>{};
     212              :         }
     213              : 
     214            0 :         int current_source = get_next_source_();
     215            0 :         FragmentPtr current_fragment = fragment_store_[current_source].front();
     216            0 :         output_cv_.notify_all();
     217            0 :         rank = current_source;
     218              : 
     219            0 :         if (current_fragment != nullptr)
     220              :         {
     221            0 :                 TLOG(TLVL_DEBUG + 34) << "recvFragment: Done  rank=" << rank << ", fragment size=" << std::to_string(current_fragment->size()) << " words, seqId=" << current_fragment->sequenceID();
     222              :         }
     223            0 :         return current_fragment;
     224            0 : }
     225              : 
     226            0 : std::set<int> artdaq::FragmentReceiverManager::running_sources() const
     227              : {
     228            0 :         std::set<int> output;
     229            0 :         for (auto& src : running_sources_)
     230              :         {
     231            0 :                 if (src.second)
     232              :                 {
     233            0 :                         output.insert(src.first);
     234              :                 }
     235              :         }
     236            0 :         return output;
     237            0 : }
     238              : 
     239            0 : std::set<int> artdaq::FragmentReceiverManager::enabled_sources() const
     240              : {
     241            0 :         std::set<int> output;
     242            0 :         for (auto& src : enabled_sources_)
     243              :         {
     244            0 :                 if (src.second)
     245              :                 {
     246            0 :                         output.insert(src.first);
     247              :                 }
     248              :         }
     249            0 :         return output;
     250            0 : }
     251              : 
     252            0 : void artdaq::FragmentReceiverManager::runReceiver_(int source_rank)
     253              : {
     254            0 :         while (!stop_requested_ && (enabled_sources_.count(source_rank) != 0u))
     255              :         {
     256            0 :                 TLOG(TLVL_DEBUG + 36) << "runReceiver_ " << source_rank << ": Begin loop";
     257            0 :                 auto is_suppressed = suppress_noisy_senders_ && recv_seq_count_.slotCount(source_rank) > suppression_threshold_ + recv_seq_count_.minCount();
     258            0 :                 while (!stop_requested_ && is_suppressed)
     259              :                 {
     260            0 :                         TLOG(TLVL_DEBUG + 37) << "runReceiver_: Suppressing receiver rank " << source_rank;
     261            0 :                         if (!is_suppressed)
     262              :                         {
     263            0 :                                 input_cv_.notify_all();
     264              :                         }
     265              :                         else
     266              :                         {
     267            0 :                                 std::unique_lock<std::mutex> lck(output_cv_mutex_);
     268            0 :                                 output_cv_.wait_for(lck, std::chrono::seconds(1));
     269            0 :                         }
     270            0 :                         is_suppressed = suppress_noisy_senders_ && recv_seq_count_.slotCount(source_rank) > suppression_threshold_ + recv_seq_count_.minCount();
     271              :                 }
     272            0 :                 if (stop_requested_)
     273              :                 {
     274            0 :                         running_sources_[source_rank] = false;
     275            0 :                         return;
     276              :                 }
     277              : 
     278            0 :                 if (fragment_store_[source_rank].GetEndOfData() <= recv_frag_count_.slotCount(source_rank) && !source_plugins_[source_rank]->isRunning())
     279              :                 {
     280            0 :                         TLOG(TLVL_DEBUG + 32) << "runReceiver_: EndOfData conditions satisfied, ending receive loop";
     281            0 :                         running_sources_[source_rank] = false;
     282            0 :                         return;
     283              :                 }
     284              : 
     285            0 :                 auto start_time = std::chrono::steady_clock::now();
     286            0 :                 TLOG(TLVL_DEBUG + 36) << "runReceiver_: Calling receiveFragment";
     287            0 :                 auto fragment = std::make_unique<Fragment>();
     288              : #if 0
     289              :                 auto ret = source_plugins_[source_rank]->receiveFragment(*fragment, receive_timeout_);
     290              :                 TLOG(TLVL_DEBUG + 36) << "runReceiver_: Done with receiveFragment, ret=" << ret << " (should be " << source_rank << ")";
     291              :                 if (ret != source_rank) continue; // Receive timeout or other oddness
     292              : #else
     293              :                 artdaq::detail::RawFragmentHeader hdr;
     294            0 :                 auto ret1 = source_plugins_[source_rank]->receiveFragmentHeader(hdr, receive_timeout_);
     295            0 :                 TLOG(TLVL_DEBUG + 36) << "runReceiver_: Done with receiveFragmentHeader, ret1=" << ret1 << " (should be " << source_rank << ")";
     296              : 
     297            0 :                 if (ret1 != source_rank)
     298              :                 {
     299            0 :                         continue;  // Receive timeout or other oddness
     300              :                 }
     301              : 
     302            0 :                 fragment->resize(hdr.word_count - hdr.num_words());
     303            0 :                 memcpy(fragment->headerAddress(), &hdr, hdr.num_words() * sizeof(artdaq::RawDataType));
     304            0 :                 auto ret2 = source_plugins_[source_rank]->receiveFragmentData(fragment->headerAddress() + hdr.num_words(), hdr.word_count - hdr.num_words());  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     305            0 :                 if (ret2 != ret1)
     306              :                 {
     307            0 :                         TLOG(TLVL_ERROR) << "ReceiveFragmentHeader returned " << ret1 << ", but ReceiveFragmentData returned " << ret2;
     308            0 :                         continue;
     309            0 :                 }
     310              : #endif
     311              : 
     312            0 :                 if (fragment->type() == artdaq::Fragment::EndOfDataFragmentType)
     313              :                 {
     314            0 :                         TLOG(TLVL_DEBUG + 33) << "runReceiver_: EndOfData Fragment received!";
     315            0 :                         fragment_store_[source_rank].SetEndOfData(*reinterpret_cast<size_t*>(fragment->dataBegin()));  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     316              :                 }
     317            0 :                 else if (fragment->type() == artdaq::Fragment::DataFragmentType || fragment->type() == artdaq::Fragment::ContainerFragmentType || fragment->isUserFragmentType(fragment->type()))
     318              :                 {
     319            0 :                         TLOG(TLVL_DEBUG + 33) << "runReceiver_: Data Fragment received!";
     320            0 :                         recv_frag_count_.incSlot(source_rank);
     321            0 :                         recv_frag_size_.incSlot(source_rank, fragment->size() * sizeof(RawDataType));
     322            0 :                         recv_seq_count_.setSlot(source_rank, fragment->sequenceID());
     323              :                 }
     324              :                 else
     325              :                 {
     326            0 :                         continue;
     327              :                 }
     328              : 
     329            0 :                 auto delta_t = std::chrono::duration_cast<std::chrono::duration<double, std::ratio<1>>>(std::chrono::steady_clock::now() - start_time).count();
     330            0 :                 source_metric_data_[source_rank].first += fragment->size() * sizeof(RawDataType);
     331            0 :                 source_metric_data_[source_rank].second += delta_t;
     332              : 
     333            0 :                 if (metricMan && TimeUtils::GetElapsedTime(source_metric_send_time_[source_rank]) > 1)
     334              :                 {
     335            0 :                         TLOG(TLVL_DEBUG + 37) << "runReceiver_: Sending receive stats";
     336            0 :                         metricMan->sendMetric("Data Receive Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].second, "s", 1, MetricMode::Accumulate);
     337            0 :                         metricMan->sendMetric("Data Receive Size From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].first, "B", 1, MetricMode::Accumulate);
     338            0 :                         metricMan->sendMetric("Data Receive Rate From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].first / source_metric_data_[source_rank].second, "B/s", 1, MetricMode::Average);
     339              : 
     340            0 :                         source_metric_send_time_[source_rank] = std::chrono::steady_clock::now();
     341            0 :                         source_metric_data_[source_rank].first = 0;
     342            0 :                         source_metric_data_[source_rank].second = 0.0;
     343              :                 }
     344              : 
     345            0 :                 fragment_store_[source_rank].emplace_back(std::move(fragment));
     346            0 :                 TLOG(TLVL_DEBUG + 33) << "runReceiver_: There are now " << fragment_store_[source_rank].size() << " Fragments stored from this source";
     347            0 :                 input_cv_.notify_all();
     348            0 :         }
     349              : 
     350            0 :         running_sources_[source_rank] = false;
     351              : }
        

Generated by: LCOV version 2.0-1