LCOV - code coverage report
Current view: top level - artdaq/DAQrate - TransferTest.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 224 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 34 0

            Line data    Source code
       1              : #include "artdaq/DAQrate/TransferTest.hh"
       2              : 
       3              : #include "artdaq-core/Data/Fragment.hh"
       4              : #include "artdaq/DAQrate/DataSenderManager.hh"
       5              : #include "artdaq/DAQrate/FragmentReceiverManager.hh"
       6              : 
       7              : #define TRACE_NAME "TransferTest"
       8              : #include "artdaq/DAQdata/Globals.hh"
       9              : 
      10              : #include <future>
      11              : 
      12            0 : artdaq::TransferTest::TransferTest(fhicl::ParameterSet psi)
      13            0 :     : senders_(psi.get<int>("num_senders"))
      14            0 :     , receivers_(psi.get<int>("num_receivers"))
      15            0 :     , sending_threads_(psi.get<int>("sending_threads", 1))
      16            0 :     , sends_each_sender_(psi.get<int>("sends_per_sender"))
      17            0 :     , receives_each_receiver_(0)
      18            0 :     , buffer_count_(psi.get<int>("buffer_count", 10))
      19            0 :     , error_count_max_(psi.get<int>("max_errors_before_abort", 3))
      20            0 :     , fragment_size_(psi.get<size_t>("fragment_size", 0x100000))
      21            0 :     , validate_mode_(psi.get<bool>("validate_data_mode", false))
      22            0 :     , partition_number_(psi.get<int>("partition_number", rand() % 0x7F))  // NOLINT(cert-msc50-cpp)
      23              : {
      24            0 :         TLOG(TLVL_DEBUG + 35) << "CONSTRUCTOR";
      25              : 
      26            0 :         if (fragment_size_ < artdaq::detail::RawFragmentHeader::num_words() * sizeof(artdaq::RawDataType))
      27              :         {
      28            0 :                 fragment_size_ = artdaq::detail::RawFragmentHeader::num_words() * sizeof(artdaq::RawDataType);
      29              :         }
      30              : 
      31            0 :         fhicl::ParameterSet metric_pset;
      32              : 
      33              :         try
      34              :         {
      35            0 :                 metric_pset = psi.get<fhicl::ParameterSet>("metrics");
      36              :         }
      37            0 :         catch (...)
      38            0 :         {}  // OK if there's no metrics table defined in the FHiCL
      39              : 
      40              :         try
      41              :         {
      42            0 :                 std::string name = "TransferTest" + std::to_string(my_rank);
      43            0 :                 metricMan->initialize(metric_pset, name);
      44            0 :                 metricMan->do_start();
      45            0 :         }
      46            0 :         catch (...)
      47            0 :         {}
      48              : 
      49            0 :         auto type(psi.get<std::string>("transfer_plugin_type", "TCPSocket"));
      50              : 
      51            0 :         bool broadcast_mode = psi.get<bool>("broadcast_sends", false);
      52            0 :         if (broadcast_mode)
      53              :         {
      54            0 :                 receives_each_receiver_ = senders_ * sending_threads_ * sends_each_sender_;
      55              :         }
      56              :         else
      57              :         {
      58            0 :                 if (receivers_ > 0)
      59              :                 {
      60            0 :                         if (senders_ * sending_threads_ * sends_each_sender_ % receivers_ != 0)
      61              :                         {
      62            0 :                                 TLOG(TLVL_DEBUG + 33) << "Adding sends so that sends_each_sender * num_sending_ranks is a multiple of num_receiving_ranks" << std::endl;
      63            0 :                                 while (senders_ * sends_each_sender_ % receivers_ != 0)
      64              :                                 {
      65            0 :                                         sends_each_sender_++;
      66              :                                 }
      67            0 :                                 receives_each_receiver_ = senders_ * sending_threads_ * sends_each_sender_ / receivers_;
      68            0 :                                 TLOG(TLVL_DEBUG + 33) << "sends_each_sender is now " << sends_each_sender_ << std::endl;
      69            0 :                                 psi.put_or_replace("sends_per_sender", sends_each_sender_);
      70              :                         }
      71              :                         else
      72              :                         {
      73            0 :                                 receives_each_receiver_ = senders_ * sending_threads_ * sends_each_sender_ / receivers_;
      74              :                         }
      75              :                 }
      76              :         }
      77              : 
      78            0 :         std::string hostmap;
      79            0 :         if (psi.has_key("hostmap"))
      80              :         {
      81            0 :                 hostmap = " host_map: @local::hostmap";
      82              :         }
      83              : 
      84            0 :         std::stringstream ss;
      85            0 :         ss << psi.to_string() << std::endl;
      86              : 
      87            0 :         ss << " sources: {";
      88            0 :         for (int ii = 0; ii < senders_; ++ii)
      89              :         {
      90            0 :                 ss << "s" << ii << ": { transferPluginType: " << type << " source_rank: " << ii << " max_fragment_size_words : " << fragment_size_ << " buffer_count : " << buffer_count_ << " partition_number : " << partition_number_ << hostmap << " }" << std::endl;
      91              :         }
      92            0 :         ss << "}" << std::endl
      93            0 :            << " destinations: {";
      94            0 :         for (int jj = senders_; jj < senders_ + receivers_; ++jj)
      95              :         {
      96            0 :                 ss << "d" << jj << ": { transferPluginType: " << type << " destination_rank: " << jj << " max_fragment_size_words : " << fragment_size_ << " buffer_count : " << buffer_count_ << " partition_number : " << partition_number_ << hostmap << " }" << std::endl;
      97              :         }
      98            0 :         ss << "}" << std::endl;
      99              : 
     100            0 :         ps_ = fhicl::ParameterSet::make(ss.str());
     101              : 
     102            0 :         TLOG(TLVL_DEBUG + 32) << "Going to configure with ParameterSet: " << ps_.to_string() << std::endl;
     103            0 : }
     104              : 
     105            0 : int artdaq::TransferTest::runTest()
     106              : {
     107            0 :         TLOG(TLVL_INFO) << "runTest BEGIN: " << (my_rank < senders_ ? "sending" : "receiving");
     108            0 :         start_time_ = std::chrono::steady_clock::now();
     109            0 :         std::pair<size_t, double> result;
     110            0 :         if (my_rank >= senders_ + receivers_)
     111              :         {
     112            0 :                 return 0;
     113              :         }
     114            0 :         if (my_rank < senders_)
     115              :         {
     116            0 :                 std::vector<std::future<std::pair<size_t, double>>> results_futures(sending_threads_);
     117            0 :                 for (int ii = 0; ii < sending_threads_; ++ii)
     118              :                 {
     119            0 :                         results_futures[ii] = std::async(std::launch::async, std::bind(&TransferTest::do_sending, this, ii));
     120              :                 }
     121            0 :                 for (auto& future : results_futures)
     122              :                 {
     123            0 :                         if (future.valid())
     124              :                         {
     125            0 :                                 auto thisresult = future.get();
     126            0 :                                 result.first += thisresult.first;
     127            0 :                                 result.second += thisresult.second;
     128              :                         }
     129              :                 }
     130            0 :         }
     131              :         else
     132              :         {
     133            0 :                 result = do_receiving();
     134              :         }
     135            0 :         auto duration = std::chrono::duration_cast<artdaq::TimeUtils::seconds>(std::chrono::steady_clock::now() - start_time_).count();
     136            0 :         TLOG(TLVL_INFO) << (my_rank < senders_ ? "Sent " : "Received ") << result.first << " bytes in " << duration << " seconds ( " << formatBytes(result.first / duration) << "/s )." << std::endl;
     137            0 :         TLOG(TLVL_INFO) << "Rate of " << (my_rank < senders_ ? "sending" : "receiving") << ": " << formatBytes(result.first / result.second) << "/s, " << formatHertz((my_rank < senders_ ? sends_each_sender_ : receives_each_receiver_) / duration)
     138            0 :                         << std::endl;
     139            0 :         metricMan->do_stop();
     140            0 :         metricMan->shutdown();
     141            0 :         TLOG(TLVL_DEBUG + 36) << "runTest DONE";
     142            0 :         return return_code_;
     143              : }
     144              : 
     145            0 : std::pair<size_t, double> artdaq::TransferTest::do_sending(int index)
     146              : {
     147            0 :         TLOG(TLVL_DEBUG + 34) << "do_sending entered RawFragmentHeader::num_words()=" << artdaq::detail::RawFragmentHeader::num_words();
     148              : 
     149            0 :         size_t totalSize = 0;
     150            0 :         double totalTime = 0;
     151            0 :         artdaq::DataSenderManager sender(ps_);
     152              : 
     153            0 :         unsigned data_size_wrds = (fragment_size_ / sizeof(artdaq::RawDataType)) - artdaq::detail::RawFragmentHeader::num_words();
     154            0 :         artdaq::Fragment frag(data_size_wrds);
     155              : 
     156            0 :         if (validate_mode_)
     157              :         {
     158            0 :                 artdaq::RawDataType gen_seed = 0;
     159              : 
     160            0 :                 std::generate_n(frag.dataBegin(), data_size_wrds, [&]() { return ++gen_seed; });
     161            0 :                 for (size_t ii = 0; ii < frag.dataSize(); ++ii)
     162              :                 {
     163            0 :                         if (*(frag.dataBegin() + ii) != ii + 1)
     164              :                         {
     165            0 :                                 TLOG(TLVL_ERROR) << "Data corruption detected! (" << (*(frag.dataBegin() + ii)) << " != " << (ii + 1) << ") Aborting!";
     166            0 :                                 return_code_ = 255;
     167            0 :                                 return std::make_pair(0, 0.0);
     168              :                         }
     169              :                 }
     170              :         }
     171              : 
     172            0 :         int metric_send_interval = sends_each_sender_ / 1000 > 1 ? sends_each_sender_ / 1000 : 1;
     173            0 :         auto init_time_metric = 0.0;
     174            0 :         auto send_time_metric = 0.0;
     175            0 :         auto after_time_metric = 0.0;
     176            0 :         auto send_size_metric = 0.0;
     177            0 :         auto error_count = 0;
     178              : 
     179            0 :         for (int ii = 0; ii < sends_each_sender_; ++ii)
     180              :         {
     181            0 :                 auto loop_start = std::chrono::steady_clock::now();
     182            0 :                 TLOG(TLVL_DEBUG + 34) << "sender rank " << my_rank << " #" << ii << " resized bytes=" << frag.sizeBytes();
     183            0 :                 totalSize += frag.sizeBytes();
     184              : 
     185              :                 // unsigned sndDatSz = data_size_wrds;
     186            0 :                 frag.setSequenceID(ii * sending_threads_ + index);
     187            0 :                 frag.setFragmentID(my_rank);
     188            0 :                 frag.setSystemType(artdaq::Fragment::DataFragmentType);
     189              :                 /*
     190              :                         artdaq::Fragment::iterator it = frag.dataBegin();
     191              :                         *it = my_rank;
     192              :                         *++it = ii;
     193              :                         *++it = sndDatSz;*/
     194              : 
     195            0 :                 auto send_start = std::chrono::steady_clock::now();
     196            0 :                 TLOG(TLVL_DEBUG + 32) << "Sender " << my_rank << " sending fragment " << ii;
     197            0 :                 auto stspair = sender.sendFragment(std::move(frag));
     198            0 :                 auto after_send = std::chrono::steady_clock::now();
     199            0 :                 TLOG(TLVL_DEBUG + 33) << "Sender " << my_rank << " sent fragment " << ii;
     200            0 :                 sender.RemoveRoutingTableEntry(ii * sending_threads_ + index);
     201              :                 // usleep( (data_size_wrds*sizeof(artdaq::RawDataType))/233 );
     202              : 
     203            0 :                 if (stspair.second != artdaq::TransferInterface::CopyStatus::kSuccess)
     204              :                 {
     205            0 :                         error_count++;
     206            0 :                         if (error_count >= error_count_max_)
     207              :                         {
     208            0 :                                 TLOG(TLVL_ERROR) << "Too many errors sending fragments! Aborting... (sent=" << ii << "/" << sends_each_sender_ << ")";
     209            0 :                                 return_code_ = sends_each_sender_ - ii;
     210            0 :                                 return std::make_pair(0, 0.0);
     211              :                         }
     212              :                 }
     213              : 
     214            0 :                 frag = artdaq::Fragment(data_size_wrds);  // replace/renew
     215            0 :                 if (validate_mode_)
     216              :                 {
     217            0 :                         artdaq::RawDataType gen_seed = ii + 1;
     218              : 
     219            0 :                         std::generate_n(frag.dataBegin(), data_size_wrds, [&]() { return ++gen_seed; });
     220            0 :                         for (size_t jj = 0; jj < frag.dataSize(); ++jj)
     221              :                         {
     222            0 :                                 if (*(frag.dataBegin() + jj) != (ii + 1) + jj + 1)
     223              :                                 {
     224            0 :                                         TLOG(TLVL_ERROR) << "Input Data corruption detected! (" << *(frag.dataBegin() + jj) << " != " << ii + jj + 2 << " at position " << ii << ") Aborting!";
     225            0 :                                         return_code_ = 254;
     226            0 :                                         return std::make_pair(0, 0.0);
     227              :                                 }
     228              :                         }
     229              :                 }
     230            0 :                 TLOG(TLVL_DEBUG + 37) << "sender rank " << my_rank << " frag replaced";
     231              : 
     232            0 :                 auto total_send_time = std::chrono::duration_cast<artdaq::TimeUtils::seconds>(after_send - send_start).count();
     233            0 :                 totalTime += total_send_time;
     234            0 :                 send_time_metric += total_send_time;
     235            0 :                 send_size_metric += data_size_wrds * sizeof(artdaq::RawDataType);
     236            0 :                 after_time_metric += std::chrono::duration_cast<artdaq::TimeUtils::seconds>(std::chrono::steady_clock::now() - after_send).count();
     237            0 :                 init_time_metric += std::chrono::duration_cast<artdaq::TimeUtils::seconds>(send_start - loop_start).count();
     238              : 
     239            0 :                 if (metricMan && ii % metric_send_interval == 0)
     240              :                 {
     241            0 :                         metricMan->sendMetric("send_init_time", init_time_metric, "seconds", 3, MetricMode::Accumulate);
     242            0 :                         metricMan->sendMetric("total_send_time", send_time_metric, "seconds", 3, MetricMode::Accumulate);
     243            0 :                         metricMan->sendMetric("after_send_time", after_time_metric, "seconds", 3, MetricMode::Accumulate);
     244            0 :                         metricMan->sendMetric("send_rate", send_size_metric / send_time_metric, "B/s", 3, MetricMode::Average);
     245            0 :                         init_time_metric = 0.0;
     246            0 :                         send_time_metric = 0.0;
     247            0 :                         after_time_metric = 0.0;
     248            0 :                         send_size_metric = 0.0;
     249              :                 }
     250            0 :                 usleep(0);  // Yield execution
     251              :         }
     252              : 
     253            0 :         return std::make_pair(totalSize, totalTime);
     254            0 : }  // do_sending
     255              : 
     256            0 : std::pair<size_t, double> artdaq::TransferTest::do_receiving()
     257              : {
     258            0 :         TLOG(TLVL_DEBUG + 34) << "do_receiving entered";
     259              : 
     260            0 :         artdaq::FragmentReceiverManager receiver(ps_);
     261            0 :         receiver.start_threads();
     262            0 :         int counter = receives_each_receiver_;
     263            0 :         size_t totalSize = 0;
     264            0 :         double totalTime = 0;
     265            0 :         bool first = true;
     266            0 :         bool nonblocking_mode = ps_.get<bool>("nonblocking_sends", false);
     267            0 :         std::atomic<int> activeSenders(senders_ * sending_threads_);
     268            0 :         auto end_loop = std::chrono::steady_clock::now();
     269            0 :         auto last_receive = std::chrono::steady_clock::now();
     270              : 
     271            0 :         auto recv_size_metric = 0.0;
     272            0 :         auto recv_time_metric = 0.0;
     273            0 :         auto input_wait_metric = 0.0;
     274            0 :         auto init_wait_metric = 0.0;
     275            0 :         int metric_send_interval = receives_each_receiver_ / 1000 > 1 ? receives_each_receiver_ : 1;
     276              : 
     277              :         // Only abort when there are no senders if were's > 90% done
     278            0 :         while ((activeSenders > 0 || (counter > receives_each_receiver_ / 10 && !nonblocking_mode)) && counter > 0)
     279              :         {
     280            0 :                 auto start_loop = std::chrono::steady_clock::now();
     281            0 :                 TLOG(TLVL_DEBUG + 34) << "do_receiving: Counter is " << counter << ", calling recvFragment (activeSenders=" << activeSenders << ")";
     282            0 :                 int senderSlot = artdaq::TransferInterface::RECV_TIMEOUT;
     283            0 :                 auto before_receive = std::chrono::steady_clock::now();
     284              : 
     285            0 :                 auto ignoreFragPtr = receiver.recvFragment(senderSlot);
     286            0 :                 auto after_receive = std::chrono::steady_clock::now();
     287            0 :                 init_wait_metric += std::chrono::duration_cast<artdaq::TimeUtils::seconds>(before_receive - start_loop).count();
     288            0 :                 size_t thisSize = 0;
     289            0 :                 if (senderSlot >= artdaq::TransferInterface::RECV_SUCCESS && ignoreFragPtr)
     290              :                 {
     291            0 :                         last_receive = std::chrono::steady_clock::now();
     292            0 :                         if (ignoreFragPtr->type() == artdaq::Fragment::EndOfDataFragmentType)
     293              :                         {
     294            0 :                                 TLOG(TLVL_INFO) << "Receiver " << my_rank << " received EndOfData Fragment from Sender " << senderSlot;
     295            0 :                                 activeSenders--;
     296            0 :                                 TLOG(TLVL_DEBUG + 32) << "Active Senders is now " << activeSenders;
     297              :                         }
     298            0 :                         else if (ignoreFragPtr->type() != artdaq::Fragment::DataFragmentType)
     299              :                         {
     300            0 :                                 TLOG(TLVL_WARNING) << "Receiver " << my_rank << " received Fragment with System type " << artdaq::detail::RawFragmentHeader::SystemTypeToString(ignoreFragPtr->type()) << " (Unexpected!)";
     301              :                         }
     302              :                         else
     303              :                         {
     304            0 :                                 if (first)
     305              :                                 {
     306            0 :                                         start_time_ = std::chrono::steady_clock::now();
     307            0 :                                         first = false;
     308              :                                 }
     309            0 :                                 counter--;
     310            0 :                                 TLOG(TLVL_INFO) << "Receiver " << my_rank << " received fragment " << receives_each_receiver_ - counter
     311            0 :                                                 << " with seqID " << ignoreFragPtr->sequenceID() << " from Sender " << senderSlot << " (Expecting " << counter << " more)";
     312            0 :                                 thisSize = ignoreFragPtr->size() * sizeof(artdaq::RawDataType);
     313            0 :                                 totalSize += thisSize;
     314            0 :                                 if (validate_mode_)
     315              :                                 {
     316            0 :                                         for (size_t ii = 0; ii < ignoreFragPtr->dataSize(); ++ii)
     317              :                                         {
     318            0 :                                                 if (*(ignoreFragPtr->dataBegin() + ii) != ignoreFragPtr->sequenceID() + ii + 1)
     319              :                                                 {
     320            0 :                                                         TLOG(TLVL_ERROR) << "Output Data corruption detected! (" << *(ignoreFragPtr->dataBegin() + ii) << " != " << (ignoreFragPtr->sequenceID() + ii + 1) << " at position " << ii << ") Aborting!";
     321            0 :                                                         return_code_ = -3;
     322            0 :                                                         return std::make_pair(0, 0.0);
     323              :                                                 }
     324              :                                         }
     325              :                                 }
     326              :                         }
     327            0 :                         input_wait_metric += std::chrono::duration_cast<artdaq::TimeUtils::seconds>(after_receive - end_loop).count();
     328              :                 }
     329            0 :                 else if (senderSlot == artdaq::TransferInterface::DATA_END)
     330              :                 {
     331            0 :                         TLOG(TLVL_ERROR) << "Receiver " << my_rank << " detected fatal protocol error! Reducing active sender count by one!" << std::endl;
     332            0 :                         activeSenders--;
     333            0 :                         TLOG(TLVL_DEBUG + 32) << "Active Senders is now " << activeSenders;
     334              :                 }
     335            0 :                 TLOG(TLVL_DEBUG + 34) << "do_receiving: Recv Loop end, counter is " << counter;
     336              : 
     337            0 :                 auto total_recv_time = std::chrono::duration_cast<artdaq::TimeUtils::seconds>(after_receive - before_receive).count();
     338            0 :                 recv_time_metric += total_recv_time;
     339            0 :                 totalTime += total_recv_time;
     340            0 :                 recv_size_metric += thisSize;
     341              : 
     342            0 :                 if (metricMan && counter % metric_send_interval == 0)
     343              :                 {
     344            0 :                         metricMan->sendMetric("input_wait", input_wait_metric, "seconds", 3, MetricMode::Accumulate);
     345            0 :                         metricMan->sendMetric("recv_init_time", init_wait_metric, "seconds", 3, MetricMode::Accumulate);
     346            0 :                         metricMan->sendMetric("total_recv_time", recv_time_metric, "seconds", 3, MetricMode::Accumulate);
     347            0 :                         metricMan->sendMetric("recv_rate", recv_size_metric / recv_time_metric, "B/s", 3, MetricMode::Average);
     348              : 
     349            0 :                         input_wait_metric = 0.0;
     350            0 :                         init_wait_metric = 0.0;
     351            0 :                         recv_time_metric = 0.0;
     352            0 :                         recv_size_metric = 0.0;
     353              :                 }
     354              : 
     355            0 :                 if (artdaq::TimeUtils::GetElapsedTime(last_receive) > 5.0)
     356              :                 {
     357            0 :                         TLOG(TLVL_ERROR) << "Senders appear to have stopped (no data for >5 seconds), aborting test! counter=" << counter;
     358            0 :                         return std::make_pair(0, 0.0);
     359              :                 }
     360            0 :                 end_loop = std::chrono::steady_clock::now();
     361            0 :         }
     362              : 
     363            0 :         if (counter != 0 && !nonblocking_mode)
     364              :         {
     365            0 :                 TLOG(TLVL_ERROR) << "Did not receive all expected Fragments! Missing " << counter << " Fragments!";
     366            0 :                 return_code_ = counter;
     367            0 :                 return std::make_pair(0, 0.0);
     368              :         }
     369              : 
     370            0 :         return std::make_pair(totalSize, totalTime);
     371            0 : }
        

Generated by: LCOV version 2.0-1