Line data Source code
1 : #include <memory>
2 :
3 : #include "artdaq/DAQdata/Globals.hh"
4 : #define TRACE_NAME (app_name + "_BundleTransfer").c_str()
5 :
6 : #include "artdaq-core/Data/ContainerFragmentLoader.hh"
7 : #include "artdaq/TransferPlugins/TCPSocketTransfer.hh"
8 : #include "artdaq/TransferPlugins/TransferInterface.hh"
9 :
10 : #include <boost/thread.hpp>
11 :
namespace artdaq {
/**
 * \brief The BundleTransfer TransferInterface plugin automatically combines smaller Fragments to transfer a lower rate of larger Fragments, which TCP is better able to handle.
 *
 * Send side: Fragments handed to the transfer_fragment_* calls are only
 * queued (with back-pressure against max_hold_size_bytes_); a background
 * thread packages the queue into a ContainerFragment and performs the real
 * send when check_send_ fires. Receive side: each received bundle is cached
 * and its contained blocks are handed out one per receiveFragment /
 * receiveFragmentHeader+Data call.
 */
class BundleTransfer : public TransferInterface
{
public:
	/**
	 * \brief BundleTransfer Constructor
	 * \param pset ParameterSet used to configure BundleTransfer
	 * \param role Role of this TransferInterface, either kReceive or kSend
	 */
	BundleTransfer(const fhicl::ParameterSet& pset, Role role);

	/**
	 * \brief BundleTransfer default Destructor
	 */
	~BundleTransfer() override;

	/**
	 * \brief Receive a Fragment, using the underlying transfer plugin
	 * \param fragment Output Fragment
	 * \param receiveTimeout Time to wait before returning TransferInterface::RECV_TIMEOUT
	 * \return Rank of sender
	 *
	 * Unpacks one contained block per call from the cached bundle,
	 * fetching the next bundle from the underlying transfer only when the
	 * cached one is exhausted.
	 */
	int receiveFragment(artdaq::Fragment& fragment,
	                    size_t receiveTimeout) override
	{
		// No cached bundle: pull the next ContainerFragment off the wire.
		if (bundle_fragment_ == nullptr)
		{
			receive_bundle_fragment_(receiveTimeout);
			if (current_rank_ < RECV_SUCCESS) return current_rank_;
		}

		ContainerFragment cf(*bundle_fragment_);
		TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Retrieving Fragment " << (current_block_index_ + 1) << " of " << cf.block_count();
		// fragSize includes the RawFragmentHeader; resizeBytes sets payload
		// size only, so subtract the header before resizing, then copy the
		// whole block (header + payload) starting at headerAddress.
		fragment.resizeBytes(cf.fragSize(current_block_index_) - sizeof(detail::RawFragmentHeader));
		memcpy(fragment.headerAddress(), static_cast<const uint8_t*>(cf.dataBegin()) + cf.fragmentIndex(current_block_index_), cf.fragSize(current_block_index_));
		current_block_index_++;
		if (current_block_index_ >= cf.block_count())  // Index vs. count!
		{
			// All contained blocks consumed; drop the cache so the next call
			// receives a fresh bundle.
			bundle_fragment_.reset(nullptr);
		}
		return current_rank_;
	}

	/**
	 * \brief Receive a Fragment Header from the transport mechanism
	 * \param[out] header Received Fragment Header
	 * \param receiveTimeout Timeout for receive
	 * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
	 *
	 * Copies the header of the current contained block WITHOUT advancing
	 * current_block_index_; the matching receiveFragmentData call advances.
	 */
	int receiveFragmentHeader(detail::RawFragmentHeader& header, size_t receiveTimeout) override
	{
		if (bundle_fragment_ == nullptr)
		{
			receive_bundle_fragment_(receiveTimeout);
			if (current_rank_ < RECV_SUCCESS) return current_rank_;
		}
		ContainerFragment cf(*bundle_fragment_);
		TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Retrieving Fragment Header " << (current_block_index_ + 1) << " of " << cf.block_count();
		memcpy(&header, static_cast<const uint8_t*>(cf.dataBegin()) + cf.fragmentIndex(current_block_index_), sizeof(detail::RawFragmentHeader));
		return current_rank_;
	}

