LCOV - code coverage report
Current view: top level - artdaq/DAQrate - DataSenderManager.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 183 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 32 0

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME (app_name + "_DataSenderManager").c_str()
       3              : #include "artdaq/DAQdata/HostMap.hh"
       4              : #include "artdaq/DAQrate/DataSenderManager.hh"
       5              : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
       6              : 
       7              : #include <arpa/inet.h>
       8              : #include <netinet/in.h>
       9              : #include <poll.h>
      10              : #include <sys/socket.h>
      11              : #include <sys/types.h>
      12              : #include <chrono>
      13              : #include "artdaq/DAQdata/TCPConnect.hh"
      14              : #include "artdaq/DAQrate/detail/MergeParameterSets.hh"
      15              : #include "canvas/Utilities/Exception.h"
      16              : 
      17            0 : artdaq::DataSenderManager::DataSenderManager(const fhicl::ParameterSet& pset)
      18            0 :     : sent_frag_count_()
      19            0 :     , broadcast_sends_(pset.get<bool>("broadcast_sends", false))
      20            0 :     , non_blocking_mode_(pset.get<bool>("nonblocking_sends", false))
      21            0 :     , send_timeout_us_(pset.get<size_t>("send_timeout_usec", 5000000))
      22            0 :     , send_retry_count_(pset.get<size_t>("send_retry_count", 2))
      23            0 :     , should_stop_(false)
      24            0 :     , highest_sequence_id_routed_(0)
      25              : {
      26            0 :         TLOG(TLVL_DEBUG + 32) << "Received pset: " << pset.to_string();
      27              : 
      28              :         // Validate parameters
      29            0 :         if (send_timeout_us_ == 0)
      30              :         {
      31            0 :                 send_timeout_us_ = std::numeric_limits<size_t>::max();
      32              :         }
      33              : 
      34            0 :         auto rmConfig = pset.get<fhicl::ParameterSet>("routing_table_config", fhicl::ParameterSet());
      35            0 :         table_receiver_.reset(new TableReceiver(rmConfig));
      36              : 
      37            0 :         hostMap_t host_map = MakeHostMap(pset);
      38            0 :         auto max_fragment_size_words = pset.get<size_t>("max_fragment_size_words", 0);
      39            0 :         auto transfer_parameters = pset.get<fhicl::ParameterSet>("transfer_parameters", fhicl::ParameterSet());
      40              : 
      41            0 :         auto dests = pset.get<fhicl::ParameterSet>("destinations", fhicl::ParameterSet());
      42            0 :         for (auto& d : dests.get_pset_names())
      43              :         {
      44            0 :                 auto dest_pset = dests.get<fhicl::ParameterSet>(d);
      45            0 :                 host_map = MakeHostMap(dest_pset, host_map);
      46            0 :         }
      47            0 :         auto host_map_pset = MakeHostMapPset(host_map);
      48            0 :         fhicl::ParameterSet dests_mod;
      49            0 :         for (auto& d : dests.get_pset_names())
      50              :         {
      51            0 :                 auto dest_pset = dests.get<fhicl::ParameterSet>(d);
      52            0 :                 dest_pset.erase("host_map");
      53            0 :                 dest_pset.put<std::vector<fhicl::ParameterSet>>("host_map", host_map_pset);
      54              : 
      55            0 :                 if (max_fragment_size_words != 0 && !dest_pset.has_key("max_fragment_size_words"))
      56              :                 {
      57            0 :                         dest_pset.put<size_t>("max_fragment_size_words", max_fragment_size_words);
      58              :                 }
      59              : 
      60            0 :                 auto resultant_set = merge(transfer_parameters, dest_pset);
      61              : 
      62            0 :                 dests_mod.put<fhicl::ParameterSet>(d, resultant_set);
      63            0 :         }
      64              : 
      65            0 :         for (auto& d : dests_mod.get_pset_names())
      66              :         {
      67              :                 try
      68              :                 {
      69            0 :                         auto transfer = MakeTransferPlugin(dests_mod, d, TransferInterface::Role::kSend);
      70            0 :                         auto destination_rank = transfer->destination_rank();
      71            0 :                         destinations_.emplace(destination_rank, std::move(transfer));
      72            0 :                 }
      73            0 :                 catch (const std::invalid_argument&)
      74              :                 {
      75            0 :                         TLOG(TLVL_DEBUG + 32) << "Invalid destination specification: " << d;
      76            0 :                 }
      77            0 :                 catch (const cet::exception& ex)
      78              :                 {
      79            0 :                         TLOG(TLVL_WARNING) << "Caught cet::exception: " << ex.what();
      80            0 :                 }
      81            0 :                 catch (...)
      82              :                 {
      83            0 :                         TLOG(TLVL_WARNING) << "Non-cet exception while setting up TransferPlugin: " << d << ".";
      84            0 :                 }
      85            0 :         }
      86            0 :         if (destinations_.empty())
      87              :         {
      88            0 :                 TLOG(TLVL_ERROR) << "No destinations specified!";
      89              :         }
      90              :         else
      91              :         {
      92            0 :                 auto enabled_dests = pset.get<std::vector<size_t>>("enabled_destinations", std::vector<size_t>());
      93            0 :                 if (enabled_dests.empty())
      94              :                 {
      95            0 :                         TLOG(TLVL_INFO) << "enabled_destinations not specified, assuming all destinations enabled.";
      96            0 :                         for (auto& d : destinations_)
      97              :                         {
      98            0 :                                 enabled_destinations_.insert(d.first);
      99              :                         }
     100              :                 }
     101              :                 else
     102              :                 {
     103            0 :                         for (auto& d : enabled_dests)
     104              :                         {
     105            0 :                                 enabled_destinations_.insert(d);
     106              :                         }
     107              :                 }
     108            0 :         }
     109            0 : }
     110              : 
     111            0 : artdaq::DataSenderManager::~DataSenderManager()
     112              : {
     113            0 :         TLOG(TLVL_DEBUG + 32) << "Shutting down DataSenderManager BEGIN";
     114            0 :         should_stop_ = true;
     115            0 :         for (auto& dest : enabled_destinations_)
     116              :         {
     117            0 :                 if (destinations_.count(dest) != 0u)
     118              :                 {
     119            0 :                         auto sts = destinations_[dest]->transfer_fragment_reliable_mode(std::move(*Fragment::eodFrag(sent_frag_count_.slotCount(dest))));
     120            0 :                         if (sts != TransferInterface::CopyStatus::kSuccess)
     121              :                         {
     122            0 :                                 TLOG(TLVL_ERROR) << "Error sending EOD Fragment to sender rank " << dest;
     123              :                         }
     124              :                         //  sendFragTo(std::move(*Fragment::eodFrag(nFragments)), dest, true);
     125              :                 }
     126              :         }
     127            0 :         TLOG(TLVL_DEBUG + 32) << "Shutting down DataSenderManager END. Sent " << count() << " fragments.";
     128            0 : }
     129              : 
     130            0 : size_t artdaq::DataSenderManager::GetRoutingTableEntryCount() const
     131              : {
     132            0 :         return table_receiver_->GetRoutingTableEntryCount();
     133              : }
     134              : 
     135            0 : size_t artdaq::DataSenderManager::GetRemainingRoutingTableEntries() const
     136              : {
     137            0 :         return table_receiver_->GetRemainingRoutingTableEntries();
     138              : }
     139              : 
     140            0 : int artdaq::DataSenderManager::calcDest_(Fragment::sequence_id_t sequence_id) const
     141              : {
     142            0 :         if (enabled_destinations_.empty())
     143              :         {
     144            0 :                 return TableReceiver::ROUTING_FAILED;  // No destinations configured.
     145              :         }
     146              : 
     147            0 :         if (table_receiver_->RoutingManagerEnabled())
     148              :         {
     149            0 :                 TLOG(TLVL_DEBUG + 35) << "calcDest_ use_routing_manager check for routing info for seqID=" << sequence_id << " should_stop_=" << should_stop_;
     150            0 :                 return table_receiver_->GetRoutingTableEntry(sequence_id);
     151              :         }
     152            0 :         if (enabled_destinations_.size() == 1)
     153              :         {
     154            0 :                 return *enabled_destinations_.begin();  // Trivial case
     155              :         }
     156            0 :         auto index = sequence_id % enabled_destinations_.size();
     157            0 :         auto it = enabled_destinations_.begin();
     158            0 :         for (; index > 0; --index)
     159              :         {
     160            0 :                 ++it;
     161            0 :                 if (it == enabled_destinations_.end())
     162              :                 {
     163            0 :                         it = enabled_destinations_.begin();
     164              :                 }
     165              :         }
     166            0 :         return *it;
     167              : }
     168              : 
     169            0 : void artdaq::DataSenderManager::RemoveRoutingTableEntry(Fragment::sequence_id_t seq)
     170              : {
     171            0 :         TLOG(TLVL_DEBUG + 35) << "RemoveRoutingTableEntry: Removing sequence ID " << seq << " from routing table. Sent " << GetSentSequenceIDCount(seq) << " Fragments with this Sequence ID.";
     172            0 :         table_receiver_->RemoveRoutingTableEntry(seq);
     173              : 
     174            0 :         std::unique_lock<std::mutex> lck(sent_sequence_id_mutex_);
     175            0 :         if (sent_sequence_id_count_.find(seq) != sent_sequence_id_count_.end())
     176              :         {
     177            0 :                 sent_sequence_id_count_.erase(sent_sequence_id_count_.find(seq));
     178              :         }
     179            0 : }
     180              : 
     181            0 : size_t artdaq::DataSenderManager::GetSentSequenceIDCount(Fragment::sequence_id_t seq)
     182              : {
     183            0 :         std::unique_lock<std::mutex> lck(sent_sequence_id_mutex_);
     184            0 :         if (sent_sequence_id_count_.count(seq) == 0u)
     185              :         {
     186            0 :                 return 0;
     187              :         }
     188            0 :         return sent_sequence_id_count_[seq];
     189            0 : }
     190              : 
     191            0 : std::pair<int, artdaq::TransferInterface::CopyStatus> artdaq::DataSenderManager::sendFragment(Fragment&& frag)
     192              : {
     193              :         // Precondition: Fragment must be complete and consistent (including
     194              :         // header information).
     195            0 :         auto start_time = std::chrono::steady_clock::now();
     196            0 :         if (frag.type() == Fragment::EndOfDataFragmentType)
     197              :         {
     198            0 :                 throw cet::exception("LogicError")  // NOLINT(cert-err60-cpp)
     199              :                     << "EOD fragments should not be sent on as received: "
     200            0 :                     << "use sendEODFrag() instead.";
     201              :         }
     202            0 :         size_t seqID = frag.sequenceID();
     203            0 :         size_t fragSize = frag.sizeBytes();
     204            0 :         auto latency_s = frag.getLatency(true);
     205            0 :         auto isSystemBroadcast = Fragment::isBroadcastFragmentType(frag.type());
     206              : 
     207            0 :         double latency = latency_s.tv_sec + (latency_s.tv_nsec / 1000000000.0);
     208            0 :         TLOG(TLVL_DEBUG + 36) << "sendFragment start frag.fragmentHeader()=" << std::hex << static_cast<void*>(frag.headerBeginBytes()) << ", szB=" << std::dec << fragSize
     209            0 :                               << ", seqID=" << seqID << ", fragID=" << frag.fragmentID() << ", type=" << frag.typeString();
     210            0 :         int dest = TableReceiver::ROUTING_FAILED;
     211            0 :         auto outsts = TransferInterface::CopyStatus::kSuccess;
     212            0 :         if (broadcast_sends_ || isSystemBroadcast)
     213              :         {
     214            0 :                 for (auto& bdest : enabled_destinations_)
     215              :                 {
     216            0 :                         TLOG(TLVL_DEBUG + 33) << "sendFragment: Sending fragment with seqId " << seqID << " to destination " << bdest << " (broadcast)";
     217              :                         // Gross, we have to copy.
     218            0 :                         auto sts = TransferInterface::CopyStatus::kTimeout;
     219            0 :                         size_t retries = 0;  // Have NOT yet tried, so retries <= send_retry_count_ will have it RETRY send_retry_count_ times
     220            0 :                         while (sts == TransferInterface::CopyStatus::kTimeout && retries <= send_retry_count_)
     221              :                         {
     222            0 :                                 if (!non_blocking_mode_)
     223              :                                 {
     224            0 :                                         sts = destinations_[bdest]->transfer_fragment_reliable_mode(Fragment(frag));
     225              :                                 }
     226              :                                 else
     227              :                                 {
     228            0 :                                         sts = destinations_[bdest]->transfer_fragment_min_blocking_mode(frag, send_timeout_us_);
     229              :                                 }
     230            0 :                                 ++retries;
     231              :                         }
     232            0 :                         if (sts != TransferInterface::CopyStatus::kSuccess)
     233              :                         {
     234            0 :                                 outsts = sts;
     235              :                         }
     236            0 :                         sent_frag_count_.incSlot(bdest);
     237              :                 }
     238            0 :         }
     239            0 :         else if (non_blocking_mode_)
     240              :         {
     241            0 :                 dest = calcDest_(seqID);
     242            0 :                 if (dest == TableReceiver::ROUTING_FAILED)
     243              :                 {
     244            0 :                         TLOG(TLVL_WARNING) << "Could not get destination for seqID " << seqID;
     245              :                 }
     246              : 
     247            0 :                 if (dest != TableReceiver::ROUTING_FAILED && (destinations_.count(dest) != 0u) && (enabled_destinations_.count(dest) != 0u))
     248              :                 {
     249            0 :                         TLOG(TLVL_DEBUG + 33) << "sendFragment: Sending fragment with seqId " << seqID << " to destination " << dest;
     250            0 :                         TransferInterface::CopyStatus sts = TransferInterface::CopyStatus::kErrorNotRequiringException;
     251            0 :                         auto lastWarnTime = std::chrono::steady_clock::now();
     252            0 :                         size_t retries = 0;  // Have NOT yet tried, so retries <= send_retry_count_ will have it RETRY send_retry_count_ times
     253            0 :                         while (sts != TransferInterface::CopyStatus::kSuccess && retries <= send_retry_count_)
     254              :                         {
     255            0 :                                 sts = destinations_[dest]->transfer_fragment_min_blocking_mode(frag, send_timeout_us_);
     256            0 :                                 if (sts != TransferInterface::CopyStatus::kSuccess && TimeUtils::GetElapsedTime(lastWarnTime) >= 1)
     257              :                                 {
     258            0 :                                         TLOG(TLVL_WARNING) << "sendFragment: Sending fragment " << seqID << " to destination " << dest << " failed! Retrying...";
     259            0 :                                         lastWarnTime = std::chrono::steady_clock::now();
     260              :                                 }
     261            0 :                                 ++retries;
     262              :                         }
     263            0 :                         if (sts != TransferInterface::CopyStatus::kSuccess)
     264              :                         {
     265            0 :                                 outsts = sts;
     266              :                         }
     267              :                         // sendFragTo(std::move(frag), dest);
     268            0 :                         sent_frag_count_.incSlot(dest);
     269              :                 }
     270            0 :                 else if (!should_stop_)
     271              :                 {
     272            0 :                         TLOG(TLVL_ERROR) << "(in non_blocking) calcDest returned invalid destination rank " << dest << "! This event has been lost: " << seqID
     273            0 :                                          << ". enabled_destinantions_.size()=" << enabled_destinations_.size();
     274              :                 }
     275              :         }
     276              :         else
     277              :         {
     278            0 :                 auto start = std::chrono::steady_clock::now();
     279            0 :                 while (!should_stop_ && dest == TableReceiver::ROUTING_FAILED)
     280              :                 {
     281            0 :                         dest = calcDest_(seqID);
     282            0 :                         if (dest == TableReceiver::ROUTING_FAILED)
     283              :                         {
     284            0 :                                 TLOG(TLVL_WARNING) << "Could not get destination for seqID " << seqID << ", send number " << sent_frag_count_.count() << ", retrying. Waited " << TimeUtils::GetElapsedTime(start) << " s for routing information.";
     285            0 :                                 usleep(10000);
     286              :                         }
     287              :                 }
     288            0 :                 if (dest != TableReceiver::ROUTING_FAILED && (destinations_.count(dest) != 0u) && (enabled_destinations_.count(dest) != 0u))
     289              :                 {
     290            0 :                         TLOG(TLVL_DEBUG + 34) << "DataSenderManager::sendFragment: Sending fragment with seqId " << seqID << " to destination " << dest;
     291            0 :                         TransferInterface::CopyStatus sts = TransferInterface::CopyStatus::kErrorNotRequiringException;
     292              : 
     293            0 :                         sts = destinations_[dest]->transfer_fragment_reliable_mode(std::move(frag));
     294            0 :                         if (sts != TransferInterface::CopyStatus::kSuccess)
     295              :                         {
     296            0 :                                 TLOG(TLVL_ERROR) << "sendFragment: Sending fragment " << seqID << " to destination "
     297            0 :                                                  << dest << " failed! Data has been lost!";
     298              :                         }
     299              : 
     300              :                         // sendFragTo(std::move(frag), dest);
     301            0 :                         sent_frag_count_.incSlot(dest);
     302            0 :                         outsts = sts;
     303              :                 }
     304            0 :                 else if (!should_stop_)
     305              :                 {
     306            0 :                         TLOG(TLVL_ERROR) << "calcDest returned invalid destination rank " << dest << "! This event has been lost: " << seqID
     307            0 :                                          << ". enabled_destinantions_.size()=" << enabled_destinations_.size();
     308              :                 }
     309              :         }
     310              : 
     311            0 :         if (!isSystemBroadcast)
     312              :         {
     313            0 :                 std::unique_lock<std::mutex> lck(sent_sequence_id_mutex_);
     314            0 :                 sent_sequence_id_count_[seqID]++;
     315            0 :         }
     316              : 
     317            0 :         auto delta_t = TimeUtils::GetElapsedTime(start_time);
     318              : 
     319            0 :         if (metricMan)
     320              :         {
     321            0 :                 TLOG(TLVL_DEBUG + 34) << "sendFragment: sending metrics";
     322            0 :                 metricMan->sendMetric("Data Send Time to Rank " + std::to_string(dest), delta_t, "s", 5, MetricMode::Accumulate);
     323            0 :                 metricMan->sendMetric("Data Send Size to Rank " + std::to_string(dest), fragSize, "B", 5, MetricMode::Accumulate | MetricMode::Maximum);
     324            0 :                 metricMan->sendMetric("Data Send Rate to Rank " + std::to_string(dest), fragSize / delta_t, "B/s", 5, MetricMode::Average);
     325            0 :                 metricMan->sendMetric("Data Send Count to Rank " + std::to_string(dest), sent_frag_count_.slotCount(dest), "fragments", 3, MetricMode::LastPoint);
     326              : 
     327            0 :                 metricMan->sendMetric("Rank", std::to_string(my_rank), "", 3, MetricMode::LastPoint);
     328            0 :                 metricMan->sendMetric("App Name", app_name, "", 3, MetricMode::LastPoint);
     329              : 
     330            0 :                 metricMan->sendMetric("Fragment Latency at Send", latency, "s", 4, MetricMode::Average | MetricMode::Maximum);
     331              :         }
     332              : 
     333            0 :         TLOG(TLVL_DEBUG + 34) << "sendFragment: Done sending fragment " << seqID << " to dest=" << dest;
     334            0 :         return std::make_pair(dest, outsts);
     335              : }  // artdaq::DataSenderManager::sendFragment
        

Generated by: LCOV version 2.0-1