LCOV - code coverage report
Current view: top level - artdaq/DAQrate - DataReceiverManager.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 72.7 % 282 205
Test Date: 2025-09-04 00:45:34 Functions: 61.2 % 49 30

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME (app_name + "_DataReceiverManager").c_str()
       3              : 
       4              : #include "artdaq/DAQdata/HostMap.hh"
       5              : #include "artdaq/DAQrate/DataReceiverManager.hh"
       6              : #include "artdaq/DAQrate/detail/MergeParameterSets.hh"
       7              : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
       8              : 
       9              : #include "cetlib_except/exception.h"
      10              : #include "fhiclcpp/ParameterSet.h"
      11              : 
      12              : #include <boost/bind.hpp>
      13              : #include <boost/exception/all.hpp>
      14              : #include <boost/thread.hpp>
      15              : 
      16              : #include <chrono>
      17              : #include <iomanip>
      18              : #include <thread>
      19              : #include <utility>
      20              : 
      21            2 : artdaq::DataReceiverManager::DataReceiverManager(const fhicl::ParameterSet& pset, std::shared_ptr<SharedMemoryEventManager> shm)
      22            2 :     : stop_requested_(false)
      23            2 :     , stop_requested_time_(0)
      24            2 :     , recv_frag_count_()
      25            2 :     , recv_frag_size_()
      26            2 :     , recv_seq_count_()
      27            4 :     , receive_timeout_(pset.get<size_t>("receive_timeout_usec", 100000))
      28            4 :     , stop_timeout_ms_(pset.get<size_t>("stop_timeout_ms", 1500))
      29            2 :     , shm_manager_(std::move(std::move(shm)))
      30            4 :     , non_reliable_mode_enabled_(pset.get<bool>("non_reliable_mode", false))
      31            8 :     , non_reliable_mode_retry_count_(pset.get<size_t>("non_reliable_mode_retry_count", -1))
      32              : {
      33            4 :         TLOG(TLVL_DEBUG + 32) << "Constructor";
      34            6 :         auto enabled_srcs = pset.get<std::vector<int>>("enabled_sources", std::vector<int>());
      35            2 :         auto enabled_srcs_empty = enabled_srcs.empty();
      36              : 
      37            2 :         if (non_reliable_mode_enabled_)
      38              :         {
      39            0 :                 TLOG(TLVL_WARNING) << "DataReceiverManager is configured to drop data after " << non_reliable_mode_retry_count_
      40            0 :                                    << " failed attempts to put data into the SharedMemoryEventManager! If this is unexpected, please check your configuration!";
      41              :         }
      42              : 
      43            2 :         if (enabled_srcs_empty)
      44              :         {
      45            6 :                 TLOG(TLVL_INFO) << "enabled_sources not specified, assuming all sources enabled.";
      46              :         }
      47              :         else
      48              :         {
      49            0 :                 for (auto& s : enabled_srcs)
      50              :                 {
      51            0 :                         enabled_sources_[s] = true;
      52              :                 }
      53              :         }
      54              : 
      55            2 :         hostMap_t host_map = MakeHostMap(pset);
      56            4 :         auto max_fragment_size_words = pset.get<size_t>("max_fragment_size_words", 0);
      57            6 :         auto transfer_parameters = pset.get<fhicl::ParameterSet>("transfer_parameters", fhicl::ParameterSet());
      58              : 
      59            6 :         auto srcs = pset.get<fhicl::ParameterSet>("sources", fhicl::ParameterSet());
      60            4 :         for (auto& s : srcs.get_pset_names())
      61              :         {
      62            2 :                 auto src_pset = srcs.get<fhicl::ParameterSet>(s);
      63            2 :                 host_map = MakeHostMap(src_pset, host_map);
      64            4 :         }
      65            2 :         auto host_map_pset = MakeHostMapPset(host_map);
      66            2 :         fhicl::ParameterSet srcs_mod;
      67            4 :         for (auto& s : srcs.get_pset_names())
      68              :         {
      69            2 :                 auto src_pset = srcs.get<fhicl::ParameterSet>(s);
      70            4 :                 src_pset.erase("host_map");
      71            2 :                 src_pset.put<std::vector<fhicl::ParameterSet>>("host_map", host_map_pset);
      72              : 
      73            2 :                 if (max_fragment_size_words != 0 && !src_pset.has_key("max_fragment_size_words"))
      74              :                 {
      75            0 :                         src_pset.put<size_t>("max_fragment_size_words", max_fragment_size_words);
      76              :                 }
      77              : 
      78            2 :                 auto resultant_set = merge(transfer_parameters, src_pset);
      79              : 
      80            2 :                 srcs_mod.put<fhicl::ParameterSet>(s, resultant_set);
      81            4 :         }
      82              : 
      83            4 :         for (auto& s : srcs_mod.get_pset_names())
      84              :         {
      85              :                 try
      86              :                 {
      87              :                         auto transfer = std::unique_ptr<TransferInterface>(MakeTransferPlugin(srcs_mod, s,
      88            2 :                                                                                               TransferInterface::Role::kReceive));
      89            2 :                         auto source_rank = transfer->source_rank();
      90            2 :                         if (enabled_srcs_empty)
      91              :                         {
      92            2 :                                 enabled_sources_[source_rank] = true;
      93              :                         }
      94            0 :                         else if (enabled_sources_.count(source_rank) == 0u)
      95              :                         {
      96            0 :                                 enabled_sources_[source_rank] = false;
      97              :                         }
      98            2 :                         running_sources_[source_rank] = false;
      99            2 :                         source_plugins_[source_rank] = std::move(transfer);
     100            2 :                 }
     101            0 :                 catch (const cet::exception& ex)
     102              :                 {
     103            0 :                         TLOG(TLVL_WARNING) << "cet::exception caught while setting up source " << s << ": " << ex.what();
     104            0 :                 }
     105            0 :                 catch (const std::exception& ex)
     106              :                 {
     107            0 :                         TLOG(TLVL_WARNING) << "std::exception caught while setting up source " << s << ": " << ex.what();
     108            0 :                 }
     109            0 :                 catch (...)
     110              :                 {
     111            0 :                         TLOG(TLVL_WARNING) << "Non-cet exception caught while setting up source " << s << ".";
     112            0 :                 }
     113            2 :         }
     114            2 :         if (srcs.get_pset_names().empty())
     115              :         {
     116            0 :                 TLOG(TLVL_ERROR) << "No sources configured!";
     117              :         }
     118            2 : }
     119              : 
     120            2 : artdaq::DataReceiverManager::~DataReceiverManager()
     121              : {
     122            4 :         TLOG(TLVL_DEBUG + 33) << "~DataReceiverManager: BEGIN";
     123            2 :         stop_threads();
     124            2 :         shm_manager_.reset();
     125            4 :         TLOG(TLVL_DEBUG + 33) << "Destructor END";
     126            2 : }
     127              : 
     128            1 : void artdaq::DataReceiverManager::start_threads()
     129              : {
     130            1 :         stop_requested_ = false;
     131            1 :         if (shm_manager_)
     132              :         {
     133            1 :                 shm_manager_->setRequestMode(artdaq::detail::RequestMessageMode::Normal);
     134              :         }
     135            2 :         for (auto& source : source_plugins_)
     136              :         {
     137            1 :                 auto& rank = source.first;
     138            1 :                 if ((enabled_sources_.count(rank) != 0u) && enabled_sources_[rank].load())
     139              :                 {
     140            1 :                         recv_frag_count_.setSlot(rank, 0);
     141            1 :                         recv_frag_size_.setSlot(rank, 0);
     142            1 :                         recv_seq_count_.setSlot(rank, 0);
     143              : 
     144            1 :                         running_sources_[rank] = true;
     145            1 :                         boost::thread::attributes attrs;
     146            1 :                         attrs.set_stack_size(4096 * 2000);  // 2000 KB
     147              :                         try
     148              :                         {
     149            1 :                                 source_threads_[rank] = boost::thread(attrs, boost::bind(&DataReceiverManager::runReceiver_, this, rank));
     150              :                                 char tname[16];                                                   // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
     151            1 :                                 snprintf(tname, sizeof(tname) - 1, "%d-%d RECV", rank, my_rank);  // NOLINT
     152            1 :                                 tname[sizeof(tname) - 1] = '\0';                                  // assure term. snprintf is not too evil :)
     153            1 :                                 auto handle = source_threads_[rank].native_handle();
     154            1 :                                 pthread_setname_np(handle, tname);
     155              :                         }
     156            0 :                         catch (const boost::exception& e)
     157              :                         {
     158            0 :                                 TLOG(TLVL_ERROR) << "Caught boost::exception starting Receiver " << rank << " thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     159            0 :                                 std::cerr << "Caught boost::exception starting Receiver " << rank << " thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     160            0 :                                 exit(5);
     161            0 :                         }
     162            1 :                 }
     163              :         }
     164            1 : }
     165              : 
     166            2 : void artdaq::DataReceiverManager::stop_threads()
     167              : {
     168            4 :         TLOG(TLVL_DEBUG + 33) << "stop_threads: BEGIN: Setting stop_requested to true, frags=" << count() << ", bytes=" << byteCount();
     169              : 
     170            2 :         stop_requested_time_ = TimeUtils::gettimeofday_us();
     171            2 :         stop_requested_ = true;
     172              : 
     173            2 :         auto initial_count = running_sources().size();
     174            4 :         TLOG(TLVL_DEBUG + 33) << "stop_threads: Waiting for " << initial_count << " running receiver threads to stop";
     175            2 :         auto wait_start = std::chrono::steady_clock::now();
     176            2 :         auto last_report = std::chrono::steady_clock::now();
     177            4 :         while (!running_sources().empty() && TimeUtils::GetElapsedTime(wait_start) < 60.0)
     178              :         {
     179            0 :                 usleep(10000);
     180            0 :                 if (TimeUtils::GetElapsedTime(last_report) > 1.0)
     181              :                 {
     182            0 :                         TLOG(TLVL_DEBUG + 32) << "stop_threads: Waited " << TimeUtils::GetElapsedTime(wait_start) << " s for " << initial_count
     183            0 :                                               << " receiver threads to end (" << running_sources().size() << " remain)";
     184            0 :                         last_report = std::chrono::steady_clock::now();
     185              :                 }
     186              :         }
     187            2 :         if (!running_sources().empty())
     188              :         {
     189            0 :                 TLOG(TLVL_WARNING) << "stop_threads: Timeout expired while waiting for all receiver threads to end. There are "
     190            0 :                                    << running_sources().size() << " threads remaining.";
     191              :         }
     192              : 
     193            4 :         TLOG(TLVL_DEBUG + 33) << "stop_threads: Joining " << source_threads_.size() << " receiver threads";
     194            3 :         for (auto& source_thread : source_threads_)
     195              :         {
     196            2 :                 TLOG(TLVL_DEBUG + 33) << "stop_threads: Joining thread for source_rank " << source_thread.first;
     197              :                 try
     198              :                 {
     199            1 :                         if (source_thread.second.joinable())
     200              :                         {
     201            1 :                                 source_thread.second.join();
     202              :                         }
     203              :                         else
     204              :                         {
     205            0 :                                 TLOG(TLVL_ERROR) << "stop_threads: Thread for source rank " << source_thread.first << " is not joinable!";
     206              :                         }
     207              :                 }
     208            0 :                 catch (...)
     209              :                 {
     210              :                         // IGNORED
     211            0 :                 }
     212              :         }
     213            2 :         source_threads_.clear();  // To prevent error messages from shutdown-after-stop
     214              : 
     215            4 :         TLOG(TLVL_DEBUG + 33) << "stop_threads: END";
     216            2 : }
     217              : 
     218            9 : std::set<int> artdaq::DataReceiverManager::enabled_sources() const
     219              : {
     220            9 :         std::set<int> output;
     221           18 :         for (auto& src : enabled_sources_)
     222              :         {
     223            9 :                 if (src.second)
     224              :                 {
     225            9 :                         output.insert(src.first);
     226              :                 }
     227              :         }
     228            9 :         return output;
     229            0 : }
     230              : 
     231           25 : std::set<int> artdaq::DataReceiverManager::running_sources() const
     232              : {
     233           25 :         std::set<int> output;
     234           50 :         for (auto& src : running_sources_)
     235              :         {
     236           25 :                 if (src.second)
     237              :                 {
     238           13 :                         output.insert(src.first);
     239              :                 }
     240              :         }
     241           25 :         return output;
     242            0 : }
     243              : 
     244            1 : void artdaq::DataReceiverManager::runReceiver_(int source_rank)
     245              : {
     246            1 :         std::chrono::steady_clock::time_point start_time, after_header, before_body, after_body, end_time = std::chrono::steady_clock::now();
     247              :         int ret;
     248              :         detail::RawFragmentHeader header;
     249            1 :         size_t endOfDataCount = -1;
     250            1 :         auto sleep_time = receive_timeout_ / 100 > 100000 ? 100000 : receive_timeout_ / 100;
     251            1 :         if (sleep_time < 5000)
     252              :         {
     253            1 :                 sleep_time = 5000;
     254              :         }
     255            1 :         auto max_retries = non_reliable_mode_retry_count_ * ceil(receive_timeout_ / sleep_time);
     256              : 
     257           13 :         while (!(stop_requested_ && TimeUtils::gettimeofday_us() - stop_requested_time_ > stop_timeout_ms_ * 1000) && (enabled_sources_.count(source_rank) != 0u))
     258              :         {
     259           26 :                 TLOG(TLVL_DEBUG + 35) << "runReceiver_: Begin loop stop_requested_=" << stop_requested_ << ", stop_timeout_ms_=" << stop_timeout_ms_ << ", enabled_sources_.count(source_rank)=" << enabled_sources_.count(source_rank) << ", now - stop_requested_time_=" << (TimeUtils::gettimeofday_us() - stop_requested_time_);
     260           13 :                 std::this_thread::yield();
     261              : 
     262              :                 // Don't stop receiving until we haven't received anything for 1 second
     263           13 :                 if (endOfDataCount <= recv_frag_count_.slotCount(source_rank) && !source_plugins_[source_rank]->isRunning())
     264              :                 {
     265            2 :                         TLOG(TLVL_DEBUG + 32) << "runReceiver_: End of Data conditions met, ending runReceiver loop";
     266            1 :                         break;
     267              :                 }
     268              : 
     269           12 :                 start_time = std::chrono::steady_clock::now();
     270              : 
     271           24 :                 TLOG(TLVL_DEBUG + 35) << "runReceiver_: Calling receiveFragmentHeader tmo=" << receive_timeout_;
     272           12 :                 ret = source_plugins_[source_rank]->receiveFragmentHeader(header, receive_timeout_);
     273           24 :                 TLOG(TLVL_DEBUG + 35) << "runReceiver_: Done with receiveFragmentHeader, ret=" << ret << " (should be " << source_rank << ")";
     274           12 :                 if (ret != source_rank)
     275              :                 {
     276           10 :                         if (ret >= 0)
     277              :                         {
     278            0 :                                 TLOG(TLVL_WARNING) << "Received Fragment from rank " << ret << ", but was expecting one from rank " << source_rank << "!";
     279              :                         }
     280           10 :                         else if (ret == TransferInterface::DATA_END)
     281              :                         {
     282            0 :                                 TLOG(TLVL_ERROR) << "Transfer Plugin returned DATA_END, ending receive loop!";
     283            0 :                                 break;
     284              :                         }
     285           10 :                         if (*running_sources().begin() == source_rank)  // Only do this for the first sender in the running_sources_ map
     286              :                         {
     287           20 :                                 TLOG(TLVL_DEBUG + 34) << "Calling SMEM::CheckPendingBuffers from DRM receiver thread for " << source_rank << " to make sure that things aren't stuck";
     288           10 :                                 shm_manager_->CheckPendingBuffers();
     289              :                         }
     290              : 
     291           10 :                         usleep(sleep_time);
     292           10 :                         continue;  // Receive timeout or other oddness
     293           10 :                 }
     294              : 
     295            2 :                 after_header = std::chrono::steady_clock::now();
     296              : 
     297            2 :                 if (Fragment::isUserFragmentType(header.type) || header.type == Fragment::DataFragmentType || header.type == Fragment::EmptyFragmentType || header.type == Fragment::ContainerFragmentType)
     298              :                 {
     299            2 :                         TLOG(TLVL_DEBUG + 33) << "Received Fragment Header from rank " << source_rank << ", sequence ID " << header.sequence_id << ", timestamp " << header.timestamp << ", type " << header.type;
     300            1 :                         RawDataType* loc = nullptr;
     301            1 :                         size_t retries = 0;
     302            1 :                         auto latency_s = header.getLatency(true);
     303            1 :                         auto latency = latency_s.tv_sec + (latency_s.tv_nsec / 1000000000.0);
     304            2 :                         while (loc == nullptr)  //&& TimeUtils::GetElapsedTimeMicroseconds(after_header)) < receive_timeout_)
     305              :                         {
     306            1 :                                 loc = shm_manager_->WriteFragmentHeader(header);
     307              : 
     308              :                                 // Break here and outside of the loop to go to the cleanup steps at the end of runReceiver_
     309            1 :                                 if (loc == nullptr && stop_requested_)
     310              :                                 {
     311            0 :                                         break;
     312              :                                 }
     313              : 
     314            1 :                                 if (loc == nullptr)
     315              :                                 {
     316            0 :                                         usleep(sleep_time);
     317              :                                 }
     318            1 :                                 retries++;
     319            1 :                                 if (non_reliable_mode_enabled_ && retries > max_retries)
     320              :                                 {
     321            0 :                                         loc = shm_manager_->WriteFragmentHeader(header, true);
     322              :                                 }
     323              :                         }
     324              :                         // Break here to go to cleanup at the end of runReceiver_
     325            1 :                         if (loc == nullptr && stop_requested_)
     326              :                         {
     327            0 :                                 break;
     328              :                         }
     329            1 :                         if (loc == nullptr)
     330              :                         {
     331              :                                 // Could not enqueue event!
     332            0 :                                 TLOG(TLVL_ERROR) << "runReceiver_: Could not get data location for event " << header.sequence_id;
     333            0 :                                 continue;
     334            0 :                         }
     335            1 :                         before_body = std::chrono::steady_clock::now();
     336              : 
     337            2 :                         TLOG(TLVL_DEBUG + 35) << "runReceiver_: Calling receiveFragmentData from rank " << source_rank << ", sequence ID " << header.sequence_id << ", timestamp " << header.timestamp;
     338            1 :                         auto ret2 = source_plugins_[source_rank]->receiveFragmentData(loc, header.word_count - header.num_words());
     339            2 :                         TLOG(TLVL_DEBUG + 35) << "runReceiver_: Done with receiveFragmentData, ret2=" << ret2 << " (should be " << source_rank << ")";
     340              : 
     341            1 :                         if (ret != ret2)
     342              :                         {
     343            0 :                                 TLOG(TLVL_ERROR) << "Unexpected return code from receiveFragmentData after receiveFragmentHeader! (Expected: " << ret << ", Got: " << ret2 << ")";
     344            0 :                                 TLOG(TLVL_ERROR) << "Error receiving data from rank " << source_rank << ", data has been lost! Event " << header.sequence_id << " will most likely be Incomplete!";
     345              : 
     346              :                                 // Mark the Fragment as invalid
     347            0 :                                 header.valid = false;
     348            0 :                                 header.complete = false;
     349              : 
     350            0 :                                 shm_manager_->DoneWritingFragment(header);
     351              :                                 // throw cet::exception("DataReceiverManager") << "Unexpected return code from receiveFragmentData after receiveFragmentHeader! (Expected: " << ret << ", Got: " << ret2 << ")";
     352            0 :                                 continue;
     353            0 :                         }
     354              : 
     355            1 :                         shm_manager_->DoneWritingFragment(header);
     356            2 :                         TLOG(TLVL_DEBUG + 33) << "Done receiving fragment with sequence ID " << header.sequence_id << " from rank " << source_rank;
     357              : 
     358            1 :                         recv_frag_count_.incSlot(source_rank);
     359            1 :                         recv_frag_size_.incSlot(source_rank, header.word_count * sizeof(RawDataType));
     360            1 :                         recv_seq_count_.setSlot(source_rank, header.sequence_id);
     361            1 :                         if (endOfDataCount != static_cast<size_t>(-1))
     362              :                         {
     363            0 :                                 TLOG(TLVL_DEBUG + 32) << "Received fragment " << header.sequence_id << " from rank " << source_rank
     364            0 :                                                       << " (" << recv_frag_count_.slotCount(source_rank) << "/" << endOfDataCount << ")";
     365              :                         }
     366              : 
     367            1 :                         after_body = std::chrono::steady_clock::now();
     368              : 
     369            1 :                         auto hdr_delta_t = TimeUtils::GetElapsedTime(start_time, after_header);
     370            1 :                         auto store_delta_t = TimeUtils::GetElapsedTime(after_header, before_body);
     371            1 :                         auto data_delta_t = TimeUtils::GetElapsedTime(before_body, after_body);
     372            1 :                         auto delta_t = TimeUtils::GetElapsedTime(start_time, after_body);
     373            1 :                         auto dead_t = TimeUtils::GetElapsedTime(end_time, start_time);
     374            1 :                         auto recv_wait_t = hdr_delta_t - latency;
     375              : 
     376            1 :                         uint64_t data_size = header.word_count * sizeof(RawDataType);
     377            1 :                         auto header_size = header.num_words() * sizeof(RawDataType);
     378              : 
     379            1 :                         if (metricMan)
     380              :                         {  //&& recv_frag_count_.slotCount(source_rank) % 100 == 0) {
     381            2 :                                 TLOG(TLVL_DEBUG + 34) << "runReceiver_: Sending receive stats for rank " << source_rank;
     382            4 :                                 metricMan->sendMetric("Total Receive Time From Rank " + std::to_string(source_rank), delta_t, "s", 5, MetricMode::Accumulate);
     383            4 :                                 metricMan->sendMetric("Total Receive Size From Rank " + std::to_string(source_rank), data_size, "B", 5, MetricMode::Accumulate);
     384            4 :                                 metricMan->sendMetric("Total Receive Rate From Rank " + std::to_string(source_rank), data_size / delta_t, "B/s", 5, MetricMode::Average);
     385              : 
     386            4 :                                 metricMan->sendMetric("Header Receive Time From Rank " + std::to_string(source_rank), hdr_delta_t, "s", 5, MetricMode::Accumulate);
     387            4 :                                 metricMan->sendMetric("Header Receive Size From Rank " + std::to_string(source_rank), header_size, "B", 5, MetricMode::Accumulate);
     388            4 :                                 metricMan->sendMetric("Header Receive Rate From Rank " + std::to_string(source_rank), header_size / hdr_delta_t, "B/s", 5, MetricMode::Average);
     389              : 
     390            1 :                                 auto payloadSize = data_size - header_size;
     391            4 :                                 metricMan->sendMetric("Data Receive Time From Rank " + std::to_string(source_rank), data_delta_t, "s", 5, MetricMode::Accumulate);
     392            4 :                                 metricMan->sendMetric("Data Receive Size From Rank " + std::to_string(source_rank), payloadSize, "B", 5, MetricMode::Accumulate);
     393            4 :                                 metricMan->sendMetric("Data Receive Rate From Rank " + std::to_string(source_rank), payloadSize / data_delta_t, "B/s", 5, MetricMode::Average);
     394              : 
     395            4 :                                 metricMan->sendMetric("Data Receive Count From Rank " + std::to_string(source_rank), recv_frag_count_.slotCount(source_rank), "fragments", 3, MetricMode::LastPoint);
     396              : 
     397            4 :                                 metricMan->sendMetric("Total Shared Memory Wait Time From Rank " + std::to_string(source_rank), store_delta_t, "s", 3, MetricMode::Accumulate);
     398            4 :                                 metricMan->sendMetric("Avg Shared Memory Wait Time From Rank " + std::to_string(source_rank), store_delta_t, "s", 3, MetricMode::Average);
     399            4 :                                 metricMan->sendMetric("Avg Fragment Wait Time From Rank " + std::to_string(source_rank), dead_t, "s", 3, MetricMode::Average);
     400              : 
     401            6 :                                 metricMan->sendMetric("Rank", std::to_string(my_rank), "", 3, MetricMode::LastPoint);
     402            6 :                                 metricMan->sendMetric("App Name", app_name, "", 3, MetricMode::LastPoint);
     403            4 :                                 metricMan->sendMetric("Fragment Latency at Receive From Rank " + std::to_string(source_rank), latency, "s", 4, MetricMode::Average | MetricMode::Maximum);
     404            4 :                                 metricMan->sendMetric("Header Receive Wait Time From Rank" + std::to_string(source_rank), recv_wait_t, "s", 4, MetricMode::Average | MetricMode::Maximum | MetricMode::Minimum);
     405              : 
     406            2 :                                 TLOG(TLVL_DEBUG + 34) << "runReceiver_: Done sending receive stats for rank " << source_rank;
     407              :                         }
     408              : 
     409            1 :                         end_time = std::chrono::steady_clock::now();
     410              :                 }
     411            1 :                 else if (Fragment::isBroadcastFragmentType(header.type))
     412              :                 {
     413            2 :                         TLOG(TLVL_DEBUG + 32) << "Received System Fragment broadcast " << header.sequence_id << " from rank " << source_rank << " of type " << detail::RawFragmentHeader::SystemTypeToString(header.type) << ".";
     414              : 
     415            1 :                         FragmentPtr frag(new Fragment(header.word_count - header.num_words()));
     416            1 :                         memcpy(frag->headerAddress(), &header, header.num_words() * sizeof(RawDataType));
     417            1 :                         auto ret3 = source_plugins_[source_rank]->receiveFragmentData(frag->headerAddress() + header.num_words(), header.word_count - header.num_words());  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     418            1 :                         if (ret3 != source_rank)
     419              :                         {
     420            0 :                                 TLOG(TLVL_ERROR) << "Unexpected return code from receiveFragmentData after receiveFragmentHeader while receiving System Fragment! (Expected: " << source_rank << ", Got: " << ret3 << ")";
     421            0 :                                 throw cet::exception("DataReceiverManager") << "Unexpected return code from receiveFragmentData after receiveFragmentHeader while receiving System Fragment! (Expected: " << source_rank << ", Got: " << ret3 << ")";  // NOLINT(cert-err60-cpp)
     422              :                         }
     423              : 
     424            1 :                         switch (header.type)
     425              :                         {
     426            1 :                                 case Fragment::EndOfDataFragmentType:
     427            1 :                                         shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
     428            1 :                                         if (endOfDataCount == static_cast<size_t>(-1))
     429              :                                         {
     430            1 :                                                 endOfDataCount = *(frag->dataBegin());
     431              :                                         }
     432              :                                         else
     433              :                                         {
     434            0 :                                                 endOfDataCount += *(frag->dataBegin());
     435              :                                         }
     436            2 :                                         TLOG(TLVL_DEBUG + 32) << "EndOfData Fragment indicates that " << endOfDataCount << " fragments are expected from rank " << source_rank
     437            1 :                                                               << " (recvd " << recv_frag_count_.slotCount(source_rank) << ").";
     438            1 :                                         break;
     439            0 :                                 case Fragment::InitFragmentType:
     440            0 :                                         TLOG(TLVL_DEBUG + 32) << "Received Init Fragment from rank " << source_rank << ".";
     441            0 :                                         shm_manager_->setRequestMode(detail::RequestMessageMode::Normal);
     442            0 :                                         shm_manager_->AddInitFragment(frag);
     443            0 :                                         break;
     444            0 :                                 case Fragment::EndOfRunFragmentType:
     445            0 :                                         shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
     446              :                                         // shm_manager_->endRun();
     447            0 :                                         break;
     448            0 :                                 case Fragment::EndOfSubrunFragmentType:
     449              :                                         // shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
     450            0 :                                         TLOG(TLVL_DEBUG + 32) << "Received EndOfSubrun Fragment from rank " << source_rank
     451            0 :                                                               << " with sequence_id " << header.sequence_id << " and timestamp " << header.timestamp << ".";
     452            0 :                                         if (header.sequence_id != Fragment::InvalidSequenceID)
     453              :                                         {
     454            0 :                                                 shm_manager_->rolloverSubrun(header.sequence_id, header.timestamp, false);
     455              :                                         }
     456              :                                         else
     457              :                                         {
     458            0 :                                                 shm_manager_->rolloverSubrun(recv_seq_count_.slotCount(source_rank), header.timestamp, false);
     459              :                                         }
     460            0 :                                         break;
     461            0 :                                 case Fragment::ShutdownFragmentType:
     462            0 :                                         shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
     463            0 :                                         break;
     464            0 :                                 default:
     465            0 :                                         break;
     466              :                         }
     467              : 
     468            1 :                         if (header.type != Fragment::InitFragmentType && header.type != Fragment::EndOfDataFragmentType && header.type != Fragment::ShutdownFragmentType)
     469              :                         {
     470            0 :                                 shm_manager_->BroadcastFragment(frag);
     471              :                         }
     472            1 :                 }
     473              :         }
     474              : 
     475            1 :         source_plugins_[source_rank]->flush_buffers();
     476              : 
     477            2 :         TLOG(TLVL_DEBUG + 32) << "runReceiver_ " << source_rank << " receive loop exited";
     478            1 :         running_sources_[source_rank] = false;
     479            1 : }
        

Generated by: LCOV version 2.0-1