	/**
	 * \brief Receive the body of a Fragment to the given destination pointer
	 * \param destination Pointer to memory region where Fragment data should be stored
	 * \param wordCount Number of words of Fragment data to receive
	 * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
	 */
	int receiveFragmentData(RawDataType* destination, size_t /*wordCount*/) override
	{
		// receiveFragmentHeader must have cached a bundle already.
		if (bundle_fragment_ == nullptr)  // Should be impossible!
		{
			return RECV_TIMEOUT;
		}
		ContainerFragment cf(*bundle_fragment_);
		TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Retrieving Fragment Data " << (current_block_index_ + 1) << " of " << cf.block_count();
		// Skip past the header within the block; copy only the payload.
		memcpy(destination, static_cast<const uint8_t*>(cf.dataBegin()) + cf.fragmentIndex(current_block_index_) + sizeof(detail::RawFragmentHeader), cf.fragSize(current_block_index_) - sizeof(detail::RawFragmentHeader));
		current_block_index_++;
		if (current_block_index_ >= cf.block_count())  // Index vs. count!
		{
			bundle_fragment_.reset(nullptr);
		}
		return current_rank_;
	}

	/**
	 * \brief Send a Fragment in non-reliable mode, using the underlying transfer plugin
	 * \param fragment The Fragment to send
	 * \param send_timeout_usec How long to wait before aborting. Defaults to size_t::MAX_VALUE
	 * \return A TransferInterface::CopyStatus result variable
	 *
	 * Only enqueues the Fragment (copy) into fragment_buffer_; the actual
	 * network send is performed later by the send-timeout thread, so the
	 * returned status reflects a PREVIOUS send.
	 */
	CopyStatus transfer_fragment_min_blocking_mode(artdaq::Fragment const& fragment, size_t send_timeout_usec) override
	{
		TLOG(TLVL_DEBUG + 35) << GetTraceName() << "transfer_fragment_min_blocking_mode START";
		last_send_call_reliable_ = false;
		last_send_timeout_usec_ = send_timeout_usec;
		{
			std::unique_lock<std::mutex> lk(fragment_mutex_);
			// Back-pressure: wait up to the send timeout for the sender
			// thread to drain the buffer below the hold limit.
			if (current_buffer_size_bytes_ > max_hold_size_bytes_)
			{
				fragment_cv_.wait_for(lk, std::chrono::microseconds(send_timeout_usec), [&] { return current_buffer_size_bytes_ < max_hold_size_bytes_; });
			}

			// Still over the limit after the wait: drop this Fragment.
			if (current_buffer_size_bytes_ > max_hold_size_bytes_)
			{
				TLOG(TLVL_WARNING) << GetTraceName() << "Dropping data due to timeout in min_blocking_mode";
				return CopyStatus::kTimeout;
			}

			TLOG(TLVL_DEBUG + 35) << GetTraceName() << "transfer_fragment_min_blocking_mode after wait for buffer";
			// Always send along Broadcast Fragments immediately
			if (Fragment::isBroadcastFragmentType(fragment.type()))
			{
				system_fragment_cached_ = true;
			}

			current_buffer_size_bytes_ += fragment.sizeBytes();
			// Eww, we have to copy
			fragment_buffer_.emplace_back(fragment);
		}
		TLOG(TLVL_DEBUG + 35) << GetTraceName() << "transfer_fragment_min_blocking_mode END";
		return last_copy_status_;  // Might be a lie, but we're going to send from the thread proc
	}

