LCOV - code coverage report
Current view: top level - artdaq/TransferPlugins - Multicast_transfer.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 183 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 31 0

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME (app_name + "_MulticastTransfer").c_str()
       3              : 
       4              : #include "artdaq/TransferPlugins/TransferInterface.hh"
       5              : 
       6              : #include "artdaq-core/Data/Fragment.hh"
       7              : #include "artdaq-core/Utilities/ExceptionHandler.hh"
       8              : 
       9              : #include "cetlib_except/exception.h"
      10              : #include "fhiclcpp/ParameterSet.h"
      11              : 
      12              : #include <boost/asio.hpp>
      13              : 
      14              : #include <bitset>
      15              : #include <cassert>
      16              : #include <iostream>
      17              : #include <string>
      18              : #include <type_traits>
      19              : #include <vector>
      20              : 
      21              : #pragma GCC diagnostic push
      22              : #pragma GCC diagnostic ignored "-Wunused-parameter"
      23              : 
      24              : namespace artdaq {
      25              : /**
      26              :  * \brief MulticastTransfer is a TransferInterface implementation plugin that transfers data using Multicast
      27              :  */
      28              : class MulticastTransfer : public TransferInterface
      29              : {
      30              : public:
      31              :         using byte_t = artdaq::Fragment::byte_t;  ///< Copy Fragment::byte_t into local scope
      32              : 
      33              :         /**
      34              :          * \brief Default destructor
      35              :          */
      36            0 :         ~MulticastTransfer() override = default;
      37              : 
      38              :         /**
      39              :          * \brief MulticastTransfer Constructor
      40              :          * \param ps ParameterSet used to configure MulticastTransfer
      41              :          * \param role Role of this MulticastTransfer instance (kSend or kReceive)
      42              :          *
      43              :          * \verbatim
      44              :          * MulticastTransfer accepts the following Parameters:
      45              :          * "subfragment_size" (REQUIRED): Size of the sub-Fragments
      46              :          * "subfragments_per_send" (REQUIRED): How many sub-Fragments to send in each batch
      47              :          * "pause_on_copy_usecs" (Default: 0): Pause after sending a batch of sub-Fragments for this many microseconds
      48              :          * "multicast_port" (REQUIRED): Port number to connect to
      49              :          * "multicast_address" (REQUIRED): Multicast address to send to/receive from
      50              :          * "local_address" (REQUIRED): Local origination address for multicast
      51              :          * "receive_buffer_size" (Default: 0): The UDP receive buffer size. 0 uses automatic size.
      52              :          * \endverbatim
      53              :          * MulticastTransfer also requires all Parameters for configuring a TransferInterface
      54              :          */
      55              :         MulticastTransfer(fhicl::ParameterSet const& ps, Role role);
      56              : 
      57              :         /**
      58              :          * \brief Receive a Fragment using Multicast
      59              :          * \param[out] fragment Received Fragment
      60              :          * \param receiveTimeout Timeout for receive, in microseconds
      61              :          * \return Rank of sender or RECV_TIMEOUT
      62              :          */
      63              :         int receiveFragment(artdaq::Fragment& fragment,
      64              :                             size_t receiveTimeout) override;
      65              : 
      66              :         /**
      67              :          * \brief Receive a Fragment Header from the transport mechanism
      68              :          * \param[out] header Received Fragment Header
      69              :          * \param receiveTimeout Timeout for receive
      70              :          * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
      71              :          */
      72              :         int receiveFragmentHeader(detail::RawFragmentHeader& header, size_t receiveTimeout) override;
      73              : 
      74              :         /**
      75              :          * \brief Receive the body of a Fragment to the given destination pointer
      76              :          * \param destination Pointer to memory region where Fragment data should be stored
      77              :          * \param wordCount Number of words of Fragment data to receive
      78              :          * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
      79              :          */
      80              :         int receiveFragmentData(RawDataType* destination, size_t wordCount) override;
      81              : 
      82              :         /**
      83              :          * \brief Copy a Fragment to the destination. Multicast is always unreliable
      84              :          * \param fragment Fragment to copy
      85              :          * \param send_timeout_usec How long to try to send before discarding data
      86              :          * \return CopyStatus detailing result of copy
      87              :          */
      88              :         CopyStatus transfer_fragment_min_blocking_mode(artdaq::Fragment const& fragment, size_t send_timeout_usec) override;
      89              : 
      90              :         /**
      91              :          * \brief Move a Fragment to the destination. Multicast is always unreliable
      92              :          * \param fragment Fragment to move
      93              :          * \return CopyStatus detailing result of copy
      94              :          */
      95              :         CopyStatus transfer_fragment_reliable_mode(artdaq::Fragment&& fragment) override;
      96              : 
      97              :         /**
      98              :          * \brief Determine whether the TransferInterface plugin is able to send/receive data
      99              :          * \return True if the TransferInterface plugin is currently able to send/receive data
     100              :          */
     101            0 :         bool isRunning() override { return socket_ != nullptr; }
     102              : 
     103              :         /**
     104              :          * \brief Flush any in-flight data. This should be used by the receiver after the receive loop has
     105              :          * ended.
     106              :          */
     107            0 :         void flush_buffers() override {}
     108              : 
     109              : private:
     110              :         MulticastTransfer(MulticastTransfer const&) = delete;
     111              :         MulticastTransfer(MulticastTransfer&&) = delete;
     112              :         MulticastTransfer& operator=(MulticastTransfer const&) = delete;
     113              :         MulticastTransfer& operator=(MulticastTransfer&&) = delete;
     114              : 
     115              :         void fill_staging_memory(const artdaq::Fragment& frag);
     116              : 
     117              :         template<typename T>
     118              :         void book_container_of_buffers(std::vector<T>& buffers,
     119              :                                        size_t fragment_size,
     120              :                                        size_t total_subfragments,
     121              :                                        size_t first_subfragment_num,
     122              :                                        size_t last_subfragment_num);
     123              : 
     124              :         void get_fragment_quantities(const boost::asio::mutable_buffer& buf, size_t& payload_size, size_t& fragment_size,
     125              :                                      size_t& expected_subfragments);
     126              : 
     127              :         void set_receive_buffer_size(size_t recv_buff_size);
     128              : 
     129              :         class subfragment_identifier
     130              :         {
     131              :         public:
     132            0 :                 subfragment_identifier(size_t sequenceID, size_t fragmentID, size_t subfragment_number)
     133            0 :                     : sequenceID_(sequenceID)
     134            0 :                     , fragmentID_(fragmentID)
     135            0 :                     , subfragment_number_(subfragment_number) {}
     136              : 
     137              :                 size_t sequenceID() const { return sequenceID_; }
     138              :                 size_t fragmentID() const { return fragmentID_; }
     139              :                 size_t subfragment_number() const { return subfragment_number_; }
     140              : 
     141              :         private:
     142              :                 size_t sequenceID_;
     143              :                 size_t fragmentID_;
     144              :                 size_t subfragment_number_;
     145              :         };
     146              : 
     147              :         std::unique_ptr<boost::asio::io_service> io_service_;
     148              : 
     149              :         std::unique_ptr<boost::asio::ip::udp::endpoint> local_endpoint_;
     150              :         std::unique_ptr<boost::asio::ip::udp::endpoint> multicast_endpoint_;
     151              :         std::unique_ptr<boost::asio::ip::udp::endpoint> opposite_endpoint_;
     152              : 
     153              :         std::unique_ptr<boost::asio::ip::udp::socket> socket_;
     154              : 
     155              :         size_t subfragment_size_;
     156              :         size_t subfragments_per_send_;
     157              : 
     158              :         size_t pause_on_copy_usecs_;
     159              :         Fragment fragment_buffer_;
     160              : 
     161              :         std::vector<byte_t> staging_memory_;
     162              : 
     163              :         std::vector<boost::asio::mutable_buffer> receive_buffers_;
     164              : };
     165              : }  // namespace artdaq
     166              : 
     167            0 : artdaq::MulticastTransfer::MulticastTransfer(fhicl::ParameterSet const& pset, Role role)
     168              :     : TransferInterface(pset, role)
     169            0 :     , io_service_(std::make_unique<std::remove_reference<decltype(*io_service_)>::type>())
     170            0 :     , local_endpoint_(nullptr)
     171            0 :     , multicast_endpoint_(nullptr)
     172            0 :     , opposite_endpoint_(std::make_unique<std::remove_reference<decltype(*opposite_endpoint_)>::type>())
     173            0 :     , socket_(nullptr)
     174            0 :     , subfragment_size_(pset.get<size_t>("subfragment_size"))
     175            0 :     , subfragments_per_send_(pset.get<size_t>("subfragments_per_send"))
     176            0 :     , pause_on_copy_usecs_(pset.get<size_t>("pause_on_copy_usecs", 0))
     177              : {
     178              :         try
     179              :         {
     180            0 :                 portMan->UpdateConfiguration(pset);
     181            0 :                 auto port = portMan->GetMulticastTransferPort(source_rank());
     182            0 :                 auto multicast_address = boost::asio::ip::address::from_string(portMan->GetMulticastTransferGroupAddress());
     183            0 :                 auto local_address = boost::asio::ip::address::from_string(pset.get<std::string>("local_address"));
     184              : 
     185            0 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "multicast address is set to " << multicast_address;
     186            0 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "local address is set to " << local_address;
     187              : 
     188            0 :                 if (TransferInterface::role() == Role::kSend)
     189              :                 {
     190            0 :                         local_endpoint_ = std::make_unique<std::remove_reference<decltype(*local_endpoint_)>::type>(local_address, 0);
     191            0 :                         multicast_endpoint_ = std::make_unique<std::remove_reference<decltype(*multicast_endpoint_)>::type>(multicast_address, port);
     192              : 
     193            0 :                         socket_ = std::make_unique<std::remove_reference<decltype(*socket_)>::type>(*io_service_,
     194            0 :                                                                                                     multicast_endpoint_->protocol());
     195            0 :                         socket_->bind(*local_endpoint_);
     196              :                 }
     197              :                 else
     198              :                 {  // TransferInterface::role() == Role::kReceive
     199              : 
     200              :                         // Create the socket so that multiple may be bound to the same address.
     201              : 
     202            0 :                         local_endpoint_ = std::make_unique<std::remove_reference<decltype(*local_endpoint_)>::type>(local_address, port);
     203            0 :                         socket_ = std::make_unique<std::remove_reference<decltype(*socket_)>::type>(*io_service_,
     204            0 :                                                                                                     local_endpoint_->protocol());
     205              : 
     206            0 :                         boost::system::error_code ec;
     207              : 
     208            0 :                         socket_->set_option(boost::asio::ip::udp::socket::reuse_address(true), ec);
     209              : 
     210            0 :                         if (ec.value() != 0)
     211              :                         {
     212            0 :                                 TLOG(TLVL_ERROR) << "boost::system::error_code with value " << ec << " was found in setting reuse_address option";
     213              :                         }
     214              : 
     215            0 :                         set_receive_buffer_size(pset.get<size_t>("receive_buffer_size", 0));
     216              : 
     217            0 :                         socket_->bind(boost::asio::ip::udp::endpoint(multicast_address, port));
     218              : 
     219              :                         // Join the multicast group.
     220              : 
     221            0 :                         socket_->set_option(boost::asio::ip::multicast::join_group(multicast_address), ec);
     222              : 
     223            0 :                         if (ec.value() != 0)
     224              :                         {
     225            0 :                                 TLOG(TLVL_ERROR) << "boost::system::error_code with value " << ec << " was found in attempt to join multicast group";
     226              :                         }
     227              :                 }
     228              :         }
     229            0 :         catch (...)
     230              :         {
     231            0 :                 ExceptionHandler(ExceptionHandlerRethrow::yes, "Problem setting up the socket in MulticastTransfer");
     232            0 :         }
     233              : 
     234              :         auto max_subfragments =
     235            0 :             static_cast<size_t>(std::ceil(max_fragment_size_words_ / static_cast<float>(subfragment_size_)));
     236              : 
     237            0 :         staging_memory_.resize(max_subfragments * (sizeof(subfragment_identifier) + subfragment_size_));
     238              : 
     239            0 :         if (TransferInterface::role() == Role::kReceive)
     240              :         {
     241            0 :                 book_container_of_buffers(receive_buffers_, max_fragment_size_words_, max_subfragments, 0, max_subfragments - 1);
     242              :         }
     243              : 
     244            0 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "max_subfragments is " << max_subfragments;
     245            0 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Staging buffer size is " << staging_memory_.size();
     246            0 : }
     247              : 
     248              : #pragma GCC diagnostic push
     249              : #pragma GCC diagnostic ignored "-Wunused-variable"
     250              : 
     251            0 : int artdaq::MulticastTransfer::receiveFragment(artdaq::Fragment& fragment,
     252              :                                                size_t receiveTimeout)
     253              : {
     254            0 :         assert(TransferInterface::role() == Role::kReceive);
     255              : 
     256            0 :         if (fragment.dataSizeBytes() > 0)
     257              :         {
     258            0 :                 throw cet::exception("MulticastTransfer") << "Error in MulticastTransfer::receiveFragmentFrom: "  // NOLINT(cert-err60-cpp)
     259            0 :                                                           << "nonzero payload found in fragment passed as argument";
     260              :         }
     261              : 
     262              :         static bool print_warning = true;
     263              : 
     264            0 :         if (print_warning)
     265              :         {
     266            0 :                 TLOG(TLVL_WARNING) << "Please note that MulticastTransfer::receiveFragmentFrom does not use its receiveTimeout argument";
     267            0 :                 print_warning = false;
     268              :         }
     269              : 
     270            0 :         fragment.resizeBytes(max_fragment_size_words_ - sizeof(artdaq::detail::RawFragmentHeader));
     271              : 
     272              :         static auto current_sequenceID = std::numeric_limits<Fragment::sequence_id_t>::max();
     273              :         static auto current_fragmentID = std::numeric_limits<Fragment::fragment_id_t>::max();
     274              : 
     275            0 :         size_t fragment_size = 0;
     276            0 :         size_t expected_subfragments = 0;
     277            0 :         size_t current_subfragments = 0;
     278            0 :         bool fragment_complete = false;
     279            0 :         bool last_fragment_truncated = false;
     280              : 
     281              :         while (true)
     282              :         {
     283            0 :                 auto bytes_received = socket_->receive_from(receive_buffers_, *opposite_endpoint_);
     284              : 
     285            0 :                 size_t bytes_processed = 0;
     286              : 
     287            0 :                 for (auto& buf : receive_buffers_)
     288              :                 {
     289            0 :                         auto buf_size = boost::asio::buffer_size(buf);
     290            0 :                         auto size_t_ptr = boost::asio::buffer_cast<const size_t*>(buf);
     291            0 :                         auto seqID = *size_t_ptr;
     292            0 :                         auto fragID = *(size_t_ptr + 1);     // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     293            0 :                         auto subfragID = *(size_t_ptr + 2);  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     294              : 
     295            0 :                         if (seqID != current_sequenceID || fragID != current_fragmentID)
     296              :                         {
     297              :                                 // JCF, Jun-22-2016
     298              :                                 // Code currently operates under the assumption that all subfragments from the call are from the same fragment
     299              : 
     300            0 :                                 assert(bytes_processed == 0);
     301              : 
     302            0 :                                 if (current_subfragments < expected_subfragments)
     303              :                                 {
     304            0 :                                         last_fragment_truncated = true;
     305              : 
     306            0 :                                         if (expected_subfragments != std::numeric_limits<size_t>::max())
     307              :                                         {
     308            0 :                                                 TLOG(TLVL_WARNING) << "Warning: only received " << current_subfragments << " subfragments for fragment with seqID = " << current_sequenceID << ", fragID = " << current_fragmentID << " (expected " << expected_subfragments << ")";
     309              :                                         }
     310              :                                         else
     311              :                                         {
     312            0 :                                                 TLOG(TLVL_WARNING) << "Warning: only received " << current_subfragments << " subfragments for fragment with seqID = " << current_sequenceID << ", fragID = " << current_fragmentID << ", # of expected subfragments is unknown as fragment header was not received)";
     313              :                                         }
     314              :                                 }
     315              : 
     316            0 :                                 current_subfragments = 0;
     317            0 :                                 fragment_size = std::numeric_limits<size_t>::max();
     318            0 :                                 expected_subfragments = std::numeric_limits<size_t>::max();
     319            0 :                                 current_sequenceID = seqID;
     320            0 :                                 current_fragmentID = fragID;
     321              :                         }
     322              : 
     323            0 :                         auto ptr_into_fragment = fragment.headerBeginBytes() + subfragID * subfragment_size_;  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     324              : 
     325            0 :                         auto ptr_into_buffer = boost::asio::buffer_cast<const byte_t*>(buf) + sizeof(subfragment_identifier);
     326              : 
     327            0 :                         std::copy(ptr_into_buffer, ptr_into_buffer + buf_size - sizeof(subfragment_identifier), ptr_into_fragment);  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     328              : 
     329            0 :                         if (subfragID == 0)
     330              :                         {
     331            0 :                                 if (buf_size >= sizeof(subfragment_identifier) + sizeof(artdaq::detail::RawFragmentHeader))
     332              :                                 {
     333            0 :                                         auto payload_size = std::numeric_limits<size_t>::max();
     334            0 :                                         get_fragment_quantities(buf, payload_size, fragment_size, expected_subfragments);
     335              : 
     336            0 :                                         fragment.resizeBytes(payload_size);
     337              :                                 }
     338              :                                 else
     339              :                                 {
     340            0 :                                         throw cet::exception("MulticastTransfer") << "Buffer size is too small to completely contain an artdaq::Fragment header; "  // NOLINT(cert-err60-cpp)
     341            0 :                                                                                   << "please increase the default size";
     342              :                                 }
     343              :                         }
     344              : 
     345            0 :                         current_subfragments++;
     346              : 
     347            0 :                         if (current_subfragments == expected_subfragments)
     348              :                         {
     349            0 :                                 fragment_complete = true;
     350              :                         }
     351              : 
     352            0 :                         bytes_processed += buf_size;
     353              : 
     354            0 :                         if (bytes_processed >= bytes_received)
     355              :                         {
     356            0 :                                 break;
     357              :                         }
     358              :                 }
     359              : 
     360            0 :                 if (last_fragment_truncated)
     361              :                 {
     362              :                         // JCF, 7-7-2017
     363              : 
     364              :                         // Don't yet have code to handle the scenario where the set of
     365              :                         // subfragments received in the last iteration of the loop was
     366              :                         // its own complete fragment, but we know the previous fragment
     367              :                         // to be incomplete
     368              : 
     369            0 :                         assert(!fragment_complete);
     370            0 :                         TLOG(TLVL_WARNING) << GetTraceName() << "Got an incomplete fragment";
     371            0 :                         return artdaq::TransferInterface::RECV_TIMEOUT;
     372              :                 }
     373              : 
     374            0 :                 if (fragment_complete)
     375              :                 {
     376            0 :                         return source_rank();
     377              :                 }
     378            0 :         }
     379              : 
     380              :         return TransferInterface::RECV_TIMEOUT;
     381              : }
     382              : 
     383              : #pragma GCC diagnostic pop
     384              : 
     385            0 : int artdaq::MulticastTransfer::receiveFragmentHeader(detail::RawFragmentHeader& header, size_t receiveTimeout)
     386              : {
     387            0 :         auto ret = receiveFragment(fragment_buffer_, receiveTimeout);
     388            0 :         if (ret == source_rank())
     389              :         {
     390            0 :                 header = *reinterpret_cast<detail::RawFragmentHeader*>(fragment_buffer_.headerAddress());  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     391            0 :                 return source_rank();
     392              :         }
     393            0 :         return ret;
     394              : }
     395              : 
     396            0 : int artdaq::MulticastTransfer::receiveFragmentData(RawDataType* destination, size_t wordCount)
     397              : {
     398            0 :         if (fragment_buffer_.size() > detail::RawFragmentHeader::num_words())
     399              :         {
     400            0 :                 auto dataSize = (fragment_buffer_.size() - detail::RawFragmentHeader::num_words()) * sizeof(RawDataType);
     401            0 :                 memcpy(destination, fragment_buffer_.headerAddress() + detail::RawFragmentHeader::num_words(), dataSize);  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     402            0 :                 return source_rank();
     403              :         }
     404            0 :         return RECV_TIMEOUT;
     405              : }
     406              : 
     407              : // Reliable transport is undefined for multicast; just use copy
     408              : artdaq::TransferInterface::CopyStatus
     409            0 : artdaq::MulticastTransfer::transfer_fragment_reliable_mode(artdaq::Fragment&& f)
     410              : {
     411            0 :         return transfer_fragment_min_blocking_mode(f, 100000000);
     412              : }
     413              : 
     414              : artdaq::TransferInterface::CopyStatus
     415            0 : artdaq::MulticastTransfer::transfer_fragment_min_blocking_mode(artdaq::Fragment const& fragment,
     416              :                                                                size_t send_timeout_usec)
     417              : {
     418            0 :         assert(TransferInterface::role() == Role::kSend);
     419              : 
     420            0 :         if (fragment.sizeBytes() > max_fragment_size_words_)
     421              :         {
     422            0 :                 throw cet::exception("MulticastTransfer") << "Error in MulticastTransfer::copyFragmentTo: " << fragment.sizeBytes() << " byte fragment exceeds max_fragment_size of " << max_fragment_size_words_;  // NOLINT(cert-err60-cpp)
     423              :         }
     424              : 
     425            0 :         auto num_subfragments = static_cast<size_t>(std::ceil(fragment.sizeBytes() / static_cast<float>(subfragment_size_)));
     426              : 
     427            0 :         fill_staging_memory(fragment);
     428              : 
     429            0 :         for (size_t batch_index = 0;; batch_index++)
     430              :         {
     431            0 :                 auto first_subfragment = batch_index * subfragments_per_send_;
     432            0 :                 auto last_subfragment = (batch_index + 1) * subfragments_per_send_ >= num_subfragments ? num_subfragments - 1 : (batch_index + 1) * subfragments_per_send_ - 1;
     433              : 
     434            0 :                 std::vector<boost::asio::const_buffer> buffers;
     435              : 
     436            0 :                 book_container_of_buffers(buffers, fragment.sizeBytes(), num_subfragments, first_subfragment, last_subfragment);
     437              : 
     438            0 :                 socket_->send_to(buffers, *multicast_endpoint_);
     439              : 
     440            0 :                 usleep(pause_on_copy_usecs_);
     441              : 
     442            0 :                 if (last_subfragment == num_subfragments - 1)
     443              :                 {
     444            0 :                         break;
     445              :                 }
     446            0 :         }
     447            0 :         return CopyStatus::kSuccess;
     448              : }
     449              : 
     450              : #pragma GCC diagnostic push
     451              : #pragma GCC diagnostic ignored "-Wsign-compare"
     452              : // Stages "fragment" in staging_memory_ as one fixed-stride record per subfragment: a subfragment_identifier followed by that subfragment's slice of the fragment bytes (headerBeginBytes() up to dataEndBytes()).
     453            0 : void artdaq::MulticastTransfer::fill_staging_memory(const artdaq::Fragment& fragment)
     454              : {
     455            0 :         const size_t num_subfragments = (fragment.sizeBytes() + subfragment_size_ - 1) / subfragment_size_;  // exact integer ceiling; the former float division could mis-round once sizes exceed float's 24-bit mantissa
     456            0 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "# of subfragments to use is " << num_subfragments;
     457              :         // NOTE(review): copyFragmentTo and get_fragment_quantities derive this count independently; every site must use the same formula. Assumes subfragment_size_ != 0 - confirm it is validated at construction.
     458            0 :         for (size_t i_s = 0; i_s < num_subfragments; ++i_s)
     459              :         {
     460            0 :                 auto staging_memory_copyto = &staging_memory_.at(i_s * (sizeof(subfragment_identifier) + subfragment_size_));
     461              :                 // Record header: (sequence ID, fragment ID, subfragment index), copied into the record as raw bytes.
     462            0 :                 subfragment_identifier sfi(fragment.sequenceID(), fragment.fragmentID(), i_s);
     463              : 
     464            0 :                 std::copy(reinterpret_cast<byte_t*>(&sfi),                                   // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     465              :                           reinterpret_cast<byte_t*>(&sfi) + sizeof(subfragment_identifier),  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic,cppcoreguidelines-pro-type-reinterpret-cast)
     466              :                           staging_memory_copyto);
     467              :                 // Payload slice [low, high): subfragment_size_ bytes for every subfragment except the last, which stops at the end of the fragment.
     468            0 :                 auto low_ptr_into_fragment = fragment.headerBeginBytes() + subfragment_size_ * i_s;  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     469              : 
     470            0 :                 auto high_ptr_into_fragment = (i_s == num_subfragments - 1) ? fragment.dataEndBytes() : fragment.headerBeginBytes() + subfragment_size_ * (i_s + 1);  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     471              : 
     472            0 :                 std::copy(low_ptr_into_fragment,
     473              :                           high_ptr_into_fragment,
     474              :                           staging_memory_copyto + sizeof(subfragment_identifier));  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     475              :         }
     476            0 : }
     477              : 
     478              : #pragma GCC diagnostic pop
     479              : 
     480              : // book_container_of_buffers appends to "buffers" one (pointer, length) entry per subfragment record already staged in staging_memory_.
     481              : // Note that the range includes, rather than excludes, "last_subfragment_num"; in this regard it differs from the way
     482              : // STL functions receive iterators. Note also that the lowest possible
     483              : // value for "first_subfragment_num" is 0, not 1.
     484              : // "fragment_size" is the whole fragment's size in bytes and is used only to size the final subfragment; "total_subfragments" is the count for the whole fragment, not for this batch.
     485              : template<typename T>
     486            0 : void artdaq::MulticastTransfer::book_container_of_buffers(std::vector<T>& buffers,
     487              :                                                           const size_t fragment_size,
     488              :                                                           const size_t total_subfragments,
     489              :                                                           const size_t first_subfragment_num,
     490              :                                                           const size_t last_subfragment_num)
     491              : {
     492            0 :         assert(staging_memory_.size() >= total_subfragments * (sizeof(subfragment_identifier) + subfragment_size_));  // staging area must be able to hold every record
     493            0 :         assert(buffers.empty());  // caller is expected to pass a fresh container
     494            0 :         assert(last_subfragment_num < total_subfragments);  // inclusive upper bound must be a valid index
     495              :         // Records sit at a fixed stride of sizeof(subfragment_identifier) + subfragment_size_ bytes; only the fragment's final subfragment may carry fewer payload bytes.
     496            0 :         for (auto i_f = first_subfragment_num; i_f <= last_subfragment_num; ++i_f)
     497              :         {
     498            0 :                 auto bytes_to_store = (i_f == total_subfragments - 1) ? sizeof(subfragment_identifier) + (fragment_size - (total_subfragments - 1) * subfragment_size_) : sizeof(subfragment_identifier) + subfragment_size_;
     499              :                 // Each entry is built from the record's start address and its length (identifier + payload bytes).
     500            0 :                 buffers.emplace_back(&staging_memory_.at(i_f * (sizeof(subfragment_identifier) + subfragment_size_)),
     501              :                                      bytes_to_store);
     502              :         }
     503            0 : }
     504              : 
     505              : #pragma GCC diagnostic push  // Needed since profile builds will ignore the assert
     506              : #pragma GCC diagnostic ignored "-Wunused-variable"
     507              : // Reads the fragment header out of "buf" (expected to hold subfragment 0: a subfragment_identifier followed by the start of the fragment) and reports the fragment's sizes through the three output references.
     508            0 : void artdaq::MulticastTransfer::get_fragment_quantities(const boost::asio::mutable_buffer& buf, size_t& payload_size,
     509              :                                                         size_t& fragment_size,
     510              :                                                         size_t& expected_subfragments)
     511              : {
     512            0 :         auto* buffer_ptr = boost::asio::buffer_cast<byte_t*>(buf);
     513              :         // The third size_t-sized slot at the front of the buffer is read as the subfragment number; it is used only by the assert below.
     514            0 :         auto subfragment_num = *(reinterpret_cast<size_t*>(buffer_ptr) + 2);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-bounds-pointer-arithmetic)
     515              : 
     516            0 :         assert(subfragment_num == 0);
     517              :         // The RawFragmentHeader is read from the bytes immediately after the identifier.
     518            0 :         auto* header =
     519              :             reinterpret_cast<artdaq::detail::RawFragmentHeader*>(buffer_ptr + sizeof(subfragment_identifier));  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-bounds-pointer-arithmetic)
     520              :         // Header counts are in RawDataType words; everything reported by this function is in bytes.
     521            0 :         fragment_size = header->word_count * sizeof(artdaq::RawDataType);
     522              : 
     523            0 :         auto metadata_size = header->metadata_word_count * sizeof(artdaq::RawDataType);
     524            0 :         payload_size = fragment_size - metadata_size - artdaq::detail::RawFragmentHeader::num_words() * sizeof(artdaq::RawDataType);
     525              :         // NOTE(review): nothing validates the received counts, so a malformed header makes payload_size wrap; the assert below restates the subtraction above and cannot catch that.
     526            0 :         assert(fragment_size ==
     527              :                artdaq::detail::RawFragmentHeader::num_words() * sizeof(artdaq::RawDataType) +
     528              :                    metadata_size +
     529              :                    payload_size);
     530              :         // Exact integer ceiling (the former float division could mis-round once sizes exceed float's 24-bit mantissa). NOTE(review): must match the count the sender derives in copyFragmentTo / fill_staging_memory.
     531            0 :         expected_subfragments = (fragment_size + subfragment_size_ - 1) / subfragment_size_;
     532            0 : }
     533              : #pragma GCC diagnostic pop
     534              : // Requests a socket receive buffer of "recv_buff_size" bytes and logs the size before and after the attempt; a value of 0 leaves the socket untouched.
     535            0 : void artdaq::MulticastTransfer::set_receive_buffer_size(size_t recv_buff_size)
     536              : {
     537            0 :         if (recv_buff_size == 0)
     538              :         {
     539            0 :                 return;
     540              :         }
     541            0 :         boost::asio::socket_base::receive_buffer_size actual_recv_buff_size;
     542            0 :         socket_->get_option(actual_recv_buff_size);
     543              :         // Log the current size next to the requested one.
     544            0 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Receive buffer size is currently " << actual_recv_buff_size.value() << " bytes, will try to change it to " << recv_buff_size;
     545              : 
     546            0 :         boost::asio::socket_base::receive_buffer_size recv_buff_option(recv_buff_size);
     547              :         // The error_code overload of set_option is used so that a rejected request is reported below rather than raised as an exception.
     548            0 :         boost::system::error_code ec;
     549            0 :         socket_->set_option(recv_buff_option, ec);
     550              :         // A failure is reported through TRACE and stderr only; execution continues.
     551            0 :         if (ec.value() != 0)
     552              :         {
     553            0 :                 TLOG(TLVL_ERROR) << "boost::system::error_code with value " << ec << " was found in attempt to change receive buffer";
     554            0 :                 std::cerr << "boost::system::error_code with value " << ec << " was found in attempt to change receive buffer" << std::endl;
     555              :         }
     556              :         // Read the option back and log it, so the size actually in effect after the attempt is visible.
     557            0 :         socket_->get_option(actual_recv_buff_size);
     558            0 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "After attempted change, receive buffer size is now " << actual_recv_buff_size.value();
     559              : }
     560              : 
     561              : #pragma GCC diagnostic pop
     562              : // Invokes DEFINE_ARTDAQ_TRANSFER for this class. NOTE(review): the macro's definition is not visible in this file; presumably it emits the plugin factory entry point used to load MulticastTransfer - confirm in TransferInterface.hh.
     563            0 : DEFINE_ARTDAQ_TRANSFER(artdaq::MulticastTransfer)
        

Generated by: LCOV version 2.0-1