LCOV - code coverage report
Current view: top level - test/TransferPlugins - BrokenTransferTest.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 350 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 65 0

            Line data    Source code
       1              : #include "BrokenTransferTest.hh"
       2              : 
       3              : #include "artdaq-core/Data/detail/RawFragmentHeader.hh"
       4              : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
       5              : 
       6              : #include <memory>
       7              : #include <thread>
       8              : #include "artdaq/DAQdata/Globals.hh"
       9              : #define TRACE_NAME "BrokenTransferTest"
      10              : 
      11              : #define TLVL_MAKE_TRANSFER_PS TLVL_DEBUG + 5
      12              : #define TLVL_START_TEST TLVL_DEBUG + 6
      13              : #define TLVL_STOP_TEST TLVL_DEBUG + 7
      14              : #define TLVL_SENDER TLVL_DEBUG + 8
      15              : #define TLVL_SENDER_TOKEN_WAIT TLVL_DEBUG + 9
      16              : #define TLVL_RECEIVER TLVL_DEBUG + 10
      17              : 
      18            0 : artdaqtest::BrokenTransferTest::BrokenTransferTest(const fhicl::ParameterSet& ps)
      19            0 :     : sender_ready_()
      20            0 :     , receiver_ready_()
      21            0 :     , sender_current_fragment_()
      22            0 :     , ps_(ps)
      23            0 :     , test_start_time_(std::chrono::steady_clock::now())
      24            0 :     , test_end_time_(std::chrono::steady_clock::now())
      25            0 :     , test_end_requested_(false)
      26            0 :     , fragment_rate_hz_(ps.get<size_t>("fragment_rate_hz", 10))
      27            0 :     , pause_first_sender_(false)
      28            0 :     , pause_receiver_(false)
      29            0 :     , kill_first_sender_(false)
      30            0 :     , kill_receiver_(false)
      31            0 :     , reliable_mode_(ps.get<bool>("reliable_mode", true))
      32            0 :     , fragment_size_(ps.get<size_t>("fragment_size", 0x10000))
      33            0 :     , send_timeout_us_(ps.get<size_t>("send_timeout_us", 100000))
      34            0 :     , transfer_buffer_count_(ps.get<size_t>("transfer_buffer_count", 10))
      35            0 :     , event_buffer_count_(ps.get<size_t>("event_buffer_count", 20))
      36            0 :     , event_buffer_timeout_us_(ps.get<size_t>("event_buffer_timeout_us", 1000000))
      37            0 :     , send_throttle_us_(0)
      38              : {
      39            0 :         if (fragment_rate_hz_ == 0 || fragment_rate_hz_ > 100000)
      40              :         {
      41            0 :                 TLOG(TLVL_WARNING) << "Invalid rate " << fragment_rate_hz_ << " Hz specified, setting to " << (fragment_rate_hz_ == 0 ? 1 : 1000) << " Hz";
      42            0 :                 fragment_rate_hz_ = (fragment_rate_hz_ == 0 ? 1 : 1000);
      43              :         }
      44            0 : }
      45              : 
      46            0 : void artdaqtest::BrokenTransferTest::TestSenderPause()
      47              : {
      48            0 :         TLOG(TLVL_INFO) << "TestSenderPause BEGIN";
      49            0 :         auto start_time = std::chrono::steady_clock::now();
      50            0 :         start_test_();
      51            0 :         usleep_for_n_buffer_epochs_(2);
      52              : 
      53            0 :         TLOG(TLVL_INFO) << "Pausing First Sender";
      54            0 :         pause_first_sender_ = true;
      55            0 :         usleep_for_n_buffer_epochs_(2);
      56            0 :         usleep(2 * event_buffer_timeout_us_);
      57              : 
      58            0 :         TLOG(TLVL_INFO) << "Resuming First Sender";
      59            0 :         pause_first_sender_ = false;
      60            0 :         usleep_for_n_buffer_epochs_(2);
      61              : 
      62            0 :         stop_test_();
      63            0 :         TLOG(TLVL_INFO) << "TestSenderPause END, duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
      64            0 : }
      65              : 
      66            0 : void artdaqtest::BrokenTransferTest::TestReceiverPause()
      67              : {
      68            0 :         TLOG(TLVL_INFO) << "TestReceiverPause BEGIN";
      69            0 :         auto start_time = std::chrono::steady_clock::now();
      70            0 :         start_test_();
      71            0 :         usleep_for_n_buffer_epochs_(2);
      72              : 
      73            0 :         TLOG(TLVL_INFO) << "Pausing Recevier";
      74            0 :         pause_receiver_ = true;
      75            0 :         usleep_for_n_buffer_epochs_(2);
      76            0 :         usleep(2 * event_buffer_timeout_us_);
      77              : 
      78            0 :         TLOG(TLVL_INFO) << "Resuming Receiver";
      79            0 :         pause_receiver_ = false;
      80            0 :         usleep_for_n_buffer_epochs_(2);
      81              : 
      82            0 :         stop_test_();
      83            0 :         TLOG(TLVL_INFO) << "TestReceiverPause END, duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
      84            0 : }
      85              : 
      86            0 : void artdaqtest::BrokenTransferTest::TestSenderReconnect()
      87              : {
      88            0 :         TLOG(TLVL_INFO) << "TestSenderReconnect BEGIN";
      89            0 :         auto start_time = std::chrono::steady_clock::now();
      90            0 :         start_test_();
      91            0 :         usleep_for_n_buffer_epochs_(2);
      92              : 
      93            0 :         TLOG(TLVL_INFO) << "Killing first Sender";
      94            0 :         kill_first_sender_ = true;
      95            0 :         if (sender_threads_[0].joinable())
      96              :         {
      97            0 :                 sender_threads_[0].join();
      98              :         }
      99            0 :         kill_first_sender_ = false;
     100              : 
     101            0 :         usleep_for_n_buffer_epochs_(2);
     102            0 :         usleep(2 * event_buffer_timeout_us_);
     103              : 
     104            0 :         TLOG(TLVL_INFO) << "Restarting First Sender";
     105            0 :         boost::thread::attributes attrs;
     106            0 :         attrs.set_stack_size(4096 * 2000);  // 2000 KB
     107              :         try
     108              :         {
     109            0 :                 sender_threads_[0] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_sending_, this, 0));
     110              :         }
     111            0 :         catch (const boost::exception& e)
     112              :         {
     113            0 :                 TLOG(TLVL_ERROR) << "Caught boost::exception starting Sender thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     114            0 :                 std::cerr << "Caught boost::exception starting Sender thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     115            0 :                 exit(5);
     116            0 :         }
     117              : 
     118            0 :         usleep_for_n_buffer_epochs_(2);
     119              : 
     120            0 :         stop_test_();
     121            0 :         TLOG(TLVL_INFO) << "TestSenderReconnect END, duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
     122            0 : }
     123              : 
     124            0 : void artdaqtest::BrokenTransferTest::TestReceiverReconnect(int send_throttle_factor)
     125              : {
     126            0 :         TLOG(TLVL_INFO) << "TestReceiverReconnect BEGIN";
     127            0 :         auto start_time = std::chrono::steady_clock::now();
     128            0 :         send_throttle_us_ = send_throttle_factor * 1000000 / fragment_rate_hz_;
     129            0 :         start_test_();
     130            0 :         usleep_for_n_buffer_epochs_(2);
     131              : 
     132            0 :         TLOG(TLVL_INFO) << "Killing Receiver duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
     133              :         ;
     134            0 :         kill_receiver_ = true;
     135            0 :         if (receiver_threads_[0].joinable())
     136              :         {
     137            0 :                 receiver_threads_[0].join();
     138              :         }
     139            0 :         if (receiver_threads_[1].joinable())
     140              :         {
     141            0 :                 receiver_threads_[1].join();
     142              :         }
     143            0 :         kill_receiver_ = false;
     144              : 
     145            0 :         usleep_for_n_buffer_epochs_(2);
     146            0 :         usleep(2 * event_buffer_timeout_us_);
     147              : 
     148            0 :         TLOG(TLVL_INFO) << "Restarting Receiver duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
     149            0 :         boost::thread::attributes attrs;
     150            0 :         attrs.set_stack_size(4096 * 2000);  // 2000 KB
     151              :         try
     152              :         {
     153            0 :                 receiver_threads_[0] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_receiving_, this, 0, 2));
     154            0 :                 receiver_threads_[1] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_receiving_, this, 1, 2));
     155              :         }
     156            0 :         catch (const boost::exception& e)
     157              :         {
     158            0 :                 TLOG(TLVL_ERROR) << "Caught boost::exception starting Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     159            0 :                 std::cerr << "Caught boost::exception starting Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     160            0 :                 exit(5);
     161            0 :         }
     162              : 
     163            0 :         usleep_for_n_buffer_epochs_(2);
     164              : 
     165            0 :         TLOG(TLVL_INFO) << "Stopping test, duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
     166            0 :         stop_test_();
     167            0 :         TLOG(TLVL_INFO) << "TestReceiverReconnect END, duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
     168            0 : }
     169              : 
     170            0 : fhicl::ParameterSet artdaqtest::BrokenTransferTest::make_transfer_ps_(int sender_rank, int receiver_rank, const std::string& name)
     171              : {
     172            0 :         auto thePs = ps_.get<fhicl::ParameterSet>("default_transfer_ps", fhicl::ParameterSet());
     173              : 
     174            0 :         thePs.put_or_replace("transferPluginType", ps_.get<std::string>("transfer_to_use", "TCPSocket"));
     175            0 :         thePs.put_or_replace("destination_rank", receiver_rank);
     176            0 :         thePs.put_or_replace("source_rank", sender_rank);
     177            0 :         thePs.put_or_replace("buffer_count", transfer_buffer_count_);
     178            0 :         if (!thePs.has_key("max_fragment_size_words"))
     179              :         {
     180            0 :                 thePs.put("max_fragment_size_words", fragment_size_ + artdaq::detail::RawFragmentHeader::num_words() + 1);
     181              :         }
     182            0 :         fhicl::ParameterSet outputPs;
     183              : 
     184            0 :         TLOG(TLVL_MAKE_TRANSFER_PS) << "Configuring transfer between " << sender_rank << " and " << receiver_rank << " with ParameterSet: " << thePs.to_string();
     185              : 
     186            0 :         outputPs.put(name, thePs);
     187            0 :         return outputPs;
     188            0 : }
     189              : 
     190            0 : void artdaqtest::BrokenTransferTest::start_test_()
     191              : {
     192            0 :         TLOG(TLVL_START_TEST) << "start_test_ BEGIN";
     193              : 
     194            0 :         sender_ready_[0] = false;
     195            0 :         sender_ready_[1] = false;
     196              : 
     197            0 :         receiver_ready_[0] = false;
     198            0 :         receiver_ready_[1] = false;
     199              : 
     200            0 :         sender_current_fragment_[0] = 0;
     201            0 :         sender_current_fragment_[1] = 0;
     202              : 
     203            0 :         test_start_time_ = std::chrono::steady_clock::now();
     204            0 :         test_end_time_ = std::chrono::steady_clock::now();
     205              : 
     206            0 :         test_end_requested_ = false;
     207            0 :         pause_first_sender_ = false;
     208            0 :         pause_receiver_ = false;
     209            0 :         kill_first_sender_ = false;
     210            0 :         kill_receiver_ = false;
     211              : 
     212            0 :         event_buffer_.clear();
     213            0 :         complete_events_.clear();
     214            0 :         timeout_events_.clear();
     215              : 
     216            0 :         TLOG(TLVL_START_TEST) << "start_test_: Starting receiver threads";
     217            0 :         boost::thread::attributes attrs;
     218            0 :         attrs.set_stack_size(4096 * 2000);  // 2000 KB
     219              :         try
     220              :         {
     221            0 :                 receiver_threads_[0] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_receiving_, this, 0, 2));
     222            0 :                 receiver_threads_[1] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_receiving_, this, 1, 2));
     223              :         }
     224            0 :         catch (const boost::exception& e)
     225              :         {
     226            0 :                 TLOG(TLVL_ERROR) << "Caught boost::exception starting Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     227            0 :                 std::cerr << "Caught boost::exception starting Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     228            0 :                 exit(5);
     229            0 :         }
     230              : 
     231            0 :         TLOG(TLVL_START_TEST) << "start_test_: Waiting for receiver_ready_";
     232            0 :         while (!receiver_ready_[0] || !receiver_ready_[1])
     233              :         {
     234            0 :                 usleep(10000);
     235              :         }
     236              : 
     237            0 :         TLOG(TLVL_START_TEST) << "start_test_: Starting sender threads";
     238              :         try
     239              :         {
     240            0 :                 sender_threads_[0] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_sending_, this, 0));
     241            0 :                 sender_threads_[1] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_sending_, this, 1));
     242              :         }
     243            0 :         catch (const boost::exception& e)
     244              :         {
     245            0 :                 TLOG(TLVL_ERROR) << "Caught boost::exception starting Sender thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     246            0 :                 std::cerr << "Caught boost::exception starting Sender thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     247            0 :                 exit(5);
     248            0 :         }
     249              : 
     250            0 :         TLOG(TLVL_START_TEST) << "start_test_: Waiting for sender_ready_";
     251            0 :         while (!sender_ready_[0] || !sender_ready_[1])
     252              :         {
     253            0 :                 usleep(1000);
     254              :         }
     255              : 
     256            0 :         TLOG(TLVL_START_TEST) << "start_test_ DONE";
     257            0 : }
     258              : 
     259            0 : void artdaqtest::BrokenTransferTest::stop_test_()
     260              : {
     261            0 :         TLOG(TLVL_STOP_TEST) << "stop_test_ BEGIN";
     262            0 :         test_end_time_ = std::chrono::steady_clock::now();
     263            0 :         test_end_requested_ = true;
     264              : 
     265            0 :         TLOG(TLVL_STOP_TEST) << "stop_test_: Waiting for sender threads to shut down";
     266            0 :         while (sender_ready_[0] || sender_ready_[1])
     267              :         {
     268            0 :                 usleep(1000);
     269              :         }
     270              : 
     271            0 :         TLOG(TLVL_STOP_TEST) << "stop_test_: Joining sender threads";
     272            0 :         if (sender_threads_[0].joinable())
     273              :         {
     274            0 :                 sender_threads_[0].join();
     275              :         }
     276            0 :         if (sender_threads_[1].joinable())
     277              :         {
     278            0 :                 sender_threads_[1].join();
     279              :         }
     280              : 
     281            0 :         TLOG(TLVL_STOP_TEST) << "stop_test_: Waiting for receiver threads to shut down";
     282            0 :         while (receiver_ready_[0] || receiver_ready_[1])
     283              :         {
     284            0 :                 usleep(1000);
     285              :         }
     286              : 
     287            0 :         TLOG(TLVL_STOP_TEST) << "stop_test_: Joining receiver threads";
     288            0 :         if (receiver_threads_[0].joinable())
     289              :         {
     290            0 :                 receiver_threads_[0].join();
     291              :         }
     292            0 :         if (receiver_threads_[1].joinable())
     293              :         {
     294            0 :                 receiver_threads_[1].join();
     295              :         }
     296              : 
     297            0 :         TLOG(TLVL_INFO) << "Sent " << sender_current_fragment_[0] << " events from rank 0 and " << sender_current_fragment_[1] << " events from rank 1.";
     298              : 
     299            0 :         artdaq::Fragment::sequence_id_t expected_events = sender_current_fragment_[0];
     300            0 :         if (sender_current_fragment_[1] > expected_events)
     301              :         {
     302            0 :                 expected_events = sender_current_fragment_[1];
     303              :         }
     304              : 
     305            0 :         auto complete_events = complete_events_.size();
     306            0 :         auto incomplete_events = timeout_events_.size();
     307            0 :         auto missing_events = expected_events - complete_events - incomplete_events;
     308              : 
     309            0 :         TLOG(TLVL_INFO) << "Received " << complete_events << " complete events in " << fm_(artdaq::TimeUtils::GetElapsedTime(test_start_time_), "s")
     310            0 :                         << ", Incomplete: " << incomplete_events << ", Missing: " << missing_events;
     311            0 :         TLOG(TLVL_STOP_TEST) << "stop_test_ END";
     312            0 : }
     313              : 
     314            0 : void artdaqtest::BrokenTransferTest::do_sending_(int sender_rank)
     315              : {
     316            0 :         std::unique_ptr<artdaq::TransferInterface> theTransfer = artdaq::MakeTransferPlugin(make_transfer_ps_(sender_rank, 2, "d2"),
     317            0 :                                                                                             "d2", artdaq::TransferInterface::Role::kSend);
     318              : 
     319            0 :         TLOG(TLVL_SENDER) << "Sender " << sender_rank << " setting sender_ready_";
     320            0 :         sender_ready_[sender_rank] = true;
     321              : 
     322            0 :         while (sender_current_fragment_[sender_rank] < sequence_id_target_() || !test_end_requested_)
     323              :         {
     324            0 :                 if (sender_rank == 0 && kill_first_sender_)
     325              :                 {
     326            0 :                         break;
     327              :                 }
     328            0 :                 while (sender_rank == 0 && pause_first_sender_)
     329              :                 {
     330            0 :                         std::this_thread::yield();
     331            0 :                         usleep(10000);
     332              :                 }
     333              : 
     334            0 :                 artdaq::Fragment frag(fragment_size_);
     335            0 :                 frag.setSequenceID(sender_current_fragment_[sender_rank]);
     336            0 :                 frag.setFragmentID(sender_rank);
     337            0 :                 frag.setSystemType(artdaq::Fragment::DataFragmentType);
     338              : 
     339            0 :                 auto start_time = std::chrono::steady_clock::now();
     340            0 :                 auto sts = artdaq::TransferInterface::CopyStatus::kErrorNotRequiringException;
     341              : 
     342            0 :                 if (sender_tokens_[sender_rank].load() == 0)
     343              :                 {
     344            0 :                         TLOG(TLVL_SENDER_TOKEN_WAIT) << "Sender " << sender_rank << " waiting for token from receiver";
     345            0 :                         while (sender_tokens_[sender_rank].load() == 0 && !test_end_requested_) { usleep(10000); }
     346            0 :                         if (test_end_requested_)
     347              :                         {
     348            0 :                                 continue;
     349              :                         }
     350            0 :                         TLOG(TLVL_SENDER_TOKEN_WAIT) << "Sender " << sender_rank << " waited " << fm_(artdaq::TimeUtils::GetElapsedTime(start_time), "s") << " for token from receiver";
     351              :                 }
     352              : 
     353            0 :                 if (reliable_mode_)
     354              :                 {
     355            0 :                         sts = theTransfer->transfer_fragment_reliable_mode(std::move(frag));
     356              :                 }
     357              :                 else
     358              :                 {
     359            0 :                         sts = theTransfer->transfer_fragment_min_blocking_mode(frag, send_timeout_us_);
     360              :                 }
     361              : 
     362            0 :                 if (sts != artdaq::TransferInterface::CopyStatus::kSuccess)
     363              :                 {
     364            0 :                         TLOG(TLVL_ERROR) << "Error sending Fragment " << sender_current_fragment_[sender_rank] << " from sender rank " << sender_rank << ": "
     365            0 :                                          << artdaq::TransferInterface::CopyStatusToString(sts);
     366              :                 }
     367            0 :                 auto duration = artdaq::TimeUtils::GetElapsedTime(start_time);
     368            0 :                 TLOG(TLVL_SENDER) << "Sender " << sender_rank << " Transferred Fragment " << sender_current_fragment_[sender_rank]
     369            0 :                                   << " with size " << fragment_size_ << " words in " << fm_(duration, "s")
     370            0 :                                   << " (approx " << fm_(static_cast<double>(fragment_size_ * sizeof(artdaq::detail::RawFragmentHeader::RawDataType)) / duration, "B/s")
     371            0 :                                   << ") throttle " << send_throttle_us_;
     372            0 :                 ++sender_current_fragment_[sender_rank];
     373            0 :                 sender_tokens_[sender_rank]--;
     374            0 :                 throttle_sender_(sender_rank);
     375            0 :         }
     376              : 
     377            0 :         TLOG(TLVL_SENDER) << "Sender " << sender_rank << " shutting down...";
     378            0 :         theTransfer.reset(nullptr);
     379            0 :         sender_ready_[sender_rank] = false;
     380            0 :         TLOG(TLVL_SENDER) << "Sender " << sender_rank << " DONE";
     381            0 : }
     382              : 
     383            0 : void artdaqtest::BrokenTransferTest::do_receiving_(int sender_rank, int receiver_rank)
     384              : {
     385              :         std::unique_ptr<artdaq::TransferInterface> theTransfer =
     386            0 :             artdaq::MakeTransferPlugin(make_transfer_ps_(sender_rank, receiver_rank, "s" + std::to_string(sender_rank)),
     387            0 :                                        "s" + std::to_string(sender_rank), artdaq::TransferInterface::Role::kReceive);
     388            0 :         artdaq::FragmentPtr dropFrag = nullptr;
     389              : 
     390            0 :         TLOG(TLVL_RECEIVER) << "Receiver " << sender_rank << "->" << receiver_rank << " setting receiver_ready_";
     391            0 :         receiver_ready_[sender_rank] = true;
     392            0 :         sender_tokens_[sender_rank] = event_buffer_count_;
     393              : 
     394            0 :         while (!event_buffer_.empty() || !test_end_requested_ || sender_ready_[0] || sender_ready_[1])
     395              :         {
     396            0 :                 if (kill_receiver_)
     397              :                 {
     398            0 :                         break;
     399              :                 }
     400            0 :                 while (pause_receiver_)
     401              :                 {
     402            0 :                         std::this_thread::yield();
     403            0 :                         usleep(10000);
     404              :                 }
     405              : 
     406              :                 artdaq::detail::RawFragmentHeader hdr;
     407            0 :                 auto rank = theTransfer->receiveFragmentHeader(hdr, 100000);
     408              : 
     409            0 :                 if (rank == artdaq::TransferInterface::RECV_TIMEOUT || event_buffer_.count(hdr.sequence_id) == 0)
     410              :                 {
     411            0 :                         std::unique_lock<std::mutex> lk(event_buffer_mutex_);
     412              :                         do
     413              :                         {
     414            0 :                                 event_buffer_cv_.wait_for(lk, std::chrono::microseconds(10000));
     415              : 
     416            0 :                                 auto it = event_buffer_.begin();
     417            0 :                                 while (it != event_buffer_.end())
     418              :                                 {
     419            0 :                                         if (artdaq::TimeUtils::GetElapsedTimeMicroseconds(it->second.open_time) > event_buffer_timeout_us_)
     420              :                                         {
     421            0 :                                                 TLOG(TLVL_WARNING) << "Receiver " << sender_rank << "->" << receiver_rank << ": Event " << it->first
     422            0 :                                                                    << " has timed out after " << artdaq::TimeUtils::GetElapsedTime(it->second.open_time) << " s, removing...";
     423            0 :                                                 timeout_events_.insert(it->first);
     424            0 :                                                 it = event_buffer_.erase(it);
     425            0 :                                                 sender_tokens_[0]++;
     426            0 :                                                 sender_tokens_[1]++;
     427              :                                         }
     428              :                                         else
     429              :                                         {
     430            0 :                                                 ++it;
     431              :                                         }
     432              :                                 }
     433            0 :                         } while (event_buffer_.size() > event_buffer_count_);
     434            0 :                 }
     435              : 
     436            0 :                 if (rank != sender_rank)
     437              :                 {
     438            0 :                         continue;
     439              :                 }
     440              : 
     441            0 :                 artdaq::RawDataType* ptr = nullptr;
     442            0 :                 bool first = true;
     443              :                 {
     444            0 :                         std::unique_lock<std::mutex> lk(event_buffer_mutex_);
     445            0 :                         if (timeout_events_.count(hdr.sequence_id) != 0u)
     446              :                         {
     447            0 :                                 TLOG(TLVL_WARNING) << "Event " << hdr.sequence_id << " has timed out, discarding";
     448            0 :                                 if (!dropFrag || dropFrag->size() < hdr.word_count)
     449              :                                 {
     450            0 :                                         dropFrag = std::make_unique<artdaq::Fragment>(hdr.word_count - hdr.num_words());
     451              :                                 }
     452            0 :                                 ptr = dropFrag->headerAddress() + hdr.num_words();  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     453              :                         }
     454              :                         else
     455              :                         {
     456            0 :                                 if (event_buffer_.count(hdr.sequence_id) == 0u)
     457              :                                 {
     458            0 :                                         event_buffer_[hdr.sequence_id].open_time = std::chrono::steady_clock::now();
     459            0 :                                         event_buffer_[hdr.sequence_id].first_frag.reset(new artdaq::Fragment(hdr.word_count - hdr.num_words()));
     460            0 :                                         ptr = event_buffer_[hdr.sequence_id].first_frag->headerAddress() + hdr.num_words();  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     461            0 :                                         TLOG(TLVL_RECEIVER) << "Receiver " << sender_rank << "->" << receiver_rank << " opened event " << hdr.sequence_id
     462            0 :                                                             << " with Fragment from rank " << sender_rank;
     463              :                                 }
     464              :                                 else
     465              :                                 {
     466            0 :                                         event_buffer_[hdr.sequence_id].second_frag.reset(new artdaq::Fragment(hdr.word_count - hdr.num_words()));
     467            0 :                                         ptr = event_buffer_[hdr.sequence_id].second_frag->headerAddress() + hdr.num_words();  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     468            0 :                                         first = false;
     469              :                                 }
     470              :                         }
     471            0 :                 }
     472              : 
     473            0 :                 rank = theTransfer->receiveFragmentData(ptr, hdr.word_count - hdr.num_words());
     474            0 :                 if (rank != sender_rank)
     475              :                 {
     476            0 :                         TLOG(TLVL_ERROR) << "Error receiving Fragment data after header received successfully!";
     477            0 :                         exit(1);
     478              :                 }
     479              : 
     480            0 :                 if (!first)
     481              :                 {
     482            0 :                         TLOG(TLVL_RECEIVER) << "Receiver " << sender_rank << "->" << receiver_rank << " completed event " << hdr.sequence_id
     483            0 :                                             << " in " << fm_(artdaq::TimeUtils::GetElapsedTime(event_buffer_[hdr.sequence_id].open_time), "s") << ".";
     484              : 
     485            0 :                         std::unique_lock<std::mutex> lk(event_buffer_mutex_);
     486            0 :                         complete_events_.insert(hdr.sequence_id);
     487            0 :                         event_buffer_.erase(hdr.sequence_id);
     488            0 :                         event_buffer_cv_.notify_one();
     489            0 :                         sender_tokens_[0]++;
     490            0 :                         sender_tokens_[1]++;
     491            0 :                 }
     492              :         }
     493              : 
     494            0 :         TLOG(TLVL_RECEIVER) << "Receiver " << sender_rank << "->" << receiver_rank << " shutting down...";
     495            0 :         theTransfer->flush_buffers();
     496              : 
     497            0 :         std::lock_guard<std::mutex> lk(event_buffer_mutex_);
     498            0 :         theTransfer.reset(nullptr);
     499            0 :         receiver_ready_[sender_rank] = false;
     500            0 :         TLOG(TLVL_RECEIVER) << "Receiver " << sender_rank << "->" << receiver_rank << " DONE";
     501            0 : }
     502              : 
     503            0 : void artdaqtest::BrokenTransferTest::throttle_sender_(int sender_rank)
     504              : {
     505            0 :         if (send_throttle_us_ != 0 && sender_current_fragment_[sender_rank] >= sequence_id_target_() - fragment_rate_hz_)
     506              :         {
     507            0 :                 usleep(send_throttle_us_);
     508              :         }
     509            0 : }
     510              : 
     511            0 : artdaq::Fragment::sequence_id_t artdaqtest::BrokenTransferTest::sequence_id_target_()
     512              : {
     513            0 :         auto ret = 1 + (artdaq::TimeUtils::GetElapsedTimeMicroseconds(test_start_time_) * fragment_rate_hz_ / 1000000);
     514            0 :         if (test_end_requested_)
     515              :         {
     516            0 :                 ret = 1 + (artdaq::TimeUtils::GetElapsedTimeMicroseconds(test_start_time_, test_end_time_) * fragment_rate_hz_ / 1000000);
     517              :         }
     518              :         // TLOG(TLVL_DEBUG) << "sequence_id_target_ is " << ret;
     519            0 :         return ret;
     520              : }
     521              : 
     522            0 : std::string artdaqtest::BrokenTransferTest::fm_(double data, const std::string& units, int logt)
     523              : {
     524            0 :         if (data < 1 && logt > -3)
     525              :         {
     526            0 :                 return fm_(data * 1000, units, logt - 1);
     527              :         }
     528            0 :         if (data > 1000 && logt < 3)
     529              :         {
     530            0 :                 return fm_(data / 1000, units, logt + 1);
     531              :         }
     532              : 
     533            0 :         std::stringstream o;
     534            0 :         o << std::fixed << std::setprecision(2) << data << " ";
     535            0 :         switch (logt)
     536              :         {
     537            0 :                 case -3:
     538            0 :                         o << "n";
     539            0 :                         break;
     540            0 :                 case -2:
     541            0 :                         o << "u";
     542            0 :                         break;
     543            0 :                 case -1:
     544            0 :                         o << "m";
     545            0 :                         break;
     546            0 :                 case 0:
     547              :                 default:
     548            0 :                         break;
     549            0 :                 case 1:
     550            0 :                         o << "K";
     551            0 :                         break;
     552            0 :                 case 2:
     553            0 :                         o << "M";
     554            0 :                         break;
     555            0 :                 case 3:
     556            0 :                         o << "G";
     557            0 :                         break;
     558              :         }
     559            0 :         o << units;
     560            0 :         return o.str();
     561            0 : }
        

Generated by: LCOV version 2.0-1