	/**
	 * \brief Send a Fragment in reliable mode, using the underlying transfer plugin
	 * \param fragment The Fragment to send
	 * \return A TransferInterface::CopyStatus result variable
	 *
	 * Only enqueues the Fragment (moved) into fragment_buffer_; the actual
	 * network send happens from the send-timeout thread. Unlike the
	 * min-blocking variant, this blocks WITHOUT a timeout until buffer
	 * space is available, so no data is dropped.
	 */
	CopyStatus transfer_fragment_reliable_mode(artdaq::Fragment&& fragment) override
	{
		TLOG(TLVL_DEBUG + 36) << GetTraceName() << "transfer_fragment_reliable_mode START";
		last_send_call_reliable_ = true;
		{
			std::unique_lock<std::mutex> lk(fragment_mutex_);
			while (current_buffer_size_bytes_ > max_hold_size_bytes_)
			{
				fragment_cv_.wait(lk, [&] { return current_buffer_size_bytes_ < max_hold_size_bytes_; });
			}

			TLOG(TLVL_DEBUG + 36) << GetTraceName() << "transfer_fragment_reliable_mode after wait for buffer";

			// Always send along Broadcast Fragments immediately
			if (Fragment::isBroadcastFragmentType(fragment.type()))
			{
				system_fragment_cached_ = true;
			}

			current_buffer_size_bytes_ += fragment.sizeBytes();
			fragment_buffer_.emplace_back(std::move(fragment));
		}
		TLOG(TLVL_DEBUG + 36) << GetTraceName() << "transfer_fragment_reliable_mode END";
		return last_copy_status_;  // Might be a lie, but we're going to send from the thread proc
	}

	/**
	 * \brief Determine whether the TransferInterface plugin is able to send/receive data
	 * \return True if the TransferInterface plugin is currently able to send/receive data
	 */
	bool isRunning() override { return running_; }

	/**
	 * \brief Flush any in-flight data. This should be used by the receiver after the receive loop has
	 * ended.
	 */
	void flush_buffers() override { theTransfer_->flush_buffers(); }

private:
	BundleTransfer(BundleTransfer const&) = delete;
	BundleTransfer(BundleTransfer&&) = delete;
	BundleTransfer& operator=(BundleTransfer const&) = delete;
	BundleTransfer& operator=(BundleTransfer&&) = delete;

private:
	std::unique_ptr<TransferInterface> theTransfer_;     ///< Underlying transfer (a TCPSocketTransfer, see constructor)
	size_t send_threshold_bytes_;                        ///< Buffered-byte count at which check_send_ triggers a send
	size_t max_hold_size_bytes_;                         ///< Back-pressure limit: producers wait while the buffer exceeds this
	int max_hold_time_us_;                               ///< Maximum time buffered Fragments may wait before being sent
	FragmentPtr bundle_fragment_{nullptr};               ///< Cached ContainerFragment (receive: being unpacked; send: being built)
	Fragments fragment_buffer_;                          ///< Fragments accumulated for the next bundle (send side)
	size_t current_block_index_{0};                      ///< Next contained block to hand out on the receive side
	int current_rank_ = 0;                               ///< Rank returned by the last underlying receive
	CopyStatus last_copy_status_{CopyStatus::kSuccess};  ///< Status of the last bundle send, echoed back to producers

	std::chrono::steady_clock::time_point send_fragment_started_;  ///< When the current buffer started filling (for max_hold_time_us_)
	std::atomic<size_t> current_buffer_size_bytes_{0};             ///< Running byte total of fragment_buffer_
	std::unique_ptr<boost::thread> send_timeout_thread_;           ///< Background thread running send_timeout_thread_proc_
	std::atomic<bool> system_fragment_cached_{false};              ///< True when a Broadcast Fragment is queued (forces prompt send)
	std::atomic<bool> send_timeout_thread_running_{false};         ///< Loop/retry control flag for the background thread
	std::atomic<bool> last_send_call_reliable_{true};              ///< Which transfer mode the producer last used
	std::atomic<size_t> last_send_timeout_usec_{1000000};          ///< Timeout to use for min-blocking sends from the thread
	std::atomic<bool> running_{true};                              ///< Cleared in the destructor; reported by isRunning()
	std::mutex fragment_mutex_;                                    ///< Guards fragment_buffer_ and related bundling state
	std::condition_variable fragment_cv_;                          ///< Signaled when the buffer is drained (back-pressure release)

