Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh" // include these 2 first -
2 : #define TRACE_NAME (app_name + "_BoardReaderCore").c_str()
3 :
4 : #include "artdaq-core/Core/MonitoredQuantity.hh"
5 : #include "artdaq-core/Data/Fragment.hh"
6 : #include "artdaq-core/Utilities/ExceptionHandler.hh"
7 : #include "artdaq/Application/BoardReaderCore.hh"
8 : #include "artdaq/Application/TaskType.hh"
9 : #include "artdaq/Generators/makeCommandableFragmentGenerator.hh"
10 :
11 : #include "cetlib_except/exception.h"
12 : #include "fhiclcpp/ParameterSet.h"
13 :
14 : #include <boost/lexical_cast.hpp>
15 :
16 : #include <pthread.h>
17 : #include <sched.h>
18 : #include <algorithm>
19 : #include <memory>
20 : #include <thread>
21 :
22 : const std::string artdaq::BoardReaderCore::
23 : FRAGMENTS_PROCESSED_STAT_KEY("BoardReaderCoreFragmentsProcessed");
24 : const std::string artdaq::BoardReaderCore::
25 : INPUT_WAIT_STAT_KEY("BoardReaderCoreInputWaitTime");
26 : const std::string artdaq::BoardReaderCore::BUFFER_WAIT_STAT_KEY("BoardReaderCoreBufferWaitTime");
27 : const std::string artdaq::BoardReaderCore::REQUEST_WAIT_STAT_KEY("BoardReaderCoreRequestWaitTime");
28 : const std::string artdaq::BoardReaderCore::
29 : OUTPUT_WAIT_STAT_KEY("BoardReaderCoreOutputWaitTime");
30 : const std::string artdaq::BoardReaderCore::
31 : FRAGMENTS_PER_READ_STAT_KEY("BoardReaderCoreFragmentsPerRead");
32 :
33 : std::unique_ptr<artdaq::DataSenderManager> artdaq::BoardReaderCore::sender_ptr_ = nullptr;
34 :
35 0 : artdaq::BoardReaderCore::BoardReaderCore(Commandable& parent_application)
36 0 : : parent_application_(parent_application)
37 : /*, local_group_comm_(local_group_comm)*/
38 0 : , generator_ptr_(nullptr)
39 0 : , run_id_(art::RunID::flushRun())
40 0 : , fragment_count_(0)
41 0 : , stop_requested_(false)
42 0 : , pause_requested_(false)
43 : {
44 0 : TLOG(TLVL_DEBUG + 32) << "Constructor";
45 0 : statsHelper_.addMonitoredQuantityName(FRAGMENTS_PROCESSED_STAT_KEY);
46 0 : statsHelper_.addMonitoredQuantityName(INPUT_WAIT_STAT_KEY);
47 0 : statsHelper_.addMonitoredQuantityName(BUFFER_WAIT_STAT_KEY);
48 0 : statsHelper_.addMonitoredQuantityName(REQUEST_WAIT_STAT_KEY);
49 0 : statsHelper_.addMonitoredQuantityName(OUTPUT_WAIT_STAT_KEY);
50 0 : statsHelper_.addMonitoredQuantityName(FRAGMENTS_PER_READ_STAT_KEY);
51 0 : }
52 :
53 0 : artdaq::BoardReaderCore::~BoardReaderCore()
54 : {
55 0 : TLOG(TLVL_DEBUG + 32) << "Destructor";
56 0 : TLOG(TLVL_DEBUG + 32) << "Stopping Request Receiver BEGIN";
57 0 : request_receiver_ptr_.reset(nullptr);
58 0 : TLOG(TLVL_DEBUG + 32) << "Stopping Request Receiver END";
59 0 : }
60 :
61 0 : bool artdaq::BoardReaderCore::initialize(fhicl::ParameterSet const& pset, uint64_t /*unused*/, uint64_t /*unused*/)
62 : {
63 0 : TLOG(TLVL_DEBUG + 32) << "initialize method called with "
64 0 : << "ParameterSet = \"" << pset.to_string() << "\".";
65 :
66 : // pull out the relevant parts of the ParameterSet
67 0 : fhicl::ParameterSet daq_pset;
68 : try
69 : {
70 0 : daq_pset = pset.get<fhicl::ParameterSet>("daq");
71 : }
72 0 : catch (...)
73 : {
74 0 : TLOG(TLVL_ERROR)
75 0 : << "Unable to find the DAQ parameters in the initialization "
76 0 : << "ParameterSet: \"" + pset.to_string() + "\".";
77 0 : return false;
78 0 : }
79 0 : fhicl::ParameterSet fr_pset;
80 : try
81 : {
82 0 : fr_pset = daq_pset.get<fhicl::ParameterSet>("fragment_receiver");
83 0 : data_pset_ = fr_pset;
84 : }
85 0 : catch (...)
86 : {
87 0 : TLOG(TLVL_ERROR)
88 0 : << "Unable to find the fragment_receiver parameters in the DAQ "
89 0 : << "initialization ParameterSet: \"" + daq_pset.to_string() + "\".";
90 0 : return false;
91 0 : }
92 :
93 : // pull out the Metric part of the ParameterSet
94 0 : fhicl::ParameterSet metric_pset;
95 : try
96 : {
97 0 : metric_pset = daq_pset.get<fhicl::ParameterSet>("metrics");
98 : }
99 0 : catch (...)
100 0 : {} // OK if there's no metrics table defined in the FHiCL
101 :
102 0 : if (metric_pset.is_empty())
103 : {
104 0 : TLOG(TLVL_INFO) << "No metric plugins appear to be defined";
105 : }
106 : try
107 : {
108 0 : metricMan->initialize(metric_pset, app_name);
109 : }
110 0 : catch (...)
111 : {
112 0 : ExceptionHandler(ExceptionHandlerRethrow::no,
113 : "Error loading metrics in BoardReaderCore::initialize()");
114 0 : }
115 :
116 0 : if (daq_pset.has_key("rank"))
117 : {
118 0 : if (my_rank >= 0 && daq_pset.get<int>("rank") != my_rank)
119 : {
120 0 : TLOG(TLVL_WARNING) << "BoardReader rank specified at startup is different than rank specified at configure! Using rank received at configure!";
121 : }
122 0 : my_rank = daq_pset.get<int>("rank");
123 : }
124 0 : if (my_rank == -1)
125 : {
126 0 : TLOG(TLVL_ERROR) << "BoardReader rank not specified at startup or in configuration! Aborting";
127 0 : throw cet::exception("RankNotSpecifiedError") << "BoardReader rank not specified at startup or in configuration! Aborting";
128 : }
129 :
130 : // create the requested CommandableFragmentGenerator
131 0 : auto frag_gen_name = fr_pset.get<std::string>("generator", "");
132 0 : if (frag_gen_name.length() == 0)
133 : {
134 0 : TLOG(TLVL_ERROR)
135 0 : << "No fragment generator (parameter name = \"generator\") was "
136 0 : << "specified in the fragment_receiver ParameterSet. The "
137 0 : << "DAQ initialization PSet was \"" << daq_pset.to_string() << "\".";
138 0 : return false;
139 : }
140 :
141 : try
142 : {
143 0 : generator_ptr_ = artdaq::makeCommandableFragmentGenerator(frag_gen_name, fr_pset);
144 : }
145 0 : catch (...)
146 : {
147 0 : std::stringstream exception_string;
148 : exception_string << "Exception thrown during initialization of fragment generator of type \""
149 0 : << frag_gen_name << "\"";
150 :
151 0 : ExceptionHandler(ExceptionHandlerRethrow::no, exception_string.str());
152 :
153 0 : TLOG(TLVL_DEBUG + 32) << "FHiCL parameter set used to initialize the fragment generator which threw an exception: " << fr_pset.to_string();
154 :
155 0 : return false;
156 0 : }
157 :
158 : try
159 : {
160 0 : fragment_buffer_ptr_ = std::make_shared<FragmentBuffer>(fr_pset);
161 : }
162 0 : catch (...)
163 : {
164 0 : std::stringstream exception_string;
165 0 : exception_string << "Exception thrown during initialization of Fragment Buffer";
166 :
167 0 : ExceptionHandler(ExceptionHandlerRethrow::no, exception_string.str());
168 :
169 0 : TLOG(TLVL_DEBUG + 32) << "FHiCL parameter set used to initialize the fragment buffer which threw an exception: " << fr_pset.to_string();
170 :
171 0 : return false;
172 0 : }
173 :
174 0 : std::shared_ptr<RequestBuffer> request_buffer = std::make_shared<RequestBuffer>(fr_pset.get<artdaq::Fragment::sequence_id_t>("request_increment", 1));
175 :
176 : try
177 : {
178 0 : request_receiver_ptr_.reset(new RequestReceiver(fr_pset, request_buffer));
179 0 : generator_ptr_->SetRequestBuffer(request_buffer);
180 0 : generator_ptr_->SetFragmentBuffer(fragment_buffer_ptr_);
181 0 : fragment_buffer_ptr_->SetRequestBuffer(request_buffer);
182 : }
183 0 : catch (...)
184 : {
185 0 : ExceptionHandler(ExceptionHandlerRethrow::no, "Exception thrown during initialization of request receiver");
186 :
187 0 : TLOG(TLVL_DEBUG + 32) << "FHiCL parameter set used to initialize the request receiver which threw an exception: " << fr_pset.to_string();
188 :
189 0 : return false;
190 0 : }
191 0 : metricMan->setPrefix(generator_ptr_->metricsReportingInstanceName());
192 :
193 0 : rt_priority_ = fr_pset.get<int>("rt_priority", 0);
194 :
195 : // fetch the monitoring parameters and create the MonitoredQuantity instances
196 0 : statsHelper_.createCollectors(fr_pset, 100, 30.0, 60.0, FRAGMENTS_PER_READ_STAT_KEY);
197 :
198 : // check if we should skip the sequence ID test...
199 0 : skip_seqId_test_ = (fr_pset.get<bool>("skip_seqID_test", false) || generator_ptr_->fragmentIDs().size() > 1 || fragment_buffer_ptr_->request_mode() != RequestMode::Ignored);
200 :
201 0 : verbose_ = fr_pset.get<bool>("verbose", true);
202 :
203 0 : return true;
204 0 : }
205 :
206 0 : bool artdaq::BoardReaderCore::start(art::RunID id, uint64_t timeout, uint64_t timestamp)
207 : {
208 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Starting run " << id.run();
209 0 : stop_requested_.store(false);
210 0 : pause_requested_.store(false);
211 :
212 0 : fragment_count_ = 0;
213 0 : prev_seq_id_ = 0;
214 0 : statsHelper_.resetStatistics();
215 :
216 0 : fragment_buffer_ptr_->Reset(false);
217 :
218 0 : metricMan->do_start();
219 :
220 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Pausing for 1s";
221 0 : usleep(1000000);
222 :
223 0 : generator_ptr_->StartCmd(id.run(), timeout, timestamp);
224 0 : run_id_ = id;
225 :
226 0 : request_receiver_ptr_->SetRunNumber(static_cast<uint32_t>(id.run()));
227 0 : request_receiver_ptr_->startRequestReception();
228 :
229 0 : running_ = true;
230 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Start transition (Started run) for run " << run_id_.run()
231 0 : << ", timeout = " << timeout << ", timestamp = " << timestamp;
232 0 : return true;
233 : }
234 :
235 0 : bool artdaq::BoardReaderCore::stop(uint64_t timeout, uint64_t timestamp)
236 : {
237 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Stopping run " << run_id_.run() << " after " << fragment_count_ << " fragments.";
238 0 : stop_requested_.store(true);
239 :
240 0 : TLOG(TLVL_DEBUG + 32) << "Stopping Request reception BEGIN";
241 0 : request_receiver_ptr_->stopRequestReception();
242 0 : TLOG(TLVL_DEBUG + 32) << "Stopping Request reception END";
243 :
244 0 : TLOG(TLVL_DEBUG + 32) << "Stopping CommandableFragmentGenerator BEGIN";
245 0 : generator_ptr_->StopCmd(timeout, timestamp);
246 0 : TLOG(TLVL_DEBUG + 32) << "Stopping CommandableFragmentGenerator END";
247 :
248 0 : TLOG(TLVL_DEBUG + 32) << "Stopping FragmentBuffer";
249 0 : fragment_buffer_ptr_->Stop();
250 :
251 0 : TLOG(TLVL_DEBUG + 32) << "Stopping DataSenderManager";
252 0 : if (sender_ptr_)
253 : {
254 0 : sender_ptr_->StopSender();
255 : }
256 :
257 0 : running_ = false;
258 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Stop transition for run " << run_id_.run();
259 0 : return true;
260 : }
261 :
262 0 : bool artdaq::BoardReaderCore::pause(uint64_t timeout, uint64_t timestamp)
263 : {
264 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Pausing run " << run_id_.run() << " after " << fragment_count_ << " fragments.";
265 0 : pause_requested_.store(true);
266 0 : generator_ptr_->PauseCmd(timeout, timestamp);
267 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Pause transition for run " << run_id_.run();
268 0 : return true;
269 : }
270 :
271 0 : bool artdaq::BoardReaderCore::resume(uint64_t timeout, uint64_t timestamp)
272 : {
273 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Resuming run " << run_id_.run();
274 0 : pause_requested_.store(false);
275 0 : metricMan->do_start();
276 0 : generator_ptr_->ResumeCmd(timeout, timestamp);
277 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Resume transition for run " << run_id_.run();
278 0 : return true;
279 : }
280 :
281 0 : bool artdaq::BoardReaderCore::shutdown(uint64_t /*unused*/)
282 : {
283 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Starting Shutdown transition";
284 0 : generator_ptr_->joinThreads(); // Cleanly shut down the CommandableFragmentGenerator
285 0 : generator_ptr_.reset(nullptr);
286 0 : metricMan->shutdown();
287 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed Shutdown transition";
288 0 : return true;
289 : }
290 :
291 0 : bool artdaq::BoardReaderCore::soft_initialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
292 : {
293 0 : TLOG(TLVL_DEBUG + 32) << "soft_initialize method called with "
294 0 : << "ParameterSet = \"" << pset.to_string()
295 0 : << "\". Forwarding to initialize.";
296 0 : return initialize(pset, timeout, timestamp);
297 : }
298 :
299 0 : bool artdaq::BoardReaderCore::reinitialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
300 : {
301 0 : TLOG(TLVL_DEBUG + 32) << "reinitialize method called with "
302 0 : << "ParameterSet = \"" << pset.to_string()
303 0 : << "\". Forwarding to initalize.";
304 0 : return initialize(pset, timeout, timestamp);
305 : }
306 :
307 0 : void artdaq::BoardReaderCore::receive_fragments()
308 : {
309 0 : if (rt_priority_ > 0)
310 : {
311 : #pragma GCC diagnostic push
312 : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
313 0 : sched_param s_param = {};
314 0 : s_param.sched_priority = rt_priority_;
315 0 : if (pthread_setschedparam(pthread_self(), SCHED_RR, &s_param))
316 0 : TLOG(TLVL_WARNING) << "setting realtime priority failed";
317 : #pragma GCC diagnostic pop
318 : }
319 :
320 : // try-catch block here?
321 :
322 : // how to turn RT PRI off?
323 0 : if (rt_priority_ > 0)
324 : {
325 : #pragma GCC diagnostic push
326 : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
327 0 : sched_param s_param = {};
328 0 : s_param.sched_priority = rt_priority_;
329 0 : int status = pthread_setschedparam(pthread_self(), SCHED_RR, &s_param);
330 0 : if (status != 0)
331 : {
332 0 : TLOG(TLVL_ERROR)
333 0 : << "Failed to set realtime priority to " << rt_priority_
334 0 : << ", return code = " << status;
335 : }
336 : #pragma GCC diagnostic pop
337 : }
338 :
339 0 : TLOG(TLVL_DEBUG + 32) << "Waiting for first fragment.";
340 : artdaq::MonitoredQuantityStats::TIME_POINT_T startTime, after_input, after_buffer;
341 0 : artdaq::FragmentPtrs frags;
342 :
343 0 : receiver_thread_active_ = true;
344 :
345 0 : auto wait_start = std::chrono::steady_clock::now();
346 0 : while (!running_ && TimeUtils::GetElapsedTime(wait_start) < start_transition_timeout_)
347 : {
348 0 : usleep(10000);
349 : }
350 0 : if (!running_)
351 : {
352 0 : TLOG(TLVL_ERROR) << "Timeout (" << start_transition_timeout_ << " s) while waiting for Start after receive_fragments thread started!";
353 0 : receiver_thread_active_ = false;
354 : }
355 :
356 0 : while (receiver_thread_active_)
357 : {
358 0 : startTime = artdaq::MonitoredQuantity::getCurrentTime();
359 :
360 0 : TLOG(TLVL_DEBUG + 35) << "receive_fragments getNext start";
361 0 : receiver_thread_active_ = generator_ptr_->getNext(frags);
362 0 : TLOG(TLVL_DEBUG + 35) << "receive_fragments getNext done (receiver_thread_active_=" << receiver_thread_active_ << ")";
363 :
364 : // 08-May-2015, KAB & JCF: if the generator getNext() method returns false
365 : // (which indicates that the data flow has stopped) *and* the reason that
366 : // it has stopped is because there was an exception that wasn't handled by
367 : // the experiment-specific FragmentGenerator class, we move to the
368 : // InRunError state so that external observers (e.g. RunControl or
369 : // DAQInterface) can see that there was a problem.
370 0 : if (!receiver_thread_active_ && generator_ptr_ && generator_ptr_->exception())
371 : {
372 0 : parent_application_.in_run_failure();
373 : }
374 :
375 0 : after_input = artdaq::MonitoredQuantity::getCurrentTime();
376 :
377 0 : if (!receiver_thread_active_) { break; }
378 0 : statsHelper_.addSample(FRAGMENTS_PER_READ_STAT_KEY, frags.size());
379 :
380 0 : if (frags.size() > 0)
381 : {
382 0 : TLOG(TLVL_DEBUG + 35) << "receive_fragments AddFragmentsToBuffer start";
383 0 : fragment_buffer_ptr_->AddFragmentsToBuffer(std::move(frags));
384 0 : TLOG(TLVL_DEBUG + 35) << "receive_fragments AddFragmentsToBuffer done";
385 : }
386 :
387 0 : after_buffer = artdaq::MonitoredQuantity::getCurrentTime();
388 0 : TLOG(TLVL_DEBUG + 34) << "receive_fragments INPUT_WAIT=" << (after_input - startTime) << ", BUFFER_WAIT=" << (after_buffer - after_input);
389 0 : statsHelper_.addSample(INPUT_WAIT_STAT_KEY, after_input - startTime);
390 0 : statsHelper_.addSample(BUFFER_WAIT_STAT_KEY, after_buffer - after_input);
391 0 : if (statsHelper_.statsRollingWindowHasMoved()) { sendMetrics_(); }
392 0 : frags.clear();
393 : }
394 :
395 : // 11-May-2015, KAB: call MetricManager::do_stop whenever we exit the
396 : // processing fragments loop so that metrics correctly go to zero when
397 : // there is no data flowing
398 0 : metricMan->do_stop();
399 :
400 0 : TLOG(TLVL_DEBUG + 32) << "receive_fragments loop end";
401 0 : }
402 0 : void artdaq::BoardReaderCore::send_fragments()
403 : {
404 0 : if (rt_priority_ > 0)
405 : {
406 : #pragma GCC diagnostic push
407 : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
408 0 : sched_param s_param = {};
409 0 : s_param.sched_priority = rt_priority_;
410 0 : if (pthread_setschedparam(pthread_self(), SCHED_RR, &s_param) != 0)
411 : {
412 0 : TLOG(TLVL_WARNING) << "setting realtime priority failed";
413 : }
414 : #pragma GCC diagnostic pop
415 : }
416 :
417 : // try-catch block here?
418 :
419 : // how to turn RT PRI off?
420 0 : if (rt_priority_ > 0)
421 : {
422 : #pragma GCC diagnostic push
423 : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
424 0 : sched_param s_param = {};
425 0 : s_param.sched_priority = rt_priority_;
426 0 : int status = pthread_setschedparam(pthread_self(), SCHED_RR, &s_param);
427 0 : if (status != 0)
428 : {
429 0 : TLOG(TLVL_ERROR)
430 0 : << "Failed to set realtime priority to " << rt_priority_
431 0 : << ", return code = " << status;
432 : }
433 : #pragma GCC diagnostic pop
434 : }
435 :
436 0 : TLOG(TLVL_DEBUG + 32) << "Initializing DataSenderManager. my_rank=" << my_rank;
437 0 : sender_ptr_ = std::make_unique<artdaq::DataSenderManager>(data_pset_);
438 :
439 0 : TLOG(TLVL_DEBUG + 32) << "Waiting for first fragment.";
440 : artdaq::MonitoredQuantityStats::TIME_POINT_T startTime;
441 : double delta_time;
442 0 : artdaq::FragmentPtrs frags;
443 0 : auto targetFragCount = generator_ptr_->fragmentIDs().size();
444 :
445 0 : sender_thread_active_ = true;
446 :
447 0 : auto wait_start = std::chrono::steady_clock::now();
448 0 : while (!running_ && TimeUtils::GetElapsedTime(wait_start) < start_transition_timeout_)
449 : {
450 0 : usleep(10000);
451 : }
452 0 : if (!running_)
453 : {
454 0 : TLOG(TLVL_ERROR) << "Timeout (" << start_transition_timeout_ << " s) while waiting for Start after send_fragments thread started!";
455 0 : sender_thread_active_ = false;
456 : }
457 :
458 0 : while (sender_thread_active_)
459 : {
460 0 : startTime = artdaq::MonitoredQuantity::getCurrentTime();
461 :
462 0 : TLOG(TLVL_DEBUG + 35) << "send_fragments applyRequests start";
463 0 : sender_thread_active_ = fragment_buffer_ptr_->applyRequests(frags);
464 0 : TLOG(TLVL_DEBUG + 35) << "send_fragments applyRequests done (sender_thread_active_=" << sender_thread_active_ << ")";
465 : // 08-May-2015, KAB & JCF: if the generator getNext() method returns false
466 : // (which indicates that the data flow has stopped) *and* the reason that
467 : // it has stopped is because there was an exception that wasn't handled by
468 : // the experiment-specific FragmentGenerator class, we move to the
469 : // InRunError state so that external observers (e.g. RunControl or
470 : // DAQInterface) can see that there was a problem.
471 0 : if (!sender_thread_active_ && generator_ptr_ && generator_ptr_->exception())
472 : {
473 0 : parent_application_.in_run_failure();
474 : }
475 :
476 0 : delta_time = artdaq::MonitoredQuantity::getCurrentTime() - startTime;
477 :
478 0 : TLOG(TLVL_DEBUG + 34) << "send_fragments REQUEST_WAIT=" << delta_time;
479 0 : statsHelper_.addSample(REQUEST_WAIT_STAT_KEY, delta_time);
480 :
481 0 : if (!sender_thread_active_) { break; }
482 :
483 0 : for (auto& fragPtr : frags)
484 : {
485 0 : if (fragPtr == nullptr)
486 : {
487 0 : TLOG(TLVL_WARNING) << "Encountered a bad fragment pointer in fragment " << fragment_count_ << ". "
488 0 : << "This is most likely caused by a problem with the Fragment Generator!";
489 0 : continue;
490 0 : }
491 0 : if (fragment_count_ == 0)
492 : {
493 0 : TLOG(TLVL_DEBUG + 32) << "Received first Fragment from Fragment Generator, sequence ID " << fragPtr->sequenceID() << ", size = " << fragPtr->sizeBytes() << " bytes.";
494 : }
495 :
496 0 : if (artdaq::Fragment::isBroadcastFragmentType(fragPtr->type()))
497 : {
498 : // Just broadcast any system Fragments in the output
499 0 : artdaq::Fragment::sequence_id_t sequence_id = fragPtr->sequenceID();
500 0 : statsHelper_.addSample(FRAGMENTS_PROCESSED_STAT_KEY, fragPtr->sizeBytes());
501 :
502 0 : startTime = artdaq::MonitoredQuantity::getCurrentTime();
503 0 : TLOG(TLVL_DEBUG + 36) << "send_fragments seq=" << sequence_id << " sendFragment start";
504 0 : auto res = sender_ptr_->sendFragment(std::move(*fragPtr));
505 0 : TLOG(TLVL_DEBUG + 36) << "send_fragments seq=" << sequence_id << " sendFragment done (dest=" << res.first << ", sts=" << TransferInterface::CopyStatusToString(res.second) << ")";
506 0 : ++fragment_count_;
507 0 : statsHelper_.addSample(OUTPUT_WAIT_STAT_KEY,
508 0 : artdaq::MonitoredQuantity::getCurrentTime() - startTime);
509 0 : continue;
510 0 : }
511 :
512 0 : artdaq::Fragment::sequence_id_t sequence_id = fragPtr->sequenceID();
513 0 : Globals::SetMFIteration("Sequence ID " + std::to_string(sequence_id));
514 0 : statsHelper_.addSample(FRAGMENTS_PROCESSED_STAT_KEY, fragPtr->sizeBytes());
515 :
516 : /*if ((fragment_count_ % 250) == 0)
517 : {
518 : TLOG(TLVL_DEBUG + 32)
519 : << "Sending fragment " << fragment_count_
520 : << " with sequence id " << sequence_id << ".";
521 : }*/
522 :
523 : // check for continous sequence IDs
524 0 : if (!skip_seqId_test_ && abs(static_cast<int64_t>(sequence_id) - static_cast<int64_t>(prev_seq_id_)) > 1)
525 : {
526 0 : TLOG(TLVL_WARNING)
527 0 : << "Missing sequence IDs: current sequence ID = "
528 0 : << sequence_id << ", previous sequence ID = "
529 0 : << prev_seq_id_ << ".";
530 : }
531 0 : prev_seq_id_ = sequence_id;
532 :
533 0 : startTime = artdaq::MonitoredQuantity::getCurrentTime();
534 0 : TLOG(TLVL_DEBUG + 36) << "send_fragments seq=" << sequence_id << " sendFragment start";
535 0 : auto res = sender_ptr_->sendFragment(std::move(*fragPtr));
536 0 : if (sender_ptr_->GetSentSequenceIDCount(sequence_id) == targetFragCount)
537 : {
538 0 : sender_ptr_->RemoveRoutingTableEntry(sequence_id);
539 : }
540 0 : TLOG(TLVL_DEBUG + 36) << "send_fragments seq=" << sequence_id << " sendFragment done (dest=" << res.first << ", sts=" << TransferInterface::CopyStatusToString(res.second) << ")";
541 0 : ++fragment_count_;
542 0 : statsHelper_.addSample(OUTPUT_WAIT_STAT_KEY,
543 0 : artdaq::MonitoredQuantity::getCurrentTime() - startTime);
544 :
545 0 : bool readyToReport = statsHelper_.readyToReport();
546 0 : if (readyToReport)
547 : {
548 0 : TLOG(TLVL_INFO) << buildStatisticsString_();
549 : }
550 :
551 : // Turn on lvls (mem and/or slow) 3,13,14 to log every send.
552 0 : TLOG(((fragment_count_ == 1) ? TLVL_DEBUG + 32
553 : : (((fragment_count_ % 250) == 0 || readyToReport) ? TLVL_DEBUG + 36 : TLVL_DEBUG + 37)))
554 0 : << ((fragment_count_ == 1)
555 0 : ? "Sent first Fragment"
556 0 : : "Sending fragment " + std::to_string(fragment_count_))
557 0 : << " with SeqID " << sequence_id << ".";
558 : }
559 0 : if (statsHelper_.statsRollingWindowHasMoved()) { sendMetrics_(); }
560 0 : frags.clear();
561 0 : std::this_thread::yield();
562 : }
563 :
564 0 : sender_ptr_.reset(nullptr);
565 :
566 : // 11-May-2015, KAB: call MetricManager::do_stop whenever we exit the
567 : // processing fragments loop so that metrics correctly go to zero when
568 : // there is no data flowing
569 0 : metricMan->do_stop();
570 :
571 0 : TLOG(TLVL_DEBUG + 32) << "send_fragments loop end";
572 0 : }
573 :
574 0 : std::string artdaq::BoardReaderCore::report(std::string const& which) const
575 : {
576 0 : std::string resultString;
577 :
578 : // pass the request to the FragmentGenerator instance, if it's available
579 0 : if (generator_ptr_ != nullptr && which != "core")
580 : {
581 0 : resultString = generator_ptr_->ReportCmd(which);
582 0 : if (resultString.length() > 0) { return resultString; }
583 : }
584 :
585 : // handle the request at this level, if we can
586 : // --> nothing here yet
587 :
588 : // if we haven't been able to come up with any report so far, say so
589 0 : std::string tmpString = app_name + " run number = ";
590 0 : tmpString.append(boost::lexical_cast<std::string>(run_id_.run()));
591 :
592 0 : tmpString.append(", Sent Fragment count = ");
593 0 : tmpString.append(boost::lexical_cast<std::string>(fragment_count_));
594 0 : if (which == "core")
595 : {
596 : // do nothing
597 : }
598 : //-----------------------------------------------------------------------------
599 : // P.Murat: add statistics report, the const/non const confusion to be cleaned up
600 : // by the maintainers
601 : //-----------------------------------------------------------------------------
602 0 : else if (which == "stats")
603 : {
604 0 : auto non_const_this = (artdaq::BoardReaderCore*)this;
605 0 : tmpString += ", " + non_const_this->buildStatisticsString_();
606 : }
607 : else
608 : {
609 0 : tmpString.append(". Command=\"" + which + "\" is not currently supported.");
610 : }
611 0 : return tmpString;
612 0 : }
613 :
614 0 : bool artdaq::BoardReaderCore::metaCommand(std::string const& command, std::string const& arg)
615 : {
616 0 : TLOG(TLVL_DEBUG + 32) << "metaCommand method called with "
617 0 : << "command = \"" << command << "\""
618 0 : << ", arg = \"" << arg << "\""
619 0 : << ".";
620 :
621 0 : if (generator_ptr_)
622 : {
623 0 : return generator_ptr_->metaCommand(command, arg);
624 : }
625 :
626 0 : return true;
627 : }
628 :
629 0 : std::string artdaq::BoardReaderCore::buildStatisticsString_()
630 : {
631 0 : std::ostringstream oss;
632 0 : double fragmentsGeneratedCount = 1.0;
633 0 : double fragmentsOutputCount = 1.0;
634 0 : oss << app_name << " statistics:" << std::endl;
635 :
636 0 : oss << " Fragments read: ";
637 0 : artdaq::MonitoredQuantityPtr mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(FRAGMENTS_PER_READ_STAT_KEY);
638 0 : if (mqPtr.get() != nullptr)
639 : {
640 0 : artdaq::MonitoredQuantityStats stats;
641 0 : mqPtr->getStats(stats);
642 0 : oss << stats.recentValueSum << " fragments generated at "
643 0 : << stats.recentSampleRate << " getNext calls/sec, fragment rate = "
644 0 : << stats.recentValueRate << " fragments/sec, monitor window = "
645 0 : << stats.recentDuration << " sec, min::max read size = "
646 0 : << stats.recentValueMin
647 0 : << "::"
648 0 : << stats.recentValueMax
649 0 : << " fragments";
650 0 : fragmentsGeneratedCount = std::max(double(stats.recentSampleCount), 1.0);
651 0 : oss << " Average times per fragment: ";
652 0 : if (stats.recentSampleRate > 0.0)
653 : {
654 0 : oss << " elapsed time = "
655 0 : << (1.0 / stats.recentSampleRate) << " sec";
656 : }
657 0 : }
658 :
659 0 : oss << std::endl;
660 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(FRAGMENTS_PROCESSED_STAT_KEY);
661 0 : if (mqPtr.get() != nullptr)
662 : {
663 0 : artdaq::MonitoredQuantityStats stats;
664 0 : mqPtr->getStats(stats);
665 0 : oss << " Fragment output statistics: "
666 0 : << stats.recentSampleCount << " fragments sent at "
667 0 : << stats.recentSampleRate << " fragments/sec, effective data rate = "
668 0 : << (stats.recentValueRate / 1024.0 / 1024.0) << " MB/sec, monitor window = "
669 0 : << stats.recentDuration << " sec, min::max event size = "
670 0 : << (stats.recentValueMin / 1024.0 / 1024.0)
671 0 : << "::"
672 0 : << (stats.recentValueMax / 1024.0 / 1024.0)
673 0 : << " MB" << std::endl;
674 0 : fragmentsOutputCount = std::max(double(stats.recentSampleCount), 1.0);
675 0 : }
676 :
677 : // 31-Dec-2014, KAB - Just a reminder that using "fragmentCount" in the
678 : // denominator of the calculations below is important because the way that
679 : // the accumulation of these statistics is done is not fragment-by-fragment
680 : // but read-by-read (where each read can contain multiple fragments).
681 : // 29-Aug-2016, KAB - BRSYNC_WAIT and OUTPUT_WAIT are now done fragment-by-
682 : // fragment, but we'll leave the calculation the same. (The alternative
683 : // would be to use recentValueAverage().)
684 :
685 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(INPUT_WAIT_STAT_KEY);
686 0 : if (mqPtr.get() != nullptr)
687 : {
688 0 : oss << " Input wait time = "
689 0 : << (mqPtr->getRecentValueSum() / fragmentsGeneratedCount) << " s/fragment";
690 : }
691 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(BUFFER_WAIT_STAT_KEY);
692 0 : if (mqPtr.get() != 0)
693 : {
694 0 : oss << ", buffer wait time = "
695 0 : << (mqPtr->getRecentValueSum() / fragmentsGeneratedCount) << " s/fragment";
696 : }
697 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(REQUEST_WAIT_STAT_KEY);
698 0 : if (mqPtr.get() != 0)
699 : {
700 0 : oss << ", request wait time = "
701 0 : << (mqPtr->getRecentValueSum() / fragmentsOutputCount) << " s/fragment";
702 : }
703 :
704 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(OUTPUT_WAIT_STAT_KEY);
705 0 : if (mqPtr.get() != nullptr)
706 : {
707 0 : oss << ", output wait time = "
708 0 : << (mqPtr->getRecentValueSum() / fragmentsOutputCount) << " s/fragment";
709 : }
710 : //-----------------------------------------------------------------------------
711 : // 2024-01-13 P.Murat: add SHM data
712 : //-----------------------------------------------------------------------------
713 0 : oss << fragment_buffer_ptr_->getStatReport();
714 :
715 0 : return oss.str();
716 0 : }
717 :
718 0 : void artdaq::BoardReaderCore::sendMetrics_()
719 : {
720 : // TLOG(TLVL_DEBUG + 32) << "Sending metrics " << __LINE__ ;
721 0 : double fragmentCount = 1.0;
722 0 : artdaq::MonitoredQuantityPtr mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(FRAGMENTS_PROCESSED_STAT_KEY);
723 0 : if (mqPtr.get() != nullptr)
724 : {
725 0 : artdaq::MonitoredQuantityStats stats;
726 0 : mqPtr->getStats(stats);
727 0 : fragmentCount = std::max(double(stats.recentSampleCount), 1.0);
728 0 : metricMan->sendMetric("Fragment Count", stats.fullSampleCount, "fragments", 1, MetricMode::LastPoint);
729 0 : metricMan->sendMetric("Fragment Rate", stats.recentSampleRate, "fragments/sec", 1, MetricMode::Average);
730 0 : metricMan->sendMetric("Average Fragment Size", stats.recentValueAverage, "bytes/fragment", 2, MetricMode::Average);
731 0 : metricMan->sendMetric("Data Rate", stats.recentValueRate, "bytes/sec", 2, MetricMode::Average);
732 0 : }
733 :
734 : // 31-Dec-2014, KAB - Just a reminder that using "fragmentCount" in the
735 : // denominator of the calculations below is important because the way that
736 : // the accumulation of these statistics is done is not fragment-by-fragment
737 : // but read-by-read (where each read can contain multiple fragments).
738 : // 29-Aug-2016, KAB - BRSYNC_WAIT and OUTPUT_WAIT are now done fragment-by-
739 : // fragment, but we'll leave the calculation the same. (The alternative
740 : // would be to use recentValueAverage().)
741 :
742 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(INPUT_WAIT_STAT_KEY);
743 0 : if (mqPtr.get() != nullptr)
744 : {
745 0 : metricMan->sendMetric("Avg Input Wait Time", (mqPtr->getRecentValueSum() / fragmentCount), "seconds/fragment", 3, MetricMode::Average);
746 : }
747 :
748 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(BUFFER_WAIT_STAT_KEY);
749 0 : if (mqPtr.get() != 0)
750 : {
751 0 : metricMan->sendMetric("Avg Buffer Wait Time", (mqPtr->getRecentValueSum() / fragmentCount), "seconds/fragment", 3, MetricMode::Average);
752 : }
753 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(REQUEST_WAIT_STAT_KEY);
754 0 : if (mqPtr.get() != 0)
755 : {
756 0 : metricMan->sendMetric("Avg Request Response Wait Time", (mqPtr->getRecentValueSum() / fragmentCount), "seconds/fragment", 3, MetricMode::Average);
757 : }
758 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(OUTPUT_WAIT_STAT_KEY);
759 0 : if (mqPtr.get() != nullptr)
760 : {
761 0 : metricMan->sendMetric("Avg Output Wait Time", (mqPtr->getRecentValueSum() / fragmentCount), "seconds/fragment", 3, MetricMode::Average);
762 : }
763 :
764 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(FRAGMENTS_PER_READ_STAT_KEY);
765 0 : if (mqPtr.get() != nullptr)
766 : {
767 0 : metricMan->sendMetric("Avg Frags Per Read", mqPtr->getRecentValueAverage(), "fragments/read", 4, MetricMode::Average);
768 : }
769 0 : }
|