LCOV - code coverage report
Current view: top level - artdaq/TransferPlugins - Bundle_transfer.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 172 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 46 0

            Line data    Source code
       1              : #include <memory>
       2              : 
       3              : #include "artdaq/DAQdata/Globals.hh"
       4              : #define TRACE_NAME (app_name + "_BundleTransfer").c_str()
       5              : 
       6              : #include "artdaq-core/Data/ContainerFragmentLoader.hh"
       7              : #include "artdaq/TransferPlugins/TCPSocketTransfer.hh"
       8              : #include "artdaq/TransferPlugins/TransferInterface.hh"
       9              : 
      10              : #include <boost/thread.hpp>
      11              : 
      12              : namespace artdaq {
      13              : /**
      14              :  * \brief The BundleTransfer TransferInterface plugin automatically combines smaller Fragments to transfer a lower rate of larger Fragments, which TCP is better able to handle.
      15              :  */
      16              : class BundleTransfer : public TransferInterface
      17              : {
      18              : public:
      19              :         /**
      20              :          * \brief BundleTransfer Constructor
      21              :          * \param pset ParameterSet used to configure BundleTransfer
      22              :          * \param role Role of this TransferInterface, either kReceive or kSend
      23              :          */
      24              :         BundleTransfer(const fhicl::ParameterSet& pset, Role role);
      25              : 
      26              :         /**
      27              :          * \brief BundleTransfer default Destructor
      28              :          */
      29              :         ~BundleTransfer() override;
      30              : 
      31              :         /**
      32              :          * \brief Receive a Fragment, using the underlying transfer plugin
      33              :          * \param fragment Output Fragment
      34              :          * \param receiveTimeout Time to wait before returning TransferInterface::RECV_TIMEOUT
      35              :          * \return Rank of sender
      36              :          */
      37            0 :         int receiveFragment(artdaq::Fragment& fragment,
      38              :                             size_t receiveTimeout) override
      39              :         {
      40            0 :                 if (bundle_fragment_ == nullptr)
      41              :                 {
      42            0 :                         receive_bundle_fragment_(receiveTimeout);
      43            0 :                         if (current_rank_ < RECV_SUCCESS) return current_rank_;
      44              :                 }
      45              : 
      46            0 :                 ContainerFragment cf(*bundle_fragment_);
      47            0 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Retrieving Fragment " << (current_block_index_ + 1) << " of " << cf.block_count();
      48            0 :                 fragment.resizeBytes(cf.fragSize(current_block_index_) - sizeof(detail::RawFragmentHeader));
      49            0 :                 memcpy(fragment.headerAddress(), static_cast<const uint8_t*>(cf.dataBegin()) + cf.fragmentIndex(current_block_index_), cf.fragSize(current_block_index_));
      50            0 :                 current_block_index_++;
      51            0 :                 if (current_block_index_ >= cf.block_count())  // Index vs. count!
      52              :                 {
      53            0 :                         bundle_fragment_.reset(nullptr);
      54              :                 }
      55            0 :                 return current_rank_;
      56            0 :         }
      57              : 
      58              :         /**
      59              :          * \brief Receive a Fragment Header from the transport mechanism
      60              :          * \param[out] header Received Fragment Header
      61              :          * \param receiveTimeout Timeout for receive
      62              :          * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
      63              :          */
      64            0 :         int receiveFragmentHeader(detail::RawFragmentHeader& header, size_t receiveTimeout) override
      65              :         {
      66            0 :                 if (bundle_fragment_ == nullptr)
      67              :                 {
      68            0 :                         receive_bundle_fragment_(receiveTimeout);
      69            0 :                         if (current_rank_ < RECV_SUCCESS) return current_rank_;
      70              :                 }
      71            0 :                 ContainerFragment cf(*bundle_fragment_);
      72            0 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Retrieving Fragment Header " << (current_block_index_ + 1) << " of " << cf.block_count();
      73            0 :                 memcpy(&header, static_cast<const uint8_t*>(cf.dataBegin()) + cf.fragmentIndex(current_block_index_), sizeof(detail::RawFragmentHeader));
      74            0 :                 return current_rank_;
      75            0 :         }
      76              : 
      77              :         /**
      78              :          * \brief Receive the body of a Fragment to the given destination pointer
      79              :          * \param destination Pointer to memory region where Fragment data should be stored
      80              :          * \param wordCount Number of words of Fragment data to receive
      81              :          * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
      82              :          */
      83            0 :         int receiveFragmentData(RawDataType* destination, size_t /*wordCount*/) override
      84              :         {
      85            0 :                 if (bundle_fragment_ == nullptr)  // Should be impossible!
      86              :                 {
      87            0 :                         return RECV_TIMEOUT;
      88              :                 }
      89            0 :                 ContainerFragment cf(*bundle_fragment_);
      90            0 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Retrieving Fragment Data " << (current_block_index_ + 1) << " of " << cf.block_count();
      91            0 :                 memcpy(destination, static_cast<const uint8_t*>(cf.dataBegin()) + cf.fragmentIndex(current_block_index_) + sizeof(detail::RawFragmentHeader), cf.fragSize(current_block_index_) - sizeof(detail::RawFragmentHeader));
      92            0 :                 current_block_index_++;
      93            0 :                 if (current_block_index_ >= cf.block_count())  // Index vs. count!
      94              :                 {
      95            0 :                         bundle_fragment_.reset(nullptr);
      96              :                 }
      97            0 :                 return current_rank_;
      98            0 :         }
      99              : 
     100              :         /**
     101              :          * \brief Send a Fragment in non-reliable mode, using the underlying transfer plugin
     102              :          * \param fragment The Fragment to send
     103              :          * \param send_timeout_usec How long to wait before aborting. Defaults to size_t::MAX_VALUE
     104              :          * \return A TransferInterface::CopyStatus result variable
     105              :          */
     106            0 :         CopyStatus transfer_fragment_min_blocking_mode(artdaq::Fragment const& fragment, size_t send_timeout_usec) override
     107              :         {
     108            0 :                 TLOG(TLVL_DEBUG + 35) << GetTraceName() << "transfer_fragment_min_blocking_mode START";
     109            0 :                 last_send_call_reliable_ = false;
     110            0 :                 last_send_timeout_usec_ = send_timeout_usec;
     111              :                 {
     112            0 :                         std::unique_lock<std::mutex> lk(fragment_mutex_);
     113            0 :                         if (current_buffer_size_bytes_ > max_hold_size_bytes_)
     114              :                         {
     115            0 :                                 fragment_cv_.wait_for(lk, std::chrono::microseconds(send_timeout_usec), [&] { return current_buffer_size_bytes_ < max_hold_size_bytes_; });
     116              :                         }
     117              : 
     118            0 :                         if (current_buffer_size_bytes_ > max_hold_size_bytes_)
     119              :                         {
     120            0 :                                 TLOG(TLVL_WARNING) << GetTraceName() << "Dropping data due to timeout in min_blocking_mode";
     121            0 :                                 return CopyStatus::kTimeout;
     122              :                         }
     123              : 
     124            0 :                         TLOG(TLVL_DEBUG + 35) << GetTraceName() << "transfer_fragment_min_blocking_mode after wait for buffer";
     125              :                         // Always send along Broadcast Fragments immediately
     126            0 :                         if (Fragment::isBroadcastFragmentType(fragment.type()))
     127              :                         {
     128            0 :                                 system_fragment_cached_ = true;
     129              :                         }
     130              : 
     131            0 :                         current_buffer_size_bytes_ += fragment.sizeBytes();
     132              :                         // Eww, we have to copy
     133            0 :                         fragment_buffer_.emplace_back(fragment);
     134            0 :                 }
     135            0 :                 TLOG(TLVL_DEBUG + 35) << GetTraceName() << "transfer_fragment_min_blocking_mode END";
     136            0 :                 return last_copy_status_;  // Might be a lie, but we're going to send from the thread proc
     137              :         }
     138              : 
     139              :         /**
     140              :          * \brief Send a Fragment in reliable mode, using the underlying transfer plugin
     141              :          * \param fragment The Fragment to send
     142              :          * \return A TransferInterface::CopyStatus result variable
     143              :          */
     144            0 :         CopyStatus transfer_fragment_reliable_mode(artdaq::Fragment&& fragment) override
     145              :         {
     146            0 :                 TLOG(TLVL_DEBUG + 36) << GetTraceName() << "transfer_fragment_reliable_mode START";
     147            0 :                 last_send_call_reliable_ = true;
     148              :                 {
     149            0 :                         std::unique_lock<std::mutex> lk(fragment_mutex_);
     150            0 :                         while (current_buffer_size_bytes_ > max_hold_size_bytes_)
     151              :                         {
     152            0 :                                 fragment_cv_.wait(lk, [&] { return current_buffer_size_bytes_ < max_hold_size_bytes_; });
     153              :                         }
     154              : 
     155            0 :                         TLOG(TLVL_DEBUG + 36) << GetTraceName() << "transfer_fragment_reliable_mode after wait for buffer";
     156              : 
     157              :                         // Always send along Broadcast Fragments immediately
     158            0 :                         if (Fragment::isBroadcastFragmentType(fragment.type()))
     159              :                         {
     160            0 :                                 system_fragment_cached_ = true;
     161              :                         }
     162              : 
     163            0 :                         current_buffer_size_bytes_ += fragment.sizeBytes();
     164            0 :                         fragment_buffer_.emplace_back(std::move(fragment));
     165            0 :                 }
     166            0 :                 TLOG(TLVL_DEBUG + 36) << GetTraceName() << "transfer_fragment_reliable_mode END";
     167            0 :                 return last_copy_status_;  // Might be a lie, but we're going to send from the thread proc
     168              :         }
     169              : 
     170              :         /**
     171              :          * \brief Determine whether the TransferInterface plugin is able to send/receive data
     172              :          * \return True if the TransferInterface plugin is currently able to send/receive data
     173              :          */
     174            0 :         bool isRunning() override { return running_; }
     175              : 
     176              :         /**
     177              :          * \brief Flush any in-flight data. This should be used by the receiver after the receive loop has
     178              :          * ended.
     179              :          */
     180            0 :         void flush_buffers() override { theTransfer_->flush_buffers(); }
     181              : 
     182              : private:
     183              :         BundleTransfer(BundleTransfer const&) = delete;
     184              :         BundleTransfer(BundleTransfer&&) = delete;
     185              :         BundleTransfer& operator=(BundleTransfer const&) = delete;
     186              :         BundleTransfer& operator=(BundleTransfer&&) = delete;
     187              : 
     188              : private:
     189              :         std::unique_ptr<TransferInterface> theTransfer_;
     190              :         size_t send_threshold_bytes_;
     191              :         size_t max_hold_size_bytes_;
     192              :         int max_hold_time_us_;
     193              :         FragmentPtr bundle_fragment_{nullptr};
     194              :         Fragments fragment_buffer_;
     195              :         size_t current_block_index_{0};
     196              :         int current_rank_ = 0;
     197              :         CopyStatus last_copy_status_{CopyStatus::kSuccess};
     198              : 
     199              :         std::chrono::steady_clock::time_point send_fragment_started_;
     200              :         std::atomic<size_t> current_buffer_size_bytes_{0};
     201              :         std::unique_ptr<boost::thread> send_timeout_thread_;
     202              :         std::atomic<bool> system_fragment_cached_{false};
     203              :         std::atomic<bool> send_timeout_thread_running_{false};
     204              :         std::atomic<bool> last_send_call_reliable_{true};
     205              :         std::atomic<size_t> last_send_timeout_usec_{1000000};
     206              :         std::atomic<bool> running_{true};
     207              :         std::mutex fragment_mutex_;
     208              :         std::condition_variable fragment_cv_;
     209              : 
     210              :         bool check_send_(bool force);
     211              :         void start_timeout_thread_();
     212              :         void send_timeout_thread_proc_();
     213              :         bool send_bundle_fragment_(bool forceSend = false);
     214              :         void receive_bundle_fragment_(size_t receiveTimeout);
     215              : };
     216              : }  // namespace artdaq
     217              : 
     218            0 : artdaq::BundleTransfer::BundleTransfer(const fhicl::ParameterSet& pset, Role role)
     219              :     : TransferInterface(pset, role)
     220            0 :     , send_threshold_bytes_(pset.get<size_t>("send_threshold_bytes", 10 * 0x100000))  // 10 MB
     221            0 :     , max_hold_size_bytes_(pset.get<size_t>("max_hold_size_bytes", 1000 * 0x100000))  // 1000 MB
     222            0 :     , max_hold_time_us_(pset.get<int>("max_hold_time_us", 100000))                    // 0.1 s
     223              : {
     224            0 :         TLOG(TLVL_INFO) << GetTraceName() << "Begin BundleTransfer constructor";
     225            0 :         TLOG(TLVL_INFO) << GetTraceName() << "Constructing TCPSocketTransfer";
     226            0 :         theTransfer_ = std::make_unique<TCPSocketTransfer>(pset, role);
     227              : 
     228            0 :         if (role == Role::kSend)
     229              :         {
     230            0 :                 start_timeout_thread_();
     231              :         }
     232            0 : }
     233              : 
     234            0 : artdaq::BundleTransfer::~BundleTransfer()
     235              : {
     236            0 :         if (role_ == Role::kSend)
     237              :         {
     238            0 :                 send_timeout_thread_running_ = false;
     239            0 :                 if (send_timeout_thread_ && send_timeout_thread_->joinable())
     240              :                 {
     241            0 :                         send_timeout_thread_->join();
     242              :                 }
     243            0 :                 send_bundle_fragment_(true);
     244              :         }
     245            0 :         running_ = false;
     246            0 : }
     247              : 
     248            0 : void artdaq::BundleTransfer::start_timeout_thread_()
     249              : {
     250            0 :         if (send_timeout_thread_ && send_timeout_thread_->joinable())
     251              :         {
     252            0 :                 send_timeout_thread_->join();
     253              :         }
     254            0 :         send_timeout_thread_running_ = true;
     255            0 :         TLOG(TLVL_INFO) << GetTraceName() << "Starting Send Timeout Thread";
     256              : 
     257              :         try
     258              :         {
     259            0 :                 send_timeout_thread_ = std::make_unique<boost::thread>(&BundleTransfer::send_timeout_thread_proc_, this);
     260              :                 char tname[16];                                            // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
     261            0 :                 snprintf(tname, sizeof(tname) - 1, "%d-SNDTMO", my_rank);  // NOLINT
     262            0 :                 tname[sizeof(tname) - 1] = '\0';                           // assure term. snprintf is not too evil :)
     263            0 :                 auto handle = send_timeout_thread_->native_handle();
     264            0 :                 pthread_setname_np(handle, tname);
     265              :         }
     266            0 :         catch (const boost::exception& e)
     267              :         {
     268            0 :                 TLOG(TLVL_ERROR) << GetTraceName() << "Caught boost::exception starting Send Timeout thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     269            0 :                 std::cerr << GetTraceName() << "Caught boost::exception starting Send Timeout thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     270            0 :                 exit(5);
     271            0 :         }
     272            0 : }
     273              : 
     274            0 : void artdaq::BundleTransfer::send_timeout_thread_proc_()
     275              : {
     276            0 :         while (send_timeout_thread_running_)
     277              :         {
     278            0 :                 if (!send_bundle_fragment_())
     279              :                 {
     280            0 :                         usleep(5000);
     281              :                 }
     282              :         }
     283            0 : }
     284              : 
     285            0 : bool artdaq::BundleTransfer::check_send_(bool force)
     286              : {
     287            0 :         if (force)
     288              :         {
     289            0 :                 TLOG(TLVL_DEBUG + 37) << GetTraceName() << "check_send_: Send is forced, returning true";
     290            0 :                 return true;
     291              :         }
     292              : 
     293            0 :         if (system_fragment_cached_.load())
     294              :         {
     295            0 :                 TLOG(TLVL_DEBUG + 37) << GetTraceName() << "check_send_: System Fragment in cache, returning true";
     296            0 :                 return true;
     297              :         }
     298              : 
     299            0 :         if (std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - send_fragment_started_).count() >= max_hold_time_us_)
     300              :         {
     301            0 :                 TLOG(TLVL_DEBUG + 37) << GetTraceName() << "check_send_: Send timeout reached, returning true";
     302            0 :                 return true;
     303              :         }
     304              : 
     305            0 :         if (current_buffer_size_bytes_ >= send_threshold_bytes_)
     306              :         {
     307            0 :                 TLOG(TLVL_DEBUG + 37) << GetTraceName() << "check_send_: Buffer is full, returning true";
     308            0 :                 return true;
     309              :         }
     310              : 
     311            0 :         TLOG(TLVL_DEBUG + 37) << GetTraceName() << "check_send_: returning false";
     312            0 :         return false;
     313              : }
     314              : 
     315            0 : bool artdaq::BundleTransfer::send_bundle_fragment_(bool forceSend)
     316              : {
     317              :         {
     318            0 :                 std::unique_lock<std::mutex> lk(fragment_mutex_);
     319              : 
     320            0 :                 bool send_fragment = check_send_(forceSend);
     321              : 
     322            0 :                 if (send_fragment && fragment_buffer_.size() > 0)
     323              :                 {
     324            0 :                         TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Swapping in new buffer";
     325            0 :                         Fragments temp_buffer;
     326            0 :                         size_t size = current_buffer_size_bytes_;
     327            0 :                         fragment_buffer_.swap(temp_buffer);
     328            0 :                         send_fragment_started_ = std::chrono::steady_clock::now();
     329            0 :                         system_fragment_cached_ = false;
     330            0 :                         current_buffer_size_bytes_ = 0;
     331            0 :                         lk.unlock();
     332            0 :                         TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Notifying waiters";
     333            0 :                         fragment_cv_.notify_one();
     334              : 
     335            0 :                         TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Setting up Bundle Fragment";
     336            0 :                         bundle_fragment_.reset(new artdaq::Fragment(temp_buffer.front().sequenceID() + 1, temp_buffer.front().fragmentID()));
     337            0 :                         bundle_fragment_->setTimestamp(temp_buffer.front().timestamp());
     338            0 :                         bundle_fragment_->reserve(size / sizeof(artdaq::RawDataType));
     339              : 
     340            0 :                         TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Filling Bundle Fragment, sz = " << temp_buffer.size();
     341            0 :                         ContainerFragmentLoader container_fragment(*bundle_fragment_);
     342            0 :                         container_fragment.set_missing_data(false);  // Buffer mode is never missing data, even if there IS no data.
     343            0 :                         container_fragment.addFragments(temp_buffer, true);
     344            0 :                         temp_buffer.clear();
     345              : 
     346            0 :                         TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Sending Fragment, reliable mode " << last_send_call_reliable_.load();
     347            0 :                         CopyStatus sts = CopyStatus::kSuccess;
     348            0 :                         if (last_send_call_reliable_)
     349              :                         {
     350            0 :                                 sts = theTransfer_->transfer_fragment_reliable_mode(std::move(*bundle_fragment_.get()));
     351            0 :                                 bundle_fragment_.reset(nullptr);
     352              :                         }
     353              :                         else
     354              :                         {
     355            0 :                                 while (sts != CopyStatus::kSuccess && send_timeout_thread_running_)
     356              :                                 {
     357            0 :                                         sts = theTransfer_->transfer_fragment_min_blocking_mode(*bundle_fragment_.get(), last_send_timeout_usec_);
     358              :                                 }
     359            0 :                                 bundle_fragment_.reset(nullptr);
     360              :                         }
     361            0 :                         last_copy_status_ = sts;
     362            0 :                         if (sts != CopyStatus::kSuccess)
     363              :                         {
     364            0 :                                 auto sts_string = sts == CopyStatus::kTimeout ? "timeout" : "other error";
     365            0 :                                 TLOG(TLVL_WARNING) << GetTraceName() << "Transfer of Bundle fragment returned status " << sts_string;
     366              :                         }
     367              : 
     368            0 :                         TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Done sending Bundle Fragment";
     369              : 
     370            0 :                         return true;  // Status of actual transfer
     371            0 :                 }
     372            0 :         }
     373            0 :         return false;  // Waiting on more data
     374              : }
     375              : 
     376            0 : void artdaq::BundleTransfer::receive_bundle_fragment_(size_t receiveTimeout)
     377              : {
     378            0 :         std::lock_guard<std::mutex> lk(fragment_mutex_);
     379            0 :         bundle_fragment_.reset(new artdaq::Fragment(1));
     380              : 
     381            0 :         TLOG(TLVL_DEBUG + 34) << GetTraceName() << "Going to receive next bundle fragment";
     382            0 :         current_rank_ = theTransfer_->receiveFragment(*bundle_fragment_, receiveTimeout);
     383            0 :         TLOG(TLVL_DEBUG + 34) << GetTraceName() << "Done with receiveFragment, current_rank_ = " << current_rank_;
     384              : 
     385            0 :         if (current_rank_ < RECV_SUCCESS)
     386              :         {
     387            0 :                 bundle_fragment_.reset(nullptr);
     388              :         }
     389            0 :         current_block_index_ = 0;
     390            0 : }
     391              : 
// Expands the artdaq transfer-plugin macro for BundleTransfer. The macro comes from the included artdaq
// TransferPlugins headers; presumably it exports the factory hook that lets configuration select this
// plugin by name — confirm in TransferInterface.hh.
DEFINE_ARTDAQ_TRANSFER(artdaq::BundleTransfer)
        

Generated by: LCOV version 2.0-1