Line data Source code
1 : #include "artdaq/DAQrate/TransferTest.hh"
2 :
3 : #include "artdaq-core/Data/Fragment.hh"
4 : #include "artdaq/DAQrate/DataSenderManager.hh"
5 : #include "artdaq/DAQrate/FragmentReceiverManager.hh"
6 :
7 : #define TRACE_NAME "TransferTest"
8 : #include "artdaq/DAQdata/Globals.hh"
9 :
10 : #include <future>
11 :
12 0 : artdaq::TransferTest::TransferTest(fhicl::ParameterSet psi)
13 0 : : senders_(psi.get<int>("num_senders"))
14 0 : , receivers_(psi.get<int>("num_receivers"))
15 0 : , sending_threads_(psi.get<int>("sending_threads", 1))
16 0 : , sends_each_sender_(psi.get<int>("sends_per_sender"))
17 0 : , receives_each_receiver_(0)
18 0 : , buffer_count_(psi.get<int>("buffer_count", 10))
19 0 : , error_count_max_(psi.get<int>("max_errors_before_abort", 3))
20 0 : , fragment_size_(psi.get<size_t>("fragment_size", 0x100000))
21 0 : , validate_mode_(psi.get<bool>("validate_data_mode", false))
22 0 : , partition_number_(psi.get<int>("partition_number", rand() % 0x7F)) // NOLINT(cert-msc50-cpp)
23 : {
24 0 : TLOG(TLVL_DEBUG + 35) << "CONSTRUCTOR";
25 :
26 0 : if (fragment_size_ < artdaq::detail::RawFragmentHeader::num_words() * sizeof(artdaq::RawDataType))
27 : {
28 0 : fragment_size_ = artdaq::detail::RawFragmentHeader::num_words() * sizeof(artdaq::RawDataType);
29 : }
30 :
31 0 : fhicl::ParameterSet metric_pset;
32 :
33 : try
34 : {
35 0 : metric_pset = psi.get<fhicl::ParameterSet>("metrics");
36 : }
37 0 : catch (...)
38 0 : {} // OK if there's no metrics table defined in the FHiCL
39 :
40 : try
41 : {
42 0 : std::string name = "TransferTest" + std::to_string(my_rank);
43 0 : metricMan->initialize(metric_pset, name);
44 0 : metricMan->do_start();
45 0 : }
46 0 : catch (...)
47 0 : {}
48 :
49 0 : auto type(psi.get<std::string>("transfer_plugin_type", "TCPSocket"));
50 :
51 0 : bool broadcast_mode = psi.get<bool>("broadcast_sends", false);
52 0 : if (broadcast_mode)
53 : {
54 0 : receives_each_receiver_ = senders_ * sending_threads_ * sends_each_sender_;
55 : }
56 : else
57 : {
58 0 : if (receivers_ > 0)
59 : {
60 0 : if (senders_ * sending_threads_ * sends_each_sender_ % receivers_ != 0)
61 : {
62 0 : TLOG(TLVL_DEBUG + 33) << "Adding sends so that sends_each_sender * num_sending_ranks is a multiple of num_receiving_ranks" << std::endl;
63 0 : while (senders_ * sends_each_sender_ % receivers_ != 0)
64 : {
65 0 : sends_each_sender_++;
66 : }
67 0 : receives_each_receiver_ = senders_ * sending_threads_ * sends_each_sender_ / receivers_;
68 0 : TLOG(TLVL_DEBUG + 33) << "sends_each_sender is now " << sends_each_sender_ << std::endl;
69 0 : psi.put_or_replace("sends_per_sender", sends_each_sender_);
70 : }
71 : else
72 : {
73 0 : receives_each_receiver_ = senders_ * sending_threads_ * sends_each_sender_ / receivers_;
74 : }
75 : }
76 : }
77 :
78 0 : std::string hostmap;
79 0 : if (psi.has_key("hostmap"))
80 : {
81 0 : hostmap = " host_map: @local::hostmap";
82 : }
83 :
84 0 : std::stringstream ss;
85 0 : ss << psi.to_string() << std::endl;
86 :
87 0 : ss << " sources: {";
88 0 : for (int ii = 0; ii < senders_; ++ii)
89 : {
90 0 : ss << "s" << ii << ": { transferPluginType: " << type << " source_rank: " << ii << " max_fragment_size_words : " << fragment_size_ << " buffer_count : " << buffer_count_ << " partition_number : " << partition_number_ << hostmap << " }" << std::endl;
91 : }
92 0 : ss << "}" << std::endl
93 0 : << " destinations: {";
94 0 : for (int jj = senders_; jj < senders_ + receivers_; ++jj)
95 : {
96 0 : ss << "d" << jj << ": { transferPluginType: " << type << " destination_rank: " << jj << " max_fragment_size_words : " << fragment_size_ << " buffer_count : " << buffer_count_ << " partition_number : " << partition_number_ << hostmap << " }" << std::endl;
97 : }
98 0 : ss << "}" << std::endl;
99 :
100 0 : ps_ = fhicl::ParameterSet::make(ss.str());
101 :
102 0 : TLOG(TLVL_DEBUG + 32) << "Going to configure with ParameterSet: " << ps_.to_string() << std::endl;
103 0 : }
104 :
105 0 : int artdaq::TransferTest::runTest()
106 : {
107 0 : TLOG(TLVL_INFO) << "runTest BEGIN: " << (my_rank < senders_ ? "sending" : "receiving");
108 0 : start_time_ = std::chrono::steady_clock::now();
109 0 : std::pair<size_t, double> result;
110 0 : if (my_rank >= senders_ + receivers_)
111 : {
112 0 : return 0;
113 : }
114 0 : if (my_rank < senders_)
115 : {
116 0 : std::vector<std::future<std::pair<size_t, double>>> results_futures(sending_threads_);
117 0 : for (int ii = 0; ii < sending_threads_; ++ii)
118 : {
119 0 : results_futures[ii] = std::async(std::launch::async, std::bind(&TransferTest::do_sending, this, ii));
120 : }
121 0 : for (auto& future : results_futures)
122 : {
123 0 : if (future.valid())
124 : {
125 0 : auto thisresult = future.get();
126 0 : result.first += thisresult.first;
127 0 : result.second += thisresult.second;
128 : }
129 : }
130 0 : }
131 : else
132 : {
133 0 : result = do_receiving();
134 : }
135 0 : auto duration = std::chrono::duration_cast<artdaq::TimeUtils::seconds>(std::chrono::steady_clock::now() - start_time_).count();
136 0 : TLOG(TLVL_INFO) << (my_rank < senders_ ? "Sent " : "Received ") << result.first << " bytes in " << duration << " seconds ( " << formatBytes(result.first / duration) << "/s )." << std::endl;
137 0 : TLOG(TLVL_INFO) << "Rate of " << (my_rank < senders_ ? "sending" : "receiving") << ": " << formatBytes(result.first / result.second) << "/s, " << formatHertz((my_rank < senders_ ? sends_each_sender_ : receives_each_receiver_) / duration)
138 0 : << std::endl;
139 0 : metricMan->do_stop();
140 0 : metricMan->shutdown();
141 0 : TLOG(TLVL_DEBUG + 36) << "runTest DONE";
142 0 : return return_code_;
143 : }
144 :
145 0 : std::pair<size_t, double> artdaq::TransferTest::do_sending(int index)
146 : {
147 0 : TLOG(TLVL_DEBUG + 34) << "do_sending entered RawFragmentHeader::num_words()=" << artdaq::detail::RawFragmentHeader::num_words();
148 :
149 0 : size_t totalSize = 0;
150 0 : double totalTime = 0;
151 0 : artdaq::DataSenderManager sender(ps_);
152 :
153 0 : unsigned data_size_wrds = (fragment_size_ / sizeof(artdaq::RawDataType)) - artdaq::detail::RawFragmentHeader::num_words();
154 0 : artdaq::Fragment frag(data_size_wrds);
155 :
156 0 : if (validate_mode_)
157 : {
158 0 : artdaq::RawDataType gen_seed = 0;
159 :
160 0 : std::generate_n(frag.dataBegin(), data_size_wrds, [&]() { return ++gen_seed; });
161 0 : for (size_t ii = 0; ii < frag.dataSize(); ++ii)
162 : {
163 0 : if (*(frag.dataBegin() + ii) != ii + 1)
164 : {
165 0 : TLOG(TLVL_ERROR) << "Data corruption detected! (" << (*(frag.dataBegin() + ii)) << " != " << (ii + 1) << ") Aborting!";
166 0 : return_code_ = 255;
167 0 : return std::make_pair(0, 0.0);
168 : }
169 : }
170 : }
171 :
172 0 : int metric_send_interval = sends_each_sender_ / 1000 > 1 ? sends_each_sender_ / 1000 : 1;
173 0 : auto init_time_metric = 0.0;
174 0 : auto send_time_metric = 0.0;
175 0 : auto after_time_metric = 0.0;
176 0 : auto send_size_metric = 0.0;
177 0 : auto error_count = 0;
178 :
179 0 : for (int ii = 0; ii < sends_each_sender_; ++ii)
180 : {
181 0 : auto loop_start = std::chrono::steady_clock::now();
182 0 : TLOG(TLVL_DEBUG + 34) << "sender rank " << my_rank << " #" << ii << " resized bytes=" << frag.sizeBytes();
183 0 : totalSize += frag.sizeBytes();
184 :
185 : // unsigned sndDatSz = data_size_wrds;
186 0 : frag.setSequenceID(ii * sending_threads_ + index);
187 0 : frag.setFragmentID(my_rank);
188 0 : frag.setSystemType(artdaq::Fragment::DataFragmentType);
189 : /*
190 : artdaq::Fragment::iterator it = frag.dataBegin();
191 : *it = my_rank;
192 : *++it = ii;
193 : *++it = sndDatSz;*/
194 :
195 0 : auto send_start = std::chrono::steady_clock::now();
196 0 : TLOG(TLVL_DEBUG + 32) << "Sender " << my_rank << " sending fragment " << ii;
197 0 : auto stspair = sender.sendFragment(std::move(frag));
198 0 : auto after_send = std::chrono::steady_clock::now();
199 0 : TLOG(TLVL_DEBUG + 33) << "Sender " << my_rank << " sent fragment " << ii;
200 0 : sender.RemoveRoutingTableEntry(ii * sending_threads_ + index);
201 : // usleep( (data_size_wrds*sizeof(artdaq::RawDataType))/233 );
202 :
203 0 : if (stspair.second != artdaq::TransferInterface::CopyStatus::kSuccess)
204 : {
205 0 : error_count++;
206 0 : if (error_count >= error_count_max_)
207 : {
208 0 : TLOG(TLVL_ERROR) << "Too many errors sending fragments! Aborting... (sent=" << ii << "/" << sends_each_sender_ << ")";
209 0 : return_code_ = sends_each_sender_ - ii;
210 0 : return std::make_pair(0, 0.0);
211 : }
212 : }
213 :
214 0 : frag = artdaq::Fragment(data_size_wrds); // replace/renew
215 0 : if (validate_mode_)
216 : {
217 0 : artdaq::RawDataType gen_seed = ii + 1;
218 :
219 0 : std::generate_n(frag.dataBegin(), data_size_wrds, [&]() { return ++gen_seed; });
220 0 : for (size_t jj = 0; jj < frag.dataSize(); ++jj)
221 : {
222 0 : if (*(frag.dataBegin() + jj) != (ii + 1) + jj + 1)
223 : {
224 0 : TLOG(TLVL_ERROR) << "Input Data corruption detected! (" << *(frag.dataBegin() + jj) << " != " << ii + jj + 2 << " at position " << ii << ") Aborting!";
225 0 : return_code_ = 254;
226 0 : return std::make_pair(0, 0.0);
227 : }
228 : }
229 : }
230 0 : TLOG(TLVL_DEBUG + 37) << "sender rank " << my_rank << " frag replaced";
231 :
232 0 : auto total_send_time = std::chrono::duration_cast<artdaq::TimeUtils::seconds>(after_send - send_start).count();
233 0 : totalTime += total_send_time;
234 0 : send_time_metric += total_send_time;
235 0 : send_size_metric += data_size_wrds * sizeof(artdaq::RawDataType);
236 0 : after_time_metric += std::chrono::duration_cast<artdaq::TimeUtils::seconds>(std::chrono::steady_clock::now() - after_send).count();
237 0 : init_time_metric += std::chrono::duration_cast<artdaq::TimeUtils::seconds>(send_start - loop_start).count();
238 :
239 0 : if (metricMan && ii % metric_send_interval == 0)
240 : {
241 0 : metricMan->sendMetric("send_init_time", init_time_metric, "seconds", 3, MetricMode::Accumulate);
242 0 : metricMan->sendMetric("total_send_time", send_time_metric, "seconds", 3, MetricMode::Accumulate);
243 0 : metricMan->sendMetric("after_send_time", after_time_metric, "seconds", 3, MetricMode::Accumulate);
244 0 : metricMan->sendMetric("send_rate", send_size_metric / send_time_metric, "B/s", 3, MetricMode::Average);
245 0 : init_time_metric = 0.0;
246 0 : send_time_metric = 0.0;
247 0 : after_time_metric = 0.0;
248 0 : send_size_metric = 0.0;
249 : }
250 0 : usleep(0); // Yield execution
251 : }
252 :
253 0 : return std::make_pair(totalSize, totalTime);
254 0 : } // do_sending
255 :
256 0 : std::pair<size_t, double> artdaq::TransferTest::do_receiving()
257 : {
258 0 : TLOG(TLVL_DEBUG + 34) << "do_receiving entered";
259 :
260 0 : artdaq::FragmentReceiverManager receiver(ps_);
261 0 : receiver.start_threads();
262 0 : int counter = receives_each_receiver_;
263 0 : size_t totalSize = 0;
264 0 : double totalTime = 0;
265 0 : bool first = true;
266 0 : bool nonblocking_mode = ps_.get<bool>("nonblocking_sends", false);
267 0 : std::atomic<int> activeSenders(senders_ * sending_threads_);
268 0 : auto end_loop = std::chrono::steady_clock::now();
269 0 : auto last_receive = std::chrono::steady_clock::now();
270 :
271 0 : auto recv_size_metric = 0.0;
272 0 : auto recv_time_metric = 0.0;
273 0 : auto input_wait_metric = 0.0;
274 0 : auto init_wait_metric = 0.0;
275 0 : int metric_send_interval = receives_each_receiver_ / 1000 > 1 ? receives_each_receiver_ : 1;
276 :
277 : // Only abort when there are no senders if were's > 90% done
278 0 : while ((activeSenders > 0 || (counter > receives_each_receiver_ / 10 && !nonblocking_mode)) && counter > 0)
279 : {
280 0 : auto start_loop = std::chrono::steady_clock::now();
281 0 : TLOG(TLVL_DEBUG + 34) << "do_receiving: Counter is " << counter << ", calling recvFragment (activeSenders=" << activeSenders << ")";
282 0 : int senderSlot = artdaq::TransferInterface::RECV_TIMEOUT;
283 0 : auto before_receive = std::chrono::steady_clock::now();
284 :
285 0 : auto ignoreFragPtr = receiver.recvFragment(senderSlot);
286 0 : auto after_receive = std::chrono::steady_clock::now();
287 0 : init_wait_metric += std::chrono::duration_cast<artdaq::TimeUtils::seconds>(before_receive - start_loop).count();
288 0 : size_t thisSize = 0;
289 0 : if (senderSlot >= artdaq::TransferInterface::RECV_SUCCESS && ignoreFragPtr)
290 : {
291 0 : last_receive = std::chrono::steady_clock::now();
292 0 : if (ignoreFragPtr->type() == artdaq::Fragment::EndOfDataFragmentType)
293 : {
294 0 : TLOG(TLVL_INFO) << "Receiver " << my_rank << " received EndOfData Fragment from Sender " << senderSlot;
295 0 : activeSenders--;
296 0 : TLOG(TLVL_DEBUG + 32) << "Active Senders is now " << activeSenders;
297 : }
298 0 : else if (ignoreFragPtr->type() != artdaq::Fragment::DataFragmentType)
299 : {
300 0 : TLOG(TLVL_WARNING) << "Receiver " << my_rank << " received Fragment with System type " << artdaq::detail::RawFragmentHeader::SystemTypeToString(ignoreFragPtr->type()) << " (Unexpected!)";
301 : }
302 : else
303 : {
304 0 : if (first)
305 : {
306 0 : start_time_ = std::chrono::steady_clock::now();
307 0 : first = false;
308 : }
309 0 : counter--;
310 0 : TLOG(TLVL_INFO) << "Receiver " << my_rank << " received fragment " << receives_each_receiver_ - counter
311 0 : << " with seqID " << ignoreFragPtr->sequenceID() << " from Sender " << senderSlot << " (Expecting " << counter << " more)";
312 0 : thisSize = ignoreFragPtr->size() * sizeof(artdaq::RawDataType);
313 0 : totalSize += thisSize;
314 0 : if (validate_mode_)
315 : {
316 0 : for (size_t ii = 0; ii < ignoreFragPtr->dataSize(); ++ii)
317 : {
318 0 : if (*(ignoreFragPtr->dataBegin() + ii) != ignoreFragPtr->sequenceID() + ii + 1)
319 : {
320 0 : TLOG(TLVL_ERROR) << "Output Data corruption detected! (" << *(ignoreFragPtr->dataBegin() + ii) << " != " << (ignoreFragPtr->sequenceID() + ii + 1) << " at position " << ii << ") Aborting!";
321 0 : return_code_ = -3;
322 0 : return std::make_pair(0, 0.0);
323 : }
324 : }
325 : }
326 : }
327 0 : input_wait_metric += std::chrono::duration_cast<artdaq::TimeUtils::seconds>(after_receive - end_loop).count();
328 : }
329 0 : else if (senderSlot == artdaq::TransferInterface::DATA_END)
330 : {
331 0 : TLOG(TLVL_ERROR) << "Receiver " << my_rank << " detected fatal protocol error! Reducing active sender count by one!" << std::endl;
332 0 : activeSenders--;
333 0 : TLOG(TLVL_DEBUG + 32) << "Active Senders is now " << activeSenders;
334 : }
335 0 : TLOG(TLVL_DEBUG + 34) << "do_receiving: Recv Loop end, counter is " << counter;
336 :
337 0 : auto total_recv_time = std::chrono::duration_cast<artdaq::TimeUtils::seconds>(after_receive - before_receive).count();
338 0 : recv_time_metric += total_recv_time;
339 0 : totalTime += total_recv_time;
340 0 : recv_size_metric += thisSize;
341 :
342 0 : if (metricMan && counter % metric_send_interval == 0)
343 : {
344 0 : metricMan->sendMetric("input_wait", input_wait_metric, "seconds", 3, MetricMode::Accumulate);
345 0 : metricMan->sendMetric("recv_init_time", init_wait_metric, "seconds", 3, MetricMode::Accumulate);
346 0 : metricMan->sendMetric("total_recv_time", recv_time_metric, "seconds", 3, MetricMode::Accumulate);
347 0 : metricMan->sendMetric("recv_rate", recv_size_metric / recv_time_metric, "B/s", 3, MetricMode::Average);
348 :
349 0 : input_wait_metric = 0.0;
350 0 : init_wait_metric = 0.0;
351 0 : recv_time_metric = 0.0;
352 0 : recv_size_metric = 0.0;
353 : }
354 :
355 0 : if (artdaq::TimeUtils::GetElapsedTime(last_receive) > 5.0)
356 : {
357 0 : TLOG(TLVL_ERROR) << "Senders appear to have stopped (no data for >5 seconds), aborting test! counter=" << counter;
358 0 : return std::make_pair(0, 0.0);
359 : }
360 0 : end_loop = std::chrono::steady_clock::now();
361 0 : }
362 :
363 0 : if (counter != 0 && !nonblocking_mode)
364 : {
365 0 : TLOG(TLVL_ERROR) << "Did not receive all expected Fragments! Missing " << counter << " Fragments!";
366 0 : return_code_ = counter;
367 0 : return std::make_pair(0, 0.0);
368 : }
369 :
370 0 : return std::make_pair(totalSize, totalTime);
371 0 : }
|