Line data Source code
1 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
2 : #include "artdaq/DAQdata/Globals.hh"
3 : #define TRACE_NAME (app_name + "_CommandableFragmentGenerator").c_str() // include these 2 first -
4 :
5 : #include "artdaq/Generators/CommandableFragmentGenerator.hh"
6 :
7 : #include "artdaq-core/Data/ContainerFragmentLoader.hh"
8 : #include "artdaq-core/Data/Fragment.hh"
9 : #include "artdaq-core/Utilities/TimeUtils.hh"
10 : #include "artdaq/DAQdata/TCPConnect.hh"
11 :
12 : #include "cetlib_except/exception.h"
13 : #include "fhiclcpp/ParameterSet.h"
14 :
15 : #include <boost/exception/all.hpp>
16 : #include <boost/lexical_cast.hpp>
17 : #include <boost/thread.hpp>
18 :
19 : #include <sys/poll.h>
20 : #include <algorithm>
21 : #include <chrono>
22 : #include <exception>
23 : #include <fstream>
24 : #include <iomanip>
25 : #include <iostream>
26 : #include <iterator>
27 : #include <limits>
28 : #include <memory>
29 : #include <mutex>
30 : #include <thread>
31 :
32 : #define TLVL_GETNEXT 35
33 : #define TLVL_GETNEXT_VERBOSE 36
34 : #define TLVL_CHECKSTOP 37
35 : #define TLVL_EVCOUNTERINC 38
36 : #define TLVL_GETDATALOOP 39
37 : #define TLVL_GETDATALOOP_DATABUFFWAIT 40
38 : #define TLVL_GETDATALOOP_VERBOSE 41
39 : #define TLVL_WAITFORBUFFERREADY 42
40 : #define TLVL_GETBUFFERSTATS 43
41 : #define TLVL_CHECKDATABUFFER 44
42 : #define TLVL_GETMONITORINGDATA 45
43 : #define TLVL_APPLYREQUESTS 46
44 : #define TLVL_SENDEMPTYFRAGMENTS 47
45 : #define TLVL_CHECKWINDOWS 48
46 : #define TLVL_EMPTYFRAGMENT 49
47 :
48 7 : artdaq::CommandableFragmentGenerator::CommandableFragmentGenerator(const fhicl::ParameterSet& ps)
49 7 : : mutex_()
50 14 : , useMonitoringThread_(ps.get<bool>("separate_monitoring_thread", false))
51 14 : , monitoringInterval_(ps.get<int64_t>("hardware_poll_interval_us", 0))
52 7 : , isHardwareOK_(true)
53 7 : , run_number_(-1)
54 7 : , subrun_number_(-1)
55 7 : , timeout_(std::numeric_limits<uint64_t>::max())
56 7 : , timestamp_(std::numeric_limits<uint64_t>::max())
57 7 : , should_stop_(true)
58 7 : , exception_(false)
59 14 : , latest_exception_report_("none")
60 7 : , ev_counter_(1)
61 28 : , sleep_on_stop_us_(0)
62 : {
63 21 : auto fragment_ids = ps.get<std::vector<artdaq::Fragment::fragment_id_t>>("fragment_ids", std::vector<artdaq::Fragment::fragment_id_t>());
64 :
65 14 : TLOG(TLVL_DEBUG + 33) << "artdaq::CommandableFragmentGenerator::CommandableFragmentGenerator(ps)";
66 14 : int fragment_id = ps.get<int>("fragment_id", -99);
67 :
68 7 : if (fragment_id != -99)
69 : {
70 6 : if (!fragment_ids.empty())
71 : {
72 0 : latest_exception_report_ = R"(Error in CommandableFragmentGenerator: can't both define "fragment_id" and "fragment_ids" in FHiCL document)";
73 0 : TLOG(TLVL_ERROR) << latest_exception_report_;
74 0 : throw cet::exception(latest_exception_report_);
75 : }
76 :
77 6 : fragment_ids.emplace_back(fragment_id);
78 : }
79 :
80 14 : if (ps.has_key("generated_fragments_per_event"))
81 : {
82 3 : TLOG(TLVL_WARNING) << "Ignoring deprecated configuration parameter \"generated_fragments_per_event\"";
83 : }
84 :
85 7 : int first_fragment_id = std::numeric_limits<int>::max();
86 16 : for (auto& id : fragment_ids)
87 : {
88 9 : if (id < first_fragment_id) first_fragment_id = id;
89 9 : expectedTypes_[id] = artdaq::Fragment::EmptyFragmentType;
90 : }
91 7 : instance_name_for_metrics_ = app_name + "." + boost::lexical_cast<std::string>(first_fragment_id);
92 :
93 14 : sleep_on_stop_us_ = ps.get<int>("sleep_on_stop_us", 0);
94 7 : }
95 :
96 7 : artdaq::CommandableFragmentGenerator::~CommandableFragmentGenerator()
97 : {
98 7 : joinThreads();
99 7 : }
100 :
101 19 : void artdaq::CommandableFragmentGenerator::joinThreads()
102 : {
103 19 : should_stop_ = true;
104 38 : TLOG(TLVL_DEBUG + 32) << "Joining monitoringThread";
105 : try
106 : {
107 19 : if (monitoringThread_.joinable())
108 : {
109 1 : monitoringThread_.join();
110 : }
111 : }
112 0 : catch (...)
113 : {
114 : // IGNORED
115 0 : }
116 38 : TLOG(TLVL_DEBUG + 32) << "joinThreads complete";
117 19 : }
118 :
119 38 : bool artdaq::CommandableFragmentGenerator::getNext(FragmentPtrs& output)
120 : {
121 38 : bool result = true;
122 :
123 38 : if (check_stop()) usleep(sleep_on_stop_us_);
124 38 : if (exception() || should_stop_) return false;
125 :
126 32 : if (!useMonitoringThread_ && monitoringInterval_ > 0)
127 : {
128 4 : TLOG(TLVL_GETNEXT) << "getNext: Checking whether to collect Monitoring Data";
129 2 : auto now = std::chrono::steady_clock::now();
130 :
131 2 : if (TimeUtils::GetElapsedTimeMicroseconds(lastMonitoringCall_, now) >= static_cast<size_t>(monitoringInterval_))
132 : {
133 4 : TLOG(TLVL_GETNEXT) << "getNext: Collecting Monitoring Data";
134 2 : isHardwareOK_ = checkHWStatus_();
135 4 : TLOG(TLVL_GETNEXT) << "getNext: isHardwareOK_ is now " << std::boolalpha << isHardwareOK_;
136 2 : lastMonitoringCall_ = now;
137 : }
138 : }
139 :
140 : try
141 : {
142 32 : std::lock_guard<std::mutex> lk(mutex_);
143 32 : if (!isHardwareOK_)
144 : {
145 6 : TLOG(TLVL_ERROR) << "Stopping CFG because the hardware reports bad status!";
146 2 : return false;
147 : }
148 60 : TLOG(TLVL_DEBUG + 33) << "getNext: Calling getNext_ w/ ev_counter()=" << ev_counter();
149 : try
150 : {
151 30 : result = getNext_(output);
152 : }
153 0 : catch (...)
154 : {
155 0 : throw;
156 0 : }
157 60 : TLOG(TLVL_DEBUG + 33) << "getNext: Done with getNext_ - ev_counter() now " << ev_counter();
158 62 : for (auto& dataIter : output)
159 : {
160 64 : TLOG(TLVL_GETNEXT_VERBOSE) << "getNext: getNext_() returned fragment with sequenceID = " << dataIter->sequenceID()
161 0 : << ", type = " << dataIter->typeString() << ", id = " << std::to_string(dataIter->fragmentID())
162 32 : << ", timestamp = " << dataIter->timestamp() << ", and sizeBytes = " << dataIter->sizeBytes();
163 :
164 32 : auto fragId = dataIter->fragmentID();
165 32 : auto type = dataIter->type();
166 :
167 : // ELF, 2020 July 16: System Fragments are excluded from these checks
168 32 : if (Fragment::isSystemFragmentType(type))
169 : {
170 0 : continue;
171 : }
172 :
173 32 : if (!expectedTypes_.count(fragId))
174 : {
175 0 : TLOG(TLVL_ERROR) << "Received Fragment with Fragment ID " << fragId << ", which is not in the declared list of Fragment IDs! Aborting!";
176 0 : return false;
177 : }
178 32 : if (expectedTypes_[fragId] == Fragment::EmptyFragmentType)
179 9 : expectedTypes_[fragId] = type;
180 23 : else if (expectedTypes_[fragId] != type)
181 : {
182 0 : TLOG(TLVL_WARNING) << "Received Fragment with Fragment ID " << fragId << " and type " << dataIter->typeString() << "(" << type << "), which does not match expected type for this ID (" << expectedTypes_[fragId] << ")";
183 : }
184 : }
185 32 : }
186 0 : catch (const cet::exception& e)
187 : {
188 0 : latest_exception_report_ = "cet::exception caught in getNext(): ";
189 0 : latest_exception_report_.append(e.what());
190 0 : TLOG(TLVL_ERROR) << "getNext: cet::exception caught: " << e;
191 0 : set_exception(true);
192 0 : return false;
193 0 : }
194 0 : catch (const boost::exception& e)
195 : {
196 0 : latest_exception_report_ = "boost::exception caught in getNext(): ";
197 0 : latest_exception_report_.append(boost::diagnostic_information(e));
198 0 : TLOG(TLVL_ERROR) << "getNext: boost::exception caught: " << boost::diagnostic_information(e);
199 0 : set_exception(true);
200 0 : return false;
201 0 : }
202 0 : catch (const std::exception& e)
203 : {
204 0 : latest_exception_report_ = "std::exception caught in getNext(): ";
205 0 : latest_exception_report_.append(e.what());
206 0 : TLOG(TLVL_ERROR) << "getNext: std::exception caught: " << e.what();
207 0 : set_exception(true);
208 0 : return false;
209 0 : }
210 0 : catch (...)
211 : {
212 0 : latest_exception_report_ = "Unknown exception caught in getNext().";
213 0 : TLOG(TLVL_ERROR) << "getNext: unknown exception caught";
214 0 : set_exception(true);
215 0 : return false;
216 0 : }
217 :
218 30 : if (!result)
219 : {
220 0 : TLOG(TLVL_DEBUG + 32) << "getNext: Either getNext_ or applyRequests returned false, stopping";
221 : }
222 :
223 30 : if (metricMan && !output.empty())
224 : {
225 29 : auto timestamp = output.front()->timestamp();
226 :
227 29 : if (output.size() > 1)
228 : { // Only bother sorting if >1 entry
229 7 : for (auto& outputfrag : output)
230 : {
231 5 : if (outputfrag->timestamp() > timestamp)
232 : {
233 0 : timestamp = outputfrag->timestamp();
234 : }
235 : }
236 : }
237 :
238 203 : metricMan->sendMetric("Last Timestamp", timestamp, "Ticks", 1, MetricMode::LastPoint);
239 : }
240 :
241 30 : return result;
242 : }
243 :
244 38 : bool artdaq::CommandableFragmentGenerator::check_stop()
245 : {
246 76 : TLOG(TLVL_CHECKSTOP) << "CFG::check_stop: should_stop=" << should_stop() << ", exception status =" << int(exception());
247 :
248 38 : if (!should_stop()) return false;
249 :
250 6 : return true;
251 : }
252 :
253 29 : size_t artdaq::CommandableFragmentGenerator::ev_counter_inc(size_t step)
254 : {
255 58 : TLOG(TLVL_EVCOUNTERINC) << "ev_counter_inc: Incrementing ev_counter from " << ev_counter() << " by " << step;
256 58 : return ev_counter_.fetch_add(step);
257 : } // returns the prev value
258 :
259 8 : void artdaq::CommandableFragmentGenerator::StartCmd(int run, uint64_t timeout, uint64_t timestamp)
260 : {
261 16 : TLOG(TLVL_DEBUG + 33) << "Start Command received.";
262 8 : if (run < 0)
263 : {
264 0 : TLOG(TLVL_ERROR) << "negative run number";
265 0 : throw cet::exception("CommandableFragmentGenerator") << "negative run number"; // NOLINT(cert-err60-cpp)
266 : }
267 :
268 8 : timeout_ = timeout;
269 8 : timestamp_ = timestamp;
270 8 : ev_counter_.store(1);
271 :
272 8 : should_stop_.store(false);
273 8 : exception_.store(false);
274 8 : run_number_ = run;
275 8 : subrun_number_ = 1;
276 8 : latest_exception_report_ = "none";
277 :
278 8 : start();
279 :
280 8 : std::unique_lock<std::mutex> lk(mutex_);
281 8 : if (useMonitoringThread_) startMonitoringThread();
282 16 : TLOG(TLVL_DEBUG + 33) << "Start Command complete.";
283 8 : }
284 :
285 4 : void artdaq::CommandableFragmentGenerator::StopCmd(uint64_t timeout, uint64_t timestamp)
286 : {
287 8 : TLOG(TLVL_DEBUG + 33) << "Stop Command received.";
288 :
289 4 : timeout_ = timeout;
290 4 : timestamp_ = timestamp;
291 :
292 4 : stopNoMutex();
293 4 : should_stop_.store(true);
294 4 : std::unique_lock<std::mutex> lk(mutex_);
295 4 : stop();
296 :
297 4 : joinThreads();
298 8 : TLOG(TLVL_DEBUG + 33) << "Stop Command complete.";
299 4 : }
300 :
301 1 : void artdaq::CommandableFragmentGenerator::PauseCmd(uint64_t timeout, uint64_t timestamp)
302 : {
303 2 : TLOG(TLVL_DEBUG + 33) << "Pause Command received.";
304 1 : timeout_ = timeout;
305 1 : timestamp_ = timestamp;
306 :
307 1 : pauseNoMutex();
308 1 : should_stop_.store(true);
309 1 : std::unique_lock<std::mutex> lk(mutex_);
310 :
311 1 : pause();
312 1 : }
313 :
314 1 : void artdaq::CommandableFragmentGenerator::ResumeCmd(uint64_t timeout, uint64_t timestamp)
315 : {
316 2 : TLOG(TLVL_DEBUG + 33) << "Resume Command received.";
317 1 : timeout_ = timeout;
318 1 : timestamp_ = timestamp;
319 :
320 1 : subrun_number_ += 1;
321 1 : should_stop_ = false;
322 :
323 : // no lock required: thread not started yet
324 1 : resume();
325 :
326 1 : std::unique_lock<std::mutex> lk(mutex_);
327 : // if (useDataThread_) startDataThread();
328 : // if (useMonitoringThread_) startMonitoringThread();
329 2 : TLOG(TLVL_DEBUG + 33) << "Resume Command complete.";
330 1 : }
331 :
332 0 : std::string artdaq::CommandableFragmentGenerator::ReportCmd(std::string const& which)
333 : {
334 0 : TLOG(TLVL_DEBUG + 33) << "Report Command received.";
335 0 : std::lock_guard<std::mutex> lk(mutex_);
336 :
337 : // 14-May-2015, KAB: please see the comments associated with the report()
338 : // methods in the CommandableFragmentGenerator.hh file for more information
339 : // on the use of those methods in this method.
340 :
341 : // check if the child class has something meaningful for this request
342 0 : std::string childReport = reportSpecific(which);
343 0 : if (childReport.length() > 0) { return childReport; }
344 :
345 : // handle the requests that we can take care of at this level
346 0 : if (which == "latest_exception")
347 : {
348 0 : return latest_exception_report_;
349 : }
350 :
351 : // check if the child class has provided a catch-all report function
352 0 : childReport = report();
353 0 : if (childReport.length() > 0) { return childReport; }
354 :
355 : // ELF: 5/31/2019: Let BoardReaderCore's report handle this...
356 : /*
357 : // if we haven't been able to come up with any report so far, say so
358 : std::string tmpString = "The \"" + which + "\" command is not ";
359 : tmpString.append("currently supported by the ");
360 : tmpString.append(metricsReportingInstanceName());
361 : tmpString.append(" fragment generator.");
362 : */
363 0 : TLOG(TLVL_DEBUG + 33) << "Report Command complete.";
364 0 : return ""; // tmpString;
365 0 : }
366 :
367 : // Default implementations of state functions
368 1 : void artdaq::CommandableFragmentGenerator::pauseNoMutex()
369 : {
370 : #pragma message "Using default implementation of CommandableFragmentGenerator::pauseNoMutex()"
371 1 : }
372 :
373 0 : void artdaq::CommandableFragmentGenerator::pause()
374 : {
375 : #pragma message "Using default implementation of CommandableFragmentGenerator::pause()"
376 0 : }
377 :
378 0 : void artdaq::CommandableFragmentGenerator::resume(){
379 : #pragma message "Using default implementation of CommandableFragmentGenerator::resume()"
380 0 : }
381 :
382 0 : std::string artdaq::CommandableFragmentGenerator::report()
383 : {
384 : #pragma message "Using default implementation of CommandableFragmentGenerator::report()"
385 0 : return "";
386 : }
387 :
388 0 : std::string artdaq::CommandableFragmentGenerator::reportSpecific(std::string const& /*unused*/)
389 : {
390 : #pragma message "Using default implementation of CommandableFragmentGenerator::reportSpecific(std::string)"
391 0 : return "";
392 : }
393 :
394 0 : bool artdaq::CommandableFragmentGenerator::checkHWStatus_()
395 : {
396 : #pragma message "Using default implementation of CommandableFragmentGenerator::checkHWStatus_()"
397 0 : return true;
398 : }
399 :
400 0 : bool artdaq::CommandableFragmentGenerator::metaCommand(std::string const& /*unused*/, std::string const& /*unused*/)
401 : {
402 : #pragma message "Using default implementation of CommandableFragmentGenerator::metaCommand(std::string, std::string)"
403 0 : return true;
404 : }
405 :
406 1 : void artdaq::CommandableFragmentGenerator::startMonitoringThread()
407 : {
408 1 : if (monitoringThread_.joinable())
409 : {
410 0 : monitoringThread_.join();
411 : }
412 3 : TLOG(TLVL_INFO) << "Starting Hardware Monitoring Thread";
413 : try
414 : {
415 1 : monitoringThread_ = boost::thread(&CommandableFragmentGenerator::getMonitoringDataLoop, this);
416 : char tname[16]; // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
417 1 : snprintf(tname, sizeof(tname) - 1, "%d-CFGMon", my_rank); // NOLINT
418 1 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
419 1 : auto handle = monitoringThread_.native_handle();
420 1 : pthread_setname_np(handle, tname);
421 : }
422 0 : catch (const boost::exception& e)
423 : {
424 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting Hardware Monitoring thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
425 0 : throw cet::exception("ThreadError") << "Caught boost::exception starting Hardware Monitoring thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
426 0 : }
427 1 : }
428 :
429 1 : void artdaq::CommandableFragmentGenerator::getMonitoringDataLoop()
430 : {
431 15 : while (!should_stop())
432 : {
433 14 : if (should_stop() || monitoringInterval_ <= 0)
434 : {
435 0 : TLOG(TLVL_DEBUG + 32) << "getMonitoringDataLoop: should_stop() is " << std::boolalpha << should_stop()
436 0 : << " and monitoringInterval is " << monitoringInterval_ << ", returning";
437 0 : return;
438 : }
439 28 : TLOG(TLVL_GETMONITORINGDATA) << "getMonitoringDataLoop: Determining whether to call checkHWStatus_";
440 :
441 14 : auto now = std::chrono::steady_clock::now();
442 14 : if (TimeUtils::GetElapsedTimeMicroseconds(lastMonitoringCall_, now) >= static_cast<size_t>(monitoringInterval_))
443 : {
444 2 : isHardwareOK_ = checkHWStatus_();
445 4 : TLOG(TLVL_GETMONITORINGDATA) << "getMonitoringDataLoop: isHardwareOK_ is now " << std::boolalpha << isHardwareOK_;
446 2 : lastMonitoringCall_ = now;
447 : }
448 14 : usleep(monitoringInterval_ / 10);
449 : }
450 : }
|