Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_MulticastTransfer").c_str()
3 :
4 : #include "artdaq/TransferPlugins/TransferInterface.hh"
5 :
6 : #include "artdaq-core/Data/Fragment.hh"
7 : #include "artdaq-core/Utilities/ExceptionHandler.hh"
8 :
9 : #include "cetlib_except/exception.h"
10 : #include "fhiclcpp/ParameterSet.h"
11 :
12 : #include <boost/asio.hpp>
13 :
14 : #include <bitset>
15 : #include <cassert>
16 : #include <iostream>
17 : #include <string>
18 : #include <type_traits>
19 : #include <vector>
20 :
21 : #pragma GCC diagnostic push
22 : #pragma GCC diagnostic ignored "-Wunused-parameter"
23 :
24 : namespace artdaq {
25 : /**
26 : * \brief MulticastTransfer is a TransferInterface implementation plugin that transfers data using Multicast
27 : */
28 : class MulticastTransfer : public TransferInterface
29 : {
30 : public:
31 : using byte_t = artdaq::Fragment::byte_t; ///< Copy Fragment::byte_t into local scope
32 :
33 : /**
34 : * \brief Default destructor
35 : */
36 0 : ~MulticastTransfer() override = default;
37 :
38 : /**
39 : * \brief MulticastTransfer Constructor
40 : * \param ps ParameterSet used to configure MulticastTransfer
41 : * \param role Role of this MulticastTransfer instance (kSend or kReceive)
42 : *
43 : * \verbatim
44 : * MulticastTransfer accepts the following Parameters:
45 : * "subfragment_size" (REQUIRED): Size of the sub-Fragments
46 : * "subfragments_per_send" (REQUIRED): How many sub-Fragments to send in each batch
47 : * "pause_on_copy_usecs" (Default: 0): Pause after sending a batch of sub-Fragments for this many microseconds
48 : * "multicast_port" (REQUIRED): Port number to connect to
49 : * "multicast_address" (REQUIRED): Multicast address to send to/receive from
50 : * "local_address" (REQUIRED): Local origination address for multicast
51 : * "receive_buffer_size" (Default: 0): The UDP receive buffer size. 0 uses automatic size.
52 : * \endverbatim
53 : * MulticastTransfer also requires all Parameters for configuring a TransferInterface
54 : */
55 : MulticastTransfer(fhicl::ParameterSet const& ps, Role role);
56 :
57 : /**
58 : * \brief Receive a Fragment using Multicast
59 : * \param[out] fragment Received Fragment
60 : * \param receiveTimeout Timeout for receive, in microseconds
61 : * \return Rank of sender or RECV_TIMEOUT
62 : */
63 : int receiveFragment(artdaq::Fragment& fragment,
64 : size_t receiveTimeout) override;
65 :
66 : /**
67 : * \brief Receive a Fragment Header from the transport mechanism
68 : * \param[out] header Received Fragment Header
69 : * \param receiveTimeout Timeout for receive
70 : * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
71 : */
72 : int receiveFragmentHeader(detail::RawFragmentHeader& header, size_t receiveTimeout) override;
73 :
74 : /**
75 : * \brief Receive the body of a Fragment to the given destination pointer
76 : * \param destination Pointer to memory region where Fragment data should be stored
77 : * \param wordCount Number of words of Fragment data to receive
78 : * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
79 : */
80 : int receiveFragmentData(RawDataType* destination, size_t wordCount) override;
81 :
82 : /**
83 : * \brief Copy a Fragment to the destination. Multicast is always unreliable
84 : * \param fragment Fragment to copy
85 : * \param send_timeout_usec How long to try to send before discarding data
86 : * \return CopyStatus detailing result of copy
87 : */
88 : CopyStatus transfer_fragment_min_blocking_mode(artdaq::Fragment const& fragment, size_t send_timeout_usec) override;
89 :
90 : /**
91 : * \brief Move a Fragment to the destination. Multicast is always unreliable
92 : * \param fragment Fragment to move
93 : * \return CopyStatus detailing result of copy
94 : */
95 : CopyStatus transfer_fragment_reliable_mode(artdaq::Fragment&& fragment) override;
96 :
97 : /**
98 : * \brief Determine whether the TransferInterface plugin is able to send/receive data
99 : * \return True if the TransferInterface plugin is currently able to send/receive data
100 : */
101 0 : bool isRunning() override { return socket_ != nullptr; }
102 :
103 : /**
104 : * \brief Flush any in-flight data. This should be used by the receiver after the receive loop has
105 : * ended.
106 : */
107 0 : void flush_buffers() override {}
108 :
109 : private:
110 : MulticastTransfer(MulticastTransfer const&) = delete;
111 : MulticastTransfer(MulticastTransfer&&) = delete;
112 : MulticastTransfer& operator=(MulticastTransfer const&) = delete;
113 : MulticastTransfer& operator=(MulticastTransfer&&) = delete;
114 :
115 : void fill_staging_memory(const artdaq::Fragment& frag);
116 :
117 : template<typename T>
118 : void book_container_of_buffers(std::vector<T>& buffers,
119 : size_t fragment_size,
120 : size_t total_subfragments,
121 : size_t first_subfragment_num,
122 : size_t last_subfragment_num);
123 :
124 : void get_fragment_quantities(const boost::asio::mutable_buffer& buf, size_t& payload_size, size_t& fragment_size,
125 : size_t& expected_subfragments);
126 :
127 : void set_receive_buffer_size(size_t recv_buff_size);
128 :
129 : class subfragment_identifier
130 : {
131 : public:
132 0 : subfragment_identifier(size_t sequenceID, size_t fragmentID, size_t subfragment_number)
133 0 : : sequenceID_(sequenceID)
134 0 : , fragmentID_(fragmentID)
135 0 : , subfragment_number_(subfragment_number) {}
136 :
137 : size_t sequenceID() const { return sequenceID_; }
138 : size_t fragmentID() const { return fragmentID_; }
139 : size_t subfragment_number() const { return subfragment_number_; }
140 :
141 : private:
142 : size_t sequenceID_;
143 : size_t fragmentID_;
144 : size_t subfragment_number_;
145 : };
146 :
147 : std::unique_ptr<boost::asio::io_service> io_service_;
148 :
149 : std::unique_ptr<boost::asio::ip::udp::endpoint> local_endpoint_;
150 : std::unique_ptr<boost::asio::ip::udp::endpoint> multicast_endpoint_;
151 : std::unique_ptr<boost::asio::ip::udp::endpoint> opposite_endpoint_;
152 :
153 : std::unique_ptr<boost::asio::ip::udp::socket> socket_;
154 :
155 : size_t subfragment_size_;
156 : size_t subfragments_per_send_;
157 :
158 : size_t pause_on_copy_usecs_;
159 : Fragment fragment_buffer_;
160 :
161 : std::vector<byte_t> staging_memory_;
162 :
163 : std::vector<boost::asio::mutable_buffer> receive_buffers_;
164 : };
165 : } // namespace artdaq
166 :
167 0 : artdaq::MulticastTransfer::MulticastTransfer(fhicl::ParameterSet const& pset, Role role)
168 : : TransferInterface(pset, role)
169 0 : , io_service_(std::make_unique<std::remove_reference<decltype(*io_service_)>::type>())
170 0 : , local_endpoint_(nullptr)
171 0 : , multicast_endpoint_(nullptr)
172 0 : , opposite_endpoint_(std::make_unique<std::remove_reference<decltype(*opposite_endpoint_)>::type>())
173 0 : , socket_(nullptr)
174 0 : , subfragment_size_(pset.get<size_t>("subfragment_size"))
175 0 : , subfragments_per_send_(pset.get<size_t>("subfragments_per_send"))
176 0 : , pause_on_copy_usecs_(pset.get<size_t>("pause_on_copy_usecs", 0))
177 : {
178 : try
179 : {
180 0 : portMan->UpdateConfiguration(pset);
181 0 : auto port = portMan->GetMulticastTransferPort(source_rank());
182 0 : auto multicast_address = boost::asio::ip::address::from_string(portMan->GetMulticastTransferGroupAddress());
183 0 : auto local_address = boost::asio::ip::address::from_string(pset.get<std::string>("local_address"));
184 :
185 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "multicast address is set to " << multicast_address;
186 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "local address is set to " << local_address;
187 :
188 0 : if (TransferInterface::role() == Role::kSend)
189 : {
190 0 : local_endpoint_ = std::make_unique<std::remove_reference<decltype(*local_endpoint_)>::type>(local_address, 0);
191 0 : multicast_endpoint_ = std::make_unique<std::remove_reference<decltype(*multicast_endpoint_)>::type>(multicast_address, port);
192 :
193 0 : socket_ = std::make_unique<std::remove_reference<decltype(*socket_)>::type>(*io_service_,
194 0 : multicast_endpoint_->protocol());
195 0 : socket_->bind(*local_endpoint_);
196 : }
197 : else
198 : { // TransferInterface::role() == Role::kReceive
199 :
200 : // Create the socket so that multiple may be bound to the same address.
201 :
202 0 : local_endpoint_ = std::make_unique<std::remove_reference<decltype(*local_endpoint_)>::type>(local_address, port);
203 0 : socket_ = std::make_unique<std::remove_reference<decltype(*socket_)>::type>(*io_service_,
204 0 : local_endpoint_->protocol());
205 :
206 0 : boost::system::error_code ec;
207 :
208 0 : socket_->set_option(boost::asio::ip::udp::socket::reuse_address(true), ec);
209 :
210 0 : if (ec.value() != 0)
211 : {
212 0 : TLOG(TLVL_ERROR) << "boost::system::error_code with value " << ec << " was found in setting reuse_address option";
213 : }
214 :
215 0 : set_receive_buffer_size(pset.get<size_t>("receive_buffer_size", 0));
216 :
217 0 : socket_->bind(boost::asio::ip::udp::endpoint(multicast_address, port));
218 :
219 : // Join the multicast group.
220 :
221 0 : socket_->set_option(boost::asio::ip::multicast::join_group(multicast_address), ec);
222 :
223 0 : if (ec.value() != 0)
224 : {
225 0 : TLOG(TLVL_ERROR) << "boost::system::error_code with value " << ec << " was found in attempt to join multicast group";
226 : }
227 : }
228 : }
229 0 : catch (...)
230 : {
231 0 : ExceptionHandler(ExceptionHandlerRethrow::yes, "Problem setting up the socket in MulticastTransfer");
232 0 : }
233 :
234 : auto max_subfragments =
235 0 : static_cast<size_t>(std::ceil(max_fragment_size_words_ / static_cast<float>(subfragment_size_)));
236 :
237 0 : staging_memory_.resize(max_subfragments * (sizeof(subfragment_identifier) + subfragment_size_));
238 :
239 0 : if (TransferInterface::role() == Role::kReceive)
240 : {
241 0 : book_container_of_buffers(receive_buffers_, max_fragment_size_words_, max_subfragments, 0, max_subfragments - 1);
242 : }
243 :
244 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "max_subfragments is " << max_subfragments;
245 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Staging buffer size is " << staging_memory_.size();
246 0 : }
247 :
248 : #pragma GCC diagnostic push
249 : #pragma GCC diagnostic ignored "-Wunused-variable"
250 :
251 0 : int artdaq::MulticastTransfer::receiveFragment(artdaq::Fragment& fragment,
252 : size_t receiveTimeout)
253 : {
254 0 : assert(TransferInterface::role() == Role::kReceive);
255 :
256 0 : if (fragment.dataSizeBytes() > 0)
257 : {
258 0 : throw cet::exception("MulticastTransfer") << "Error in MulticastTransfer::receiveFragmentFrom: " // NOLINT(cert-err60-cpp)
259 0 : << "nonzero payload found in fragment passed as argument";
260 : }
261 :
262 : static bool print_warning = true;
263 :
264 0 : if (print_warning)
265 : {
266 0 : TLOG(TLVL_WARNING) << "Please note that MulticastTransfer::receiveFragmentFrom does not use its receiveTimeout argument";
267 0 : print_warning = false;
268 : }
269 :
270 0 : fragment.resizeBytes(max_fragment_size_words_ - sizeof(artdaq::detail::RawFragmentHeader));
271 :
272 : static auto current_sequenceID = std::numeric_limits<Fragment::sequence_id_t>::max();
273 : static auto current_fragmentID = std::numeric_limits<Fragment::fragment_id_t>::max();
274 :
275 0 : size_t fragment_size = 0;
276 0 : size_t expected_subfragments = 0;
277 0 : size_t current_subfragments = 0;
278 0 : bool fragment_complete = false;
279 0 : bool last_fragment_truncated = false;
280 :
281 : while (true)
282 : {
283 0 : auto bytes_received = socket_->receive_from(receive_buffers_, *opposite_endpoint_);
284 :
285 0 : size_t bytes_processed = 0;
286 :
287 0 : for (auto& buf : receive_buffers_)
288 : {
289 0 : auto buf_size = boost::asio::buffer_size(buf);
290 0 : auto size_t_ptr = boost::asio::buffer_cast<const size_t*>(buf);
291 0 : auto seqID = *size_t_ptr;
292 0 : auto fragID = *(size_t_ptr + 1); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
293 0 : auto subfragID = *(size_t_ptr + 2); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
294 :
295 0 : if (seqID != current_sequenceID || fragID != current_fragmentID)
296 : {
297 : // JCF, Jun-22-2016
298 : // Code currently operates under the assumption that all subfragments from the call are from the same fragment
299 :
300 0 : assert(bytes_processed == 0);
301 :
302 0 : if (current_subfragments < expected_subfragments)
303 : {
304 0 : last_fragment_truncated = true;
305 :
306 0 : if (expected_subfragments != std::numeric_limits<size_t>::max())
307 : {
308 0 : TLOG(TLVL_WARNING) << "Warning: only received " << current_subfragments << " subfragments for fragment with seqID = " << current_sequenceID << ", fragID = " << current_fragmentID << " (expected " << expected_subfragments << ")";
309 : }
310 : else
311 : {
312 0 : TLOG(TLVL_WARNING) << "Warning: only received " << current_subfragments << " subfragments for fragment with seqID = " << current_sequenceID << ", fragID = " << current_fragmentID << ", # of expected subfragments is unknown as fragment header was not received)";
313 : }
314 : }
315 :
316 0 : current_subfragments = 0;
317 0 : fragment_size = std::numeric_limits<size_t>::max();
318 0 : expected_subfragments = std::numeric_limits<size_t>::max();
319 0 : current_sequenceID = seqID;
320 0 : current_fragmentID = fragID;
321 : }
322 :
323 0 : auto ptr_into_fragment = fragment.headerBeginBytes() + subfragID * subfragment_size_; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
324 :
325 0 : auto ptr_into_buffer = boost::asio::buffer_cast<const byte_t*>(buf) + sizeof(subfragment_identifier);
326 :
327 0 : std::copy(ptr_into_buffer, ptr_into_buffer + buf_size - sizeof(subfragment_identifier), ptr_into_fragment); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
328 :
329 0 : if (subfragID == 0)
330 : {
331 0 : if (buf_size >= sizeof(subfragment_identifier) + sizeof(artdaq::detail::RawFragmentHeader))
332 : {
333 0 : auto payload_size = std::numeric_limits<size_t>::max();
334 0 : get_fragment_quantities(buf, payload_size, fragment_size, expected_subfragments);
335 :
336 0 : fragment.resizeBytes(payload_size);
337 : }
338 : else
339 : {
340 0 : throw cet::exception("MulticastTransfer") << "Buffer size is too small to completely contain an artdaq::Fragment header; " // NOLINT(cert-err60-cpp)
341 0 : << "please increase the default size";
342 : }
343 : }
344 :
345 0 : current_subfragments++;
346 :
347 0 : if (current_subfragments == expected_subfragments)
348 : {
349 0 : fragment_complete = true;
350 : }
351 :
352 0 : bytes_processed += buf_size;
353 :
354 0 : if (bytes_processed >= bytes_received)
355 : {
356 0 : break;
357 : }
358 : }
359 :
360 0 : if (last_fragment_truncated)
361 : {
362 : // JCF, 7-7-2017
363 :
364 : // Don't yet have code to handle the scenario where the set of
365 : // subfragments received in the last iteration of the loop was
366 : // its own complete fragment, but we know the previous fragment
367 : // to be incomplete
368 :
369 0 : assert(!fragment_complete);
370 0 : TLOG(TLVL_WARNING) << GetTraceName() << "Got an incomplete fragment";
371 0 : return artdaq::TransferInterface::RECV_TIMEOUT;
372 : }
373 :
374 0 : if (fragment_complete)
375 : {
376 0 : return source_rank();
377 : }
378 0 : }
379 :
380 : return TransferInterface::RECV_TIMEOUT;
381 : }
382 :
383 : #pragma GCC diagnostic pop
384 :
385 0 : int artdaq::MulticastTransfer::receiveFragmentHeader(detail::RawFragmentHeader& header, size_t receiveTimeout)
386 : {
387 0 : auto ret = receiveFragment(fragment_buffer_, receiveTimeout);
388 0 : if (ret == source_rank())
389 : {
390 0 : header = *reinterpret_cast<detail::RawFragmentHeader*>(fragment_buffer_.headerAddress()); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
391 0 : return source_rank();
392 : }
393 0 : return ret;
394 : }
395 :
396 0 : int artdaq::MulticastTransfer::receiveFragmentData(RawDataType* destination, size_t wordCount)
397 : {
398 0 : if (fragment_buffer_.size() > detail::RawFragmentHeader::num_words())
399 : {
400 0 : auto dataSize = (fragment_buffer_.size() - detail::RawFragmentHeader::num_words()) * sizeof(RawDataType);
401 0 : memcpy(destination, fragment_buffer_.headerAddress() + detail::RawFragmentHeader::num_words(), dataSize); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
402 0 : return source_rank();
403 : }
404 0 : return RECV_TIMEOUT;
405 : }
406 :
407 : // Reliable transport is undefined for multicast; just use copy
408 : artdaq::TransferInterface::CopyStatus
409 0 : artdaq::MulticastTransfer::transfer_fragment_reliable_mode(artdaq::Fragment&& f)
410 : {
411 0 : return transfer_fragment_min_blocking_mode(f, 100000000);
412 : }
413 :
414 : artdaq::TransferInterface::CopyStatus
415 0 : artdaq::MulticastTransfer::transfer_fragment_min_blocking_mode(artdaq::Fragment const& fragment,
416 : size_t send_timeout_usec)
417 : {
418 0 : assert(TransferInterface::role() == Role::kSend);
419 :
420 0 : if (fragment.sizeBytes() > max_fragment_size_words_)
421 : {
422 0 : throw cet::exception("MulticastTransfer") << "Error in MulticastTransfer::copyFragmentTo: " << fragment.sizeBytes() << " byte fragment exceeds max_fragment_size of " << max_fragment_size_words_; // NOLINT(cert-err60-cpp)
423 : }
424 :
425 0 : auto num_subfragments = static_cast<size_t>(std::ceil(fragment.sizeBytes() / static_cast<float>(subfragment_size_)));
426 :
427 0 : fill_staging_memory(fragment);
428 :
429 0 : for (size_t batch_index = 0;; batch_index++)
430 : {
431 0 : auto first_subfragment = batch_index * subfragments_per_send_;
432 0 : auto last_subfragment = (batch_index + 1) * subfragments_per_send_ >= num_subfragments ? num_subfragments - 1 : (batch_index + 1) * subfragments_per_send_ - 1;
433 :
434 0 : std::vector<boost::asio::const_buffer> buffers;
435 :
436 0 : book_container_of_buffers(buffers, fragment.sizeBytes(), num_subfragments, first_subfragment, last_subfragment);
437 :
438 0 : socket_->send_to(buffers, *multicast_endpoint_);
439 :
440 0 : usleep(pause_on_copy_usecs_);
441 :
442 0 : if (last_subfragment == num_subfragments - 1)
443 : {
444 0 : break;
445 : }
446 0 : }
447 0 : return CopyStatus::kSuccess;
448 : }
449 :
450 : #pragma GCC diagnostic push
451 : #pragma GCC diagnostic ignored "-Wsign-compare"
452 :
453 0 : void artdaq::MulticastTransfer::fill_staging_memory(const artdaq::Fragment& fragment)
454 : {
455 0 : auto num_subfragments = static_cast<size_t>(std::ceil(fragment.sizeBytes() / static_cast<float>(subfragment_size_)));
456 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "# of subfragments to use is " << num_subfragments;
457 :
458 0 : for (auto i_s = 0; i_s < num_subfragments; ++i_s)
459 : {
460 0 : auto staging_memory_copyto = &staging_memory_.at(i_s * (sizeof(subfragment_identifier) + subfragment_size_));
461 :
462 0 : subfragment_identifier sfi(fragment.sequenceID(), fragment.fragmentID(), i_s);
463 :
464 0 : std::copy(reinterpret_cast<byte_t*>(&sfi), // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
465 : reinterpret_cast<byte_t*>(&sfi) + sizeof(subfragment_identifier), // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic,cppcoreguidelines-pro-type-reinterpret-cast)
466 : staging_memory_copyto);
467 :
468 0 : auto low_ptr_into_fragment = fragment.headerBeginBytes() + subfragment_size_ * i_s; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
469 :
470 0 : auto high_ptr_into_fragment = (i_s == num_subfragments - 1) ? fragment.dataEndBytes() : fragment.headerBeginBytes() + subfragment_size_ * (i_s + 1); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
471 :
472 0 : std::copy(low_ptr_into_fragment,
473 : high_ptr_into_fragment,
474 : staging_memory_copyto + sizeof(subfragment_identifier)); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
475 : }
476 0 : }
477 :
478 : #pragma GCC diagnostic pop
479 :
480 : // Note that book_container_of_buffers includes, rather than excludes,
481 : // "last_subfragment_num"; in this regard it's different than the way
482 : // STL functions receive iterators. Note also that the lowest possible
483 : // value for "first_subfragment_num" is 0, not 1.
484 :
485 : template<typename T>
486 0 : void artdaq::MulticastTransfer::book_container_of_buffers(std::vector<T>& buffers,
487 : const size_t fragment_size,
488 : const size_t total_subfragments,
489 : const size_t first_subfragment_num,
490 : const size_t last_subfragment_num)
491 : {
492 0 : assert(staging_memory_.size() >= total_subfragments * (sizeof(subfragment_identifier) + subfragment_size_));
493 0 : assert(buffers.empty());
494 0 : assert(last_subfragment_num < total_subfragments);
495 :
496 0 : for (auto i_f = first_subfragment_num; i_f <= last_subfragment_num; ++i_f)
497 : {
498 0 : auto bytes_to_store = (i_f == total_subfragments - 1) ? sizeof(subfragment_identifier) + (fragment_size - (total_subfragments - 1) * subfragment_size_) : sizeof(subfragment_identifier) + subfragment_size_;
499 :
500 0 : buffers.emplace_back(&staging_memory_.at(i_f * (sizeof(subfragment_identifier) + subfragment_size_)),
501 : bytes_to_store);
502 : }
503 0 : }
504 :
505 : #pragma GCC diagnostic push // Needed since profile builds will ignore the assert
506 : #pragma GCC diagnostic ignored "-Wunused-variable"
507 :
508 0 : void artdaq::MulticastTransfer::get_fragment_quantities(const boost::asio::mutable_buffer& buf, size_t& payload_size,
509 : size_t& fragment_size,
510 : size_t& expected_subfragments)
511 : {
512 0 : auto* buffer_ptr = boost::asio::buffer_cast<byte_t*>(buf);
513 :
514 0 : auto subfragment_num = *(reinterpret_cast<size_t*>(buffer_ptr) + 2); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-bounds-pointer-arithmetic)
515 :
516 0 : assert(subfragment_num == 0);
517 :
518 0 : auto* header =
519 : reinterpret_cast<artdaq::detail::RawFragmentHeader*>(buffer_ptr + sizeof(subfragment_identifier)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-bounds-pointer-arithmetic)
520 :
521 0 : fragment_size = header->word_count * sizeof(artdaq::RawDataType);
522 :
523 0 : auto metadata_size = header->metadata_word_count * sizeof(artdaq::RawDataType);
524 0 : payload_size = fragment_size - metadata_size - artdaq::detail::RawFragmentHeader::num_words() * sizeof(artdaq::RawDataType);
525 :
526 0 : assert(fragment_size ==
527 : artdaq::detail::RawFragmentHeader::num_words() * sizeof(artdaq::RawDataType) +
528 : metadata_size +
529 : payload_size);
530 :
531 0 : expected_subfragments = static_cast<size_t>(std::ceil(fragment_size / static_cast<float>(subfragment_size_)));
532 0 : }
533 : #pragma GCC diagnostic pop
534 :
535 0 : void artdaq::MulticastTransfer::set_receive_buffer_size(size_t recv_buff_size)
536 : {
537 0 : if (recv_buff_size == 0)
538 : {
539 0 : return;
540 : }
541 0 : boost::asio::socket_base::receive_buffer_size actual_recv_buff_size;
542 0 : socket_->get_option(actual_recv_buff_size);
543 :
544 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Receive buffer size is currently " << actual_recv_buff_size.value() << " bytes, will try to change it to " << recv_buff_size;
545 :
546 0 : boost::asio::socket_base::receive_buffer_size recv_buff_option(recv_buff_size);
547 :
548 0 : boost::system::error_code ec;
549 0 : socket_->set_option(recv_buff_option, ec);
550 :
551 0 : if (ec.value() != 0)
552 : {
553 0 : TLOG(TLVL_ERROR) << "boost::system::error_code with value " << ec << " was found in attempt to change receive buffer";
554 0 : std::cerr << "boost::system::error_code with value " << ec << " was found in attempt to change receive buffer" << std::endl;
555 : }
556 :
557 0 : socket_->get_option(actual_recv_buff_size);
558 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "After attempted change, receive buffer size is now " << actual_recv_buff_size.value();
559 : }
560 :
561 : #pragma GCC diagnostic pop
562 :
// Plugin registration for artdaq::MulticastTransfer. The macro is defined outside this file;
// presumably it generates the factory hook that lets the class be loaded as a transfer plugin
// by name -- TODO confirm against the macro's definition.
563 0 : DEFINE_ARTDAQ_TRANSFER(artdaq::MulticastTransfer)
|