	bool check_send_(bool force);                        ///< Decide whether the buffered Fragments should be sent now
	void start_timeout_thread_();                        ///< Launch (or restart) the background sender thread
	void send_timeout_thread_proc_();                    ///< Background loop: bundle-and-send, sleeping when idle
	bool send_bundle_fragment_(bool forceSend = false);  ///< Package fragment_buffer_ into a ContainerFragment and send it
	void receive_bundle_fragment_(size_t receiveTimeout);  ///< Receive the next bundle from the underlying transfer
};
}  // namespace artdaq
217 :
218 0 : artdaq::BundleTransfer::BundleTransfer(const fhicl::ParameterSet& pset, Role role)
219 : : TransferInterface(pset, role)
220 0 : , send_threshold_bytes_(pset.get<size_t>("send_threshold_bytes", 10 * 0x100000)) // 10 MB
221 0 : , max_hold_size_bytes_(pset.get<size_t>("max_hold_size_bytes", 1000 * 0x100000)) // 1000 MB
222 0 : , max_hold_time_us_(pset.get<int>("max_hold_time_us", 100000)) // 0.1 s
223 : {
224 0 : TLOG(TLVL_INFO) << GetTraceName() << "Begin BundleTransfer constructor";
225 0 : TLOG(TLVL_INFO) << GetTraceName() << "Constructing TCPSocketTransfer";
226 0 : theTransfer_ = std::make_unique<TCPSocketTransfer>(pset, role);
227 :
228 0 : if (role == Role::kSend)
229 : {
230 0 : start_timeout_thread_();
231 : }
232 0 : }
233 :
234 0 : artdaq::BundleTransfer::~BundleTransfer()
235 : {
236 0 : if (role_ == Role::kSend)
237 : {
238 0 : send_timeout_thread_running_ = false;
239 0 : if (send_timeout_thread_ && send_timeout_thread_->joinable())
240 : {
241 0 : send_timeout_thread_->join();
242 : }
243 0 : send_bundle_fragment_(true);
244 : }
245 0 : running_ = false;
246 0 : }
247 :
248 0 : void artdaq::BundleTransfer::start_timeout_thread_()
249 : {
250 0 : if (send_timeout_thread_ && send_timeout_thread_->joinable())
251 : {
252 0 : send_timeout_thread_->join();
253 : }
254 0 : send_timeout_thread_running_ = true;
255 0 : TLOG(TLVL_INFO) << GetTraceName() << "Starting Send Timeout Thread";
256 :
257 : try
258 : {
259 0 : send_timeout_thread_ = std::make_unique<boost::thread>(&BundleTransfer::send_timeout_thread_proc_, this);
260 : char tname[16]; // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
261 0 : snprintf(tname, sizeof(tname) - 1, "%d-SNDTMO", my_rank); // NOLINT
262 0 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
263 0 : auto handle = send_timeout_thread_->native_handle();
264 0 : pthread_setname_np(handle, tname);
265 : }
266 0 : catch (const boost::exception& e)
267 : {
268 0 : TLOG(TLVL_ERROR) << GetTraceName() << "Caught boost::exception starting Send Timeout thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
269 0 : std::cerr << GetTraceName() << "Caught boost::exception starting Send Timeout thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
270 0 : exit(5);
271 0 : }
272 0 : }
273 :
274 0 : void artdaq::BundleTransfer::send_timeout_thread_proc_()
275 : {
276 0 : while (send_timeout_thread_running_)
277 : {
278 0 : if (!send_bundle_fragment_())
279 : {
280 0 : usleep(5000);
281 : }
282 : }
283 0 : }
284 :
285 0 : bool artdaq::BundleTransfer::check_send_(bool force)
286 : {
287 0 : if (force)
288 : {
289 0 : TLOG(TLVL_DEBUG + 37) << GetTraceName() << "check_send_: Send is forced, returning true";
290 0 : return true;
291 : }
292 :
293 0 : if (system_fragment_cached_.load())
294 : {
295 0 : TLOG(TLVL_DEBUG + 37) << GetTraceName() << "check_send_: System Fragment in cache, returning true";
296 0 : return true;
297 : }
298 :
299 0 : if (std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - send_fragment_started_).count() >= max_hold_time_us_)
300 : {
301 0 : TLOG(TLVL_DEBUG + 37) << GetTraceName() << "check_send_: Send timeout reached, returning true";
302 0 : return true;
303 : }
304 :
305 0 : if (current_buffer_size_bytes_ >= send_threshold_bytes_)
306 : {
307 0 : TLOG(TLVL_DEBUG + 37) << GetTraceName() << "check_send_: Buffer is full, returning true";
308 0 : return true;
309 : }
310 :
311 0 : TLOG(TLVL_DEBUG + 37) << GetTraceName() << "check_send_: returning false";
312 0 : return false;
313 : }
314 :
315 0 : bool artdaq::BundleTransfer::send_bundle_fragment_(bool forceSend)
316 : {
317 : {
318 0 : std::unique_lock<std::mutex> lk(fragment_mutex_);
319 :
320 0 : bool send_fragment = check_send_(forceSend);
321 :
322 0 : if (send_fragment && fragment_buffer_.size() > 0)
323 : {
324 0 : TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Swapping in new buffer";
325 0 : Fragments temp_buffer;
326 0 : size_t size = current_buffer_size_bytes_;
327 0 : fragment_buffer_.swap(temp_buffer);
328 0 : send_fragment_started_ = std::chrono::steady_clock::now();
329 0 : system_fragment_cached_ = false;
330 0 : current_buffer_size_bytes_ = 0;
331 0 : lk.unlock();
332 0 : TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Notifying waiters";
333 0 : fragment_cv_.notify_one();
334 :
335 0 : TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Setting up Bundle Fragment";
336 0 : bundle_fragment_.reset(new artdaq::Fragment(temp_buffer.front().sequenceID() + 1, temp_buffer.front().fragmentID()));
337 0 : bundle_fragment_->setTimestamp(temp_buffer.front().timestamp());
338 0 : bundle_fragment_->reserve(size / sizeof(artdaq::RawDataType));
339 :
340 0 : TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Filling Bundle Fragment, sz = " << temp_buffer.size();
341 0 : ContainerFragmentLoader container_fragment(*bundle_fragment_);
342 0 : container_fragment.set_missing_data(false); // Buffer mode is never missing data, even if there IS no data.
343 0 : container_fragment.addFragments(temp_buffer, true);
344 0 : temp_buffer.clear();
345 :
346 0 : TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Sending Fragment, reliable mode " << last_send_call_reliable_.load();
347 0 : CopyStatus sts = CopyStatus::kSuccess;
348 0 : if (last_send_call_reliable_)
349 : {
350 0 : sts = theTransfer_->transfer_fragment_reliable_mode(std::move(*bundle_fragment_.get()));
351 0 : bundle_fragment_.reset(nullptr);
352 : }
353 : else
354 : {
355 0 : while (sts != CopyStatus::kSuccess && send_timeout_thread_running_)
356 : {
357 0 : sts = theTransfer_->transfer_fragment_min_blocking_mode(*bundle_fragment_.get(), last_send_timeout_usec_);
358 : }
359 0 : bundle_fragment_.reset(nullptr);
360 : }
361 0 : last_copy_status_ = sts;
362 0 : if (sts != CopyStatus::kSuccess)
363 : {
364 0 : auto sts_string = sts == CopyStatus::kTimeout ? "timeout" : "other error";
365 0 : TLOG(TLVL_WARNING) << GetTraceName() << "Transfer of Bundle fragment returned status " << sts_string;
366 : }
367 :
368 0 : TLOG(TLVL_DEBUG + 38) << GetTraceName() << "Done sending Bundle Fragment";
369 :
370 0 : return true; // Status of actual transfer
371 0 : }
372 0 : }
373 0 : return false; // Waiting on more data
374 : }
375 :
376 0 : void artdaq::BundleTransfer::receive_bundle_fragment_(size_t receiveTimeout)
377 : {
378 0 : std::lock_guard<std::mutex> lk(fragment_mutex_);
379 0 : bundle_fragment_.reset(new artdaq::Fragment(1));
380 :
381 0 : TLOG(TLVL_DEBUG + 34) << GetTraceName() << "Going to receive next bundle fragment";
382 0 : current_rank_ = theTransfer_->receiveFragment(*bundle_fragment_, receiveTimeout);
383 0 : TLOG(TLVL_DEBUG + 34) << GetTraceName() << "Done with receiveFragment, current_rank_ = " << current_rank_;
384 :
385 0 : if (current_rank_ < RECV_SUCCESS)
386 : {
387 0 : bundle_fragment_.reset(nullptr);
388 : }
389 0 : current_block_index_ = 0;
390 0 : }
391 :
392 0 : DEFINE_ARTDAQ_TRANSFER(artdaq::BundleTransfer)
|