Line data Source code
1 : #include "BrokenTransferTest.hh"
2 :
3 : #include "artdaq-core/Data/detail/RawFragmentHeader.hh"
4 : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
5 :
6 : #include <memory>
7 : #include <thread>
8 : #include "artdaq/DAQdata/Globals.hh"
9 : #define TRACE_NAME "BrokenTransferTest"
10 :
11 : #define TLVL_MAKE_TRANSFER_PS TLVL_DEBUG + 5
12 : #define TLVL_START_TEST TLVL_DEBUG + 6
13 : #define TLVL_STOP_TEST TLVL_DEBUG + 7
14 : #define TLVL_SENDER TLVL_DEBUG + 8
15 : #define TLVL_SENDER_TOKEN_WAIT TLVL_DEBUG + 9
16 : #define TLVL_RECEIVER TLVL_DEBUG + 10
17 :
18 0 : artdaqtest::BrokenTransferTest::BrokenTransferTest(const fhicl::ParameterSet& ps)
19 0 : : sender_ready_()
20 0 : , receiver_ready_()
21 0 : , sender_current_fragment_()
22 0 : , ps_(ps)
23 0 : , test_start_time_(std::chrono::steady_clock::now())
24 0 : , test_end_time_(std::chrono::steady_clock::now())
25 0 : , test_end_requested_(false)
26 0 : , fragment_rate_hz_(ps.get<size_t>("fragment_rate_hz", 10))
27 0 : , pause_first_sender_(false)
28 0 : , pause_receiver_(false)
29 0 : , kill_first_sender_(false)
30 0 : , kill_receiver_(false)
31 0 : , reliable_mode_(ps.get<bool>("reliable_mode", true))
32 0 : , fragment_size_(ps.get<size_t>("fragment_size", 0x10000))
33 0 : , send_timeout_us_(ps.get<size_t>("send_timeout_us", 100000))
34 0 : , transfer_buffer_count_(ps.get<size_t>("transfer_buffer_count", 10))
35 0 : , event_buffer_count_(ps.get<size_t>("event_buffer_count", 20))
36 0 : , event_buffer_timeout_us_(ps.get<size_t>("event_buffer_timeout_us", 1000000))
37 0 : , send_throttle_us_(0)
38 : {
39 0 : if (fragment_rate_hz_ == 0 || fragment_rate_hz_ > 100000)
40 : {
41 0 : TLOG(TLVL_WARNING) << "Invalid rate " << fragment_rate_hz_ << " Hz specified, setting to " << (fragment_rate_hz_ == 0 ? 1 : 1000) << " Hz";
42 0 : fragment_rate_hz_ = (fragment_rate_hz_ == 0 ? 1 : 1000);
43 : }
44 0 : }
45 :
46 0 : void artdaqtest::BrokenTransferTest::TestSenderPause()
47 : {
48 0 : TLOG(TLVL_INFO) << "TestSenderPause BEGIN";
49 0 : auto start_time = std::chrono::steady_clock::now();
50 0 : start_test_();
51 0 : usleep_for_n_buffer_epochs_(2);
52 :
53 0 : TLOG(TLVL_INFO) << "Pausing First Sender";
54 0 : pause_first_sender_ = true;
55 0 : usleep_for_n_buffer_epochs_(2);
56 0 : usleep(2 * event_buffer_timeout_us_);
57 :
58 0 : TLOG(TLVL_INFO) << "Resuming First Sender";
59 0 : pause_first_sender_ = false;
60 0 : usleep_for_n_buffer_epochs_(2);
61 :
62 0 : stop_test_();
63 0 : TLOG(TLVL_INFO) << "TestSenderPause END, duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
64 0 : }
65 :
66 0 : void artdaqtest::BrokenTransferTest::TestReceiverPause()
67 : {
68 0 : TLOG(TLVL_INFO) << "TestReceiverPause BEGIN";
69 0 : auto start_time = std::chrono::steady_clock::now();
70 0 : start_test_();
71 0 : usleep_for_n_buffer_epochs_(2);
72 :
73 0 : TLOG(TLVL_INFO) << "Pausing Recevier";
74 0 : pause_receiver_ = true;
75 0 : usleep_for_n_buffer_epochs_(2);
76 0 : usleep(2 * event_buffer_timeout_us_);
77 :
78 0 : TLOG(TLVL_INFO) << "Resuming Receiver";
79 0 : pause_receiver_ = false;
80 0 : usleep_for_n_buffer_epochs_(2);
81 :
82 0 : stop_test_();
83 0 : TLOG(TLVL_INFO) << "TestReceiverPause END, duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
84 0 : }
85 :
86 0 : void artdaqtest::BrokenTransferTest::TestSenderReconnect()
87 : {
88 0 : TLOG(TLVL_INFO) << "TestSenderReconnect BEGIN";
89 0 : auto start_time = std::chrono::steady_clock::now();
90 0 : start_test_();
91 0 : usleep_for_n_buffer_epochs_(2);
92 :
93 0 : TLOG(TLVL_INFO) << "Killing first Sender";
94 0 : kill_first_sender_ = true;
95 0 : if (sender_threads_[0].joinable())
96 : {
97 0 : sender_threads_[0].join();
98 : }
99 0 : kill_first_sender_ = false;
100 :
101 0 : usleep_for_n_buffer_epochs_(2);
102 0 : usleep(2 * event_buffer_timeout_us_);
103 :
104 0 : TLOG(TLVL_INFO) << "Restarting First Sender";
105 0 : boost::thread::attributes attrs;
106 0 : attrs.set_stack_size(4096 * 2000); // 2000 KB
107 : try
108 : {
109 0 : sender_threads_[0] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_sending_, this, 0));
110 : }
111 0 : catch (const boost::exception& e)
112 : {
113 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting Sender thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
114 0 : std::cerr << "Caught boost::exception starting Sender thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
115 0 : exit(5);
116 0 : }
117 :
118 0 : usleep_for_n_buffer_epochs_(2);
119 :
120 0 : stop_test_();
121 0 : TLOG(TLVL_INFO) << "TestSenderReconnect END, duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
122 0 : }
123 :
124 0 : void artdaqtest::BrokenTransferTest::TestReceiverReconnect(int send_throttle_factor)
125 : {
126 0 : TLOG(TLVL_INFO) << "TestReceiverReconnect BEGIN";
127 0 : auto start_time = std::chrono::steady_clock::now();
128 0 : send_throttle_us_ = send_throttle_factor * 1000000 / fragment_rate_hz_;
129 0 : start_test_();
130 0 : usleep_for_n_buffer_epochs_(2);
131 :
132 0 : TLOG(TLVL_INFO) << "Killing Receiver duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
133 : ;
134 0 : kill_receiver_ = true;
135 0 : if (receiver_threads_[0].joinable())
136 : {
137 0 : receiver_threads_[0].join();
138 : }
139 0 : if (receiver_threads_[1].joinable())
140 : {
141 0 : receiver_threads_[1].join();
142 : }
143 0 : kill_receiver_ = false;
144 :
145 0 : usleep_for_n_buffer_epochs_(2);
146 0 : usleep(2 * event_buffer_timeout_us_);
147 :
148 0 : TLOG(TLVL_INFO) << "Restarting Receiver duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
149 0 : boost::thread::attributes attrs;
150 0 : attrs.set_stack_size(4096 * 2000); // 2000 KB
151 : try
152 : {
153 0 : receiver_threads_[0] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_receiving_, this, 0, 2));
154 0 : receiver_threads_[1] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_receiving_, this, 1, 2));
155 : }
156 0 : catch (const boost::exception& e)
157 : {
158 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
159 0 : std::cerr << "Caught boost::exception starting Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
160 0 : exit(5);
161 0 : }
162 :
163 0 : usleep_for_n_buffer_epochs_(2);
164 :
165 0 : TLOG(TLVL_INFO) << "Stopping test, duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
166 0 : stop_test_();
167 0 : TLOG(TLVL_INFO) << "TestReceiverReconnect END, duration=" << artdaq::TimeUtils::GetElapsedTime(start_time);
168 0 : }
169 :
170 0 : fhicl::ParameterSet artdaqtest::BrokenTransferTest::make_transfer_ps_(int sender_rank, int receiver_rank, const std::string& name)
171 : {
172 0 : auto thePs = ps_.get<fhicl::ParameterSet>("default_transfer_ps", fhicl::ParameterSet());
173 :
174 0 : thePs.put_or_replace("transferPluginType", ps_.get<std::string>("transfer_to_use", "TCPSocket"));
175 0 : thePs.put_or_replace("destination_rank", receiver_rank);
176 0 : thePs.put_or_replace("source_rank", sender_rank);
177 0 : thePs.put_or_replace("buffer_count", transfer_buffer_count_);
178 0 : if (!thePs.has_key("max_fragment_size_words"))
179 : {
180 0 : thePs.put("max_fragment_size_words", fragment_size_ + artdaq::detail::RawFragmentHeader::num_words() + 1);
181 : }
182 0 : fhicl::ParameterSet outputPs;
183 :
184 0 : TLOG(TLVL_MAKE_TRANSFER_PS) << "Configuring transfer between " << sender_rank << " and " << receiver_rank << " with ParameterSet: " << thePs.to_string();
185 :
186 0 : outputPs.put(name, thePs);
187 0 : return outputPs;
188 0 : }
189 :
190 0 : void artdaqtest::BrokenTransferTest::start_test_()
191 : {
192 0 : TLOG(TLVL_START_TEST) << "start_test_ BEGIN";
193 :
194 0 : sender_ready_[0] = false;
195 0 : sender_ready_[1] = false;
196 :
197 0 : receiver_ready_[0] = false;
198 0 : receiver_ready_[1] = false;
199 :
200 0 : sender_current_fragment_[0] = 0;
201 0 : sender_current_fragment_[1] = 0;
202 :
203 0 : test_start_time_ = std::chrono::steady_clock::now();
204 0 : test_end_time_ = std::chrono::steady_clock::now();
205 :
206 0 : test_end_requested_ = false;
207 0 : pause_first_sender_ = false;
208 0 : pause_receiver_ = false;
209 0 : kill_first_sender_ = false;
210 0 : kill_receiver_ = false;
211 :
212 0 : event_buffer_.clear();
213 0 : complete_events_.clear();
214 0 : timeout_events_.clear();
215 :
216 0 : TLOG(TLVL_START_TEST) << "start_test_: Starting receiver threads";
217 0 : boost::thread::attributes attrs;
218 0 : attrs.set_stack_size(4096 * 2000); // 2000 KB
219 : try
220 : {
221 0 : receiver_threads_[0] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_receiving_, this, 0, 2));
222 0 : receiver_threads_[1] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_receiving_, this, 1, 2));
223 : }
224 0 : catch (const boost::exception& e)
225 : {
226 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
227 0 : std::cerr << "Caught boost::exception starting Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
228 0 : exit(5);
229 0 : }
230 :
231 0 : TLOG(TLVL_START_TEST) << "start_test_: Waiting for receiver_ready_";
232 0 : while (!receiver_ready_[0] || !receiver_ready_[1])
233 : {
234 0 : usleep(10000);
235 : }
236 :
237 0 : TLOG(TLVL_START_TEST) << "start_test_: Starting sender threads";
238 : try
239 : {
240 0 : sender_threads_[0] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_sending_, this, 0));
241 0 : sender_threads_[1] = boost::thread(attrs, boost::bind(&BrokenTransferTest::do_sending_, this, 1));
242 : }
243 0 : catch (const boost::exception& e)
244 : {
245 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting Sender thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
246 0 : std::cerr << "Caught boost::exception starting Sender thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
247 0 : exit(5);
248 0 : }
249 :
250 0 : TLOG(TLVL_START_TEST) << "start_test_: Waiting for sender_ready_";
251 0 : while (!sender_ready_[0] || !sender_ready_[1])
252 : {
253 0 : usleep(1000);
254 : }
255 :
256 0 : TLOG(TLVL_START_TEST) << "start_test_ DONE";
257 0 : }
258 :
259 0 : void artdaqtest::BrokenTransferTest::stop_test_()
260 : {
261 0 : TLOG(TLVL_STOP_TEST) << "stop_test_ BEGIN";
262 0 : test_end_time_ = std::chrono::steady_clock::now();
263 0 : test_end_requested_ = true;
264 :
265 0 : TLOG(TLVL_STOP_TEST) << "stop_test_: Waiting for sender threads to shut down";
266 0 : while (sender_ready_[0] || sender_ready_[1])
267 : {
268 0 : usleep(1000);
269 : }
270 :
271 0 : TLOG(TLVL_STOP_TEST) << "stop_test_: Joining sender threads";
272 0 : if (sender_threads_[0].joinable())
273 : {
274 0 : sender_threads_[0].join();
275 : }
276 0 : if (sender_threads_[1].joinable())
277 : {
278 0 : sender_threads_[1].join();
279 : }
280 :
281 0 : TLOG(TLVL_STOP_TEST) << "stop_test_: Waiting for receiver threads to shut down";
282 0 : while (receiver_ready_[0] || receiver_ready_[1])
283 : {
284 0 : usleep(1000);
285 : }
286 :
287 0 : TLOG(TLVL_STOP_TEST) << "stop_test_: Joining receiver threads";
288 0 : if (receiver_threads_[0].joinable())
289 : {
290 0 : receiver_threads_[0].join();
291 : }
292 0 : if (receiver_threads_[1].joinable())
293 : {
294 0 : receiver_threads_[1].join();
295 : }
296 :
297 0 : TLOG(TLVL_INFO) << "Sent " << sender_current_fragment_[0] << " events from rank 0 and " << sender_current_fragment_[1] << " events from rank 1.";
298 :
299 0 : artdaq::Fragment::sequence_id_t expected_events = sender_current_fragment_[0];
300 0 : if (sender_current_fragment_[1] > expected_events)
301 : {
302 0 : expected_events = sender_current_fragment_[1];
303 : }
304 :
305 0 : auto complete_events = complete_events_.size();
306 0 : auto incomplete_events = timeout_events_.size();
307 0 : auto missing_events = expected_events - complete_events - incomplete_events;
308 :
309 0 : TLOG(TLVL_INFO) << "Received " << complete_events << " complete events in " << fm_(artdaq::TimeUtils::GetElapsedTime(test_start_time_), "s")
310 0 : << ", Incomplete: " << incomplete_events << ", Missing: " << missing_events;
311 0 : TLOG(TLVL_STOP_TEST) << "stop_test_ END";
312 0 : }
313 :
314 0 : void artdaqtest::BrokenTransferTest::do_sending_(int sender_rank)
315 : {
316 0 : std::unique_ptr<artdaq::TransferInterface> theTransfer = artdaq::MakeTransferPlugin(make_transfer_ps_(sender_rank, 2, "d2"),
317 0 : "d2", artdaq::TransferInterface::Role::kSend);
318 :
319 0 : TLOG(TLVL_SENDER) << "Sender " << sender_rank << " setting sender_ready_";
320 0 : sender_ready_[sender_rank] = true;
321 :
322 0 : while (sender_current_fragment_[sender_rank] < sequence_id_target_() || !test_end_requested_)
323 : {
324 0 : if (sender_rank == 0 && kill_first_sender_)
325 : {
326 0 : break;
327 : }
328 0 : while (sender_rank == 0 && pause_first_sender_)
329 : {
330 0 : std::this_thread::yield();
331 0 : usleep(10000);
332 : }
333 :
334 0 : artdaq::Fragment frag(fragment_size_);
335 0 : frag.setSequenceID(sender_current_fragment_[sender_rank]);
336 0 : frag.setFragmentID(sender_rank);
337 0 : frag.setSystemType(artdaq::Fragment::DataFragmentType);
338 :
339 0 : auto start_time = std::chrono::steady_clock::now();
340 0 : auto sts = artdaq::TransferInterface::CopyStatus::kErrorNotRequiringException;
341 :
342 0 : if (sender_tokens_[sender_rank].load() == 0)
343 : {
344 0 : TLOG(TLVL_SENDER_TOKEN_WAIT) << "Sender " << sender_rank << " waiting for token from receiver";
345 0 : while (sender_tokens_[sender_rank].load() == 0 && !test_end_requested_) { usleep(10000); }
346 0 : if (test_end_requested_)
347 : {
348 0 : continue;
349 : }
350 0 : TLOG(TLVL_SENDER_TOKEN_WAIT) << "Sender " << sender_rank << " waited " << fm_(artdaq::TimeUtils::GetElapsedTime(start_time), "s") << " for token from receiver";
351 : }
352 :
353 0 : if (reliable_mode_)
354 : {
355 0 : sts = theTransfer->transfer_fragment_reliable_mode(std::move(frag));
356 : }
357 : else
358 : {
359 0 : sts = theTransfer->transfer_fragment_min_blocking_mode(frag, send_timeout_us_);
360 : }
361 :
362 0 : if (sts != artdaq::TransferInterface::CopyStatus::kSuccess)
363 : {
364 0 : TLOG(TLVL_ERROR) << "Error sending Fragment " << sender_current_fragment_[sender_rank] << " from sender rank " << sender_rank << ": "
365 0 : << artdaq::TransferInterface::CopyStatusToString(sts);
366 : }
367 0 : auto duration = artdaq::TimeUtils::GetElapsedTime(start_time);
368 0 : TLOG(TLVL_SENDER) << "Sender " << sender_rank << " Transferred Fragment " << sender_current_fragment_[sender_rank]
369 0 : << " with size " << fragment_size_ << " words in " << fm_(duration, "s")
370 0 : << " (approx " << fm_(static_cast<double>(fragment_size_ * sizeof(artdaq::detail::RawFragmentHeader::RawDataType)) / duration, "B/s")
371 0 : << ") throttle " << send_throttle_us_;
372 0 : ++sender_current_fragment_[sender_rank];
373 0 : sender_tokens_[sender_rank]--;
374 0 : throttle_sender_(sender_rank);
375 0 : }
376 :
377 0 : TLOG(TLVL_SENDER) << "Sender " << sender_rank << " shutting down...";
378 0 : theTransfer.reset(nullptr);
379 0 : sender_ready_[sender_rank] = false;
380 0 : TLOG(TLVL_SENDER) << "Sender " << sender_rank << " DONE";
381 0 : }
382 :
383 0 : void artdaqtest::BrokenTransferTest::do_receiving_(int sender_rank, int receiver_rank)
384 : {
385 : std::unique_ptr<artdaq::TransferInterface> theTransfer =
386 0 : artdaq::MakeTransferPlugin(make_transfer_ps_(sender_rank, receiver_rank, "s" + std::to_string(sender_rank)),
387 0 : "s" + std::to_string(sender_rank), artdaq::TransferInterface::Role::kReceive);
388 0 : artdaq::FragmentPtr dropFrag = nullptr;
389 :
390 0 : TLOG(TLVL_RECEIVER) << "Receiver " << sender_rank << "->" << receiver_rank << " setting receiver_ready_";
391 0 : receiver_ready_[sender_rank] = true;
392 0 : sender_tokens_[sender_rank] = event_buffer_count_;
393 :
394 0 : while (!event_buffer_.empty() || !test_end_requested_ || sender_ready_[0] || sender_ready_[1])
395 : {
396 0 : if (kill_receiver_)
397 : {
398 0 : break;
399 : }
400 0 : while (pause_receiver_)
401 : {
402 0 : std::this_thread::yield();
403 0 : usleep(10000);
404 : }
405 :
406 : artdaq::detail::RawFragmentHeader hdr;
407 0 : auto rank = theTransfer->receiveFragmentHeader(hdr, 100000);
408 :
409 0 : if (rank == artdaq::TransferInterface::RECV_TIMEOUT || event_buffer_.count(hdr.sequence_id) == 0)
410 : {
411 0 : std::unique_lock<std::mutex> lk(event_buffer_mutex_);
412 : do
413 : {
414 0 : event_buffer_cv_.wait_for(lk, std::chrono::microseconds(10000));
415 :
416 0 : auto it = event_buffer_.begin();
417 0 : while (it != event_buffer_.end())
418 : {
419 0 : if (artdaq::TimeUtils::GetElapsedTimeMicroseconds(it->second.open_time) > event_buffer_timeout_us_)
420 : {
421 0 : TLOG(TLVL_WARNING) << "Receiver " << sender_rank << "->" << receiver_rank << ": Event " << it->first
422 0 : << " has timed out after " << artdaq::TimeUtils::GetElapsedTime(it->second.open_time) << " s, removing...";
423 0 : timeout_events_.insert(it->first);
424 0 : it = event_buffer_.erase(it);
425 0 : sender_tokens_[0]++;
426 0 : sender_tokens_[1]++;
427 : }
428 : else
429 : {
430 0 : ++it;
431 : }
432 : }
433 0 : } while (event_buffer_.size() > event_buffer_count_);
434 0 : }
435 :
436 0 : if (rank != sender_rank)
437 : {
438 0 : continue;
439 : }
440 :
441 0 : artdaq::RawDataType* ptr = nullptr;
442 0 : bool first = true;
443 : {
444 0 : std::unique_lock<std::mutex> lk(event_buffer_mutex_);
445 0 : if (timeout_events_.count(hdr.sequence_id) != 0u)
446 : {
447 0 : TLOG(TLVL_WARNING) << "Event " << hdr.sequence_id << " has timed out, discarding";
448 0 : if (!dropFrag || dropFrag->size() < hdr.word_count)
449 : {
450 0 : dropFrag = std::make_unique<artdaq::Fragment>(hdr.word_count - hdr.num_words());
451 : }
452 0 : ptr = dropFrag->headerAddress() + hdr.num_words(); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
453 : }
454 : else
455 : {
456 0 : if (event_buffer_.count(hdr.sequence_id) == 0u)
457 : {
458 0 : event_buffer_[hdr.sequence_id].open_time = std::chrono::steady_clock::now();
459 0 : event_buffer_[hdr.sequence_id].first_frag.reset(new artdaq::Fragment(hdr.word_count - hdr.num_words()));
460 0 : ptr = event_buffer_[hdr.sequence_id].first_frag->headerAddress() + hdr.num_words(); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
461 0 : TLOG(TLVL_RECEIVER) << "Receiver " << sender_rank << "->" << receiver_rank << " opened event " << hdr.sequence_id
462 0 : << " with Fragment from rank " << sender_rank;
463 : }
464 : else
465 : {
466 0 : event_buffer_[hdr.sequence_id].second_frag.reset(new artdaq::Fragment(hdr.word_count - hdr.num_words()));
467 0 : ptr = event_buffer_[hdr.sequence_id].second_frag->headerAddress() + hdr.num_words(); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
468 0 : first = false;
469 : }
470 : }
471 0 : }
472 :
473 0 : rank = theTransfer->receiveFragmentData(ptr, hdr.word_count - hdr.num_words());
474 0 : if (rank != sender_rank)
475 : {
476 0 : TLOG(TLVL_ERROR) << "Error receiving Fragment data after header received successfully!";
477 0 : exit(1);
478 : }
479 :
480 0 : if (!first)
481 : {
482 0 : TLOG(TLVL_RECEIVER) << "Receiver " << sender_rank << "->" << receiver_rank << " completed event " << hdr.sequence_id
483 0 : << " in " << fm_(artdaq::TimeUtils::GetElapsedTime(event_buffer_[hdr.sequence_id].open_time), "s") << ".";
484 :
485 0 : std::unique_lock<std::mutex> lk(event_buffer_mutex_);
486 0 : complete_events_.insert(hdr.sequence_id);
487 0 : event_buffer_.erase(hdr.sequence_id);
488 0 : event_buffer_cv_.notify_one();
489 0 : sender_tokens_[0]++;
490 0 : sender_tokens_[1]++;
491 0 : }
492 : }
493 :
494 0 : TLOG(TLVL_RECEIVER) << "Receiver " << sender_rank << "->" << receiver_rank << " shutting down...";
495 0 : theTransfer->flush_buffers();
496 :
497 0 : std::lock_guard<std::mutex> lk(event_buffer_mutex_);
498 0 : theTransfer.reset(nullptr);
499 0 : receiver_ready_[sender_rank] = false;
500 0 : TLOG(TLVL_RECEIVER) << "Receiver " << sender_rank << "->" << receiver_rank << " DONE";
501 0 : }
502 :
503 0 : void artdaqtest::BrokenTransferTest::throttle_sender_(int sender_rank)
504 : {
505 0 : if (send_throttle_us_ != 0 && sender_current_fragment_[sender_rank] >= sequence_id_target_() - fragment_rate_hz_)
506 : {
507 0 : usleep(send_throttle_us_);
508 : }
509 0 : }
510 :
511 0 : artdaq::Fragment::sequence_id_t artdaqtest::BrokenTransferTest::sequence_id_target_()
512 : {
513 0 : auto ret = 1 + (artdaq::TimeUtils::GetElapsedTimeMicroseconds(test_start_time_) * fragment_rate_hz_ / 1000000);
514 0 : if (test_end_requested_)
515 : {
516 0 : ret = 1 + (artdaq::TimeUtils::GetElapsedTimeMicroseconds(test_start_time_, test_end_time_) * fragment_rate_hz_ / 1000000);
517 : }
518 : // TLOG(TLVL_DEBUG) << "sequence_id_target_ is " << ret;
519 0 : return ret;
520 : }
521 :
522 0 : std::string artdaqtest::BrokenTransferTest::fm_(double data, const std::string& units, int logt)
523 : {
524 0 : if (data < 1 && logt > -3)
525 : {
526 0 : return fm_(data * 1000, units, logt - 1);
527 : }
528 0 : if (data > 1000 && logt < 3)
529 : {
530 0 : return fm_(data / 1000, units, logt + 1);
531 : }
532 :
533 0 : std::stringstream o;
534 0 : o << std::fixed << std::setprecision(2) << data << " ";
535 0 : switch (logt)
536 : {
537 0 : case -3:
538 0 : o << "n";
539 0 : break;
540 0 : case -2:
541 0 : o << "u";
542 0 : break;
543 0 : case -1:
544 0 : o << "m";
545 0 : break;
546 0 : case 0:
547 : default:
548 0 : break;
549 0 : case 1:
550 0 : o << "K";
551 0 : break;
552 0 : case 2:
553 0 : o << "M";
554 0 : break;
555 0 : case 3:
556 0 : o << "G";
557 0 : break;
558 : }
559 0 : o << units;
560 0 : return o.str();
561 0 : }
|