Line data Source code
1 : #ifndef artdaq_Application_CommandableFragmentGenerator_hh
2 : #define artdaq_Application_CommandableFragmentGenerator_hh
3 :
4 : #include "fhiclcpp/types/Sequence.h" // Must pre-empt fhiclcpp/types/Atom.h
5 :
6 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
7 : #include "artdaq-core/Data/Fragment.hh"
8 :
9 : #include "artdaq-core/Plugins/FragmentGenerator.hh"
10 : #include "artdaq/DAQrate/FragmentBuffer.hh"
11 : #include "artdaq/DAQrate/RequestBuffer.hh"
12 :
13 : #include "fhiclcpp/types/Atom.h"
14 : #include "fhiclcpp/types/Comment.h"
15 : #include "fhiclcpp/types/ConfigurationTable.h"
16 : #include "fhiclcpp/types/Name.h"
// Forward declaration only: this header refers to ParameterSet solely by
// const reference (the constructor parameter), so the complete type is not needed here.
namespace fhicl { class ParameterSet; }  // namespace fhicl
20 :
21 : #include <boost/thread.hpp>
22 :
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>
31 :
32 : // Socket Includes
33 : #include <arpa/inet.h>
34 : #include <netinet/in.h>
35 : #include <sys/socket.h>
36 : #include <sys/types.h>
37 : #include <unistd.h>
38 :
39 : namespace artdaq {
40 :
41 : /**
42 : * \brief CommandableFragmentGenerator is a FragmentGenerator-derived
43 : * abstract class that defines the interface for a FragmentGenerator
44 : * designed as a state machine with start, stop, etc., transition
45 : * commands.
46 : *
47 : * Users of classes derived from
48 : * CommandableFragmentGenerator will call these transitions via the
49 : * publically defined StartCmd(), StopCmd(), etc.; these public
50 : * functions contain functionality considered properly universal to
51 : * all CommandableFragmentGenerator-derived classes, including calls
52 : * to private virtual functions meant to be overridden in derived
53 : * classes. The same applies to this class's implementation of the
54 : * FragmentGenerator::getNext() pure virtual function, which is
55 : * declared final (i.e., non-overridable in derived classes) and which
56 : * itself calls a pure virtual getNext_() function to be implemented
57 : * in derived classes.
58 : *
59 : * State-machine related interface functions will be called only from a
60 : * single thread. getNext() will be called only from a single
61 : * thread. The thread from which state-machine interfaces functions are
62 : * called may be a different thread from the one that calls getNext().
63 : *
64 : * John F., 3/24/14
65 : *
66 : * After some discussion with Kurt, CommandableFragmentGenerator has
67 : * been updated such that it now contains a member vector
68 : * fragment_ids_ ; if "fragment_id" is set in the FHiCL document
69 : * controlling a class derived from CommandableFragmentGenerator,
70 : * fragment_ids_ will be booked as a length-1 vector, and the value in
71 : * this vector will be returned by fragment_id(). fragment_id() will
72 : * throw an exception if the length of the vector isn't 1. If
73 : * "fragment_ids" is set in the FHiCL document, then fragment_ids_ is
74 : * filled with the values in the list which "fragment_ids" refers to,
75 : * otherwise it is set to the empty vector (this is what should happen
76 : * if the user sets the "fragment_id" variable in the FHiCL document,
77 : * otherwise exceptions will end up thrown due to the logical
78 : * conflict). If neither "fragment_id" nor "fragment_ids" is set in
79 : * the FHiCL document, writers of classes derived from this one will
80 : * be expected to override the virtual fragmentIDs() function with
81 : * their own code (the CompositeDriver class is an example of this)
82 : */
83 : class CommandableFragmentGenerator : public FragmentGenerator
84 : {
85 : public:
86 : /// <summary>
87 : /// Configuration of the CommandableFragmentGenerator. May be used for parameter validation
88 : /// </summary>
89 : struct Config
90 : {
91 : /// "generator" (REQUIRED) Name of the CommandableFragmentGenerator plugin to load
92 : fhicl::Atom<std::string> generator_type{fhicl::Name{"generator"}, fhicl::Comment{"Name of the CommandableFragmentGenerator plugin to load"}};
93 : /// "expected_fragment_type" (Default: 231, EmptyFragmentType) : The type of Fragments this CFG will be generating. "Empty" will auto - detect type based on Fragments generated.
94 : fhicl::Atom<Fragment::type_t> expected_fragment_type{fhicl::Name{"expected_fragment_type"}, fhicl::Comment{"The type of Fragments this CFG will be generating. \"Empty\" will auto-detect type based on Fragments generated."}, Fragment::type_t(Fragment::EmptyFragmentType)};
95 : /// "sleep_on_no_data_us" (Default: 0 (no sleep)) : How long to sleep after calling getNext_ if no data is returned
96 : fhicl::Atom<size_t> sleep_on_no_data_us{fhicl::Name{"sleep_on_no_data_us"}, fhicl::Comment{"How long to sleep after calling getNext_ if no data is returned"}, 0};
97 : /// "separate_monitoring_thread" (Default: false) : Whether a thread that calls the checkHWStatus_ method should be created
98 : fhicl::Atom<bool> separate_monitoring_thread{fhicl::Name{"separate_monitoring_thread"}, fhicl::Comment{"Whether a thread that calls the checkHWStatus_ method should be created"}, false};
99 : /// "hardware_poll_interval_us" (Default: 0) : If a separate monitoring thread is used, how often should it call checkHWStatus_
100 : fhicl::Atom<int64_t> hardware_poll_interval_us{fhicl::Name{"hardware_poll_interval_us"}, fhicl::Comment{"If a separate monitoring thread is used, how often should it call checkHWStatus_"}, 0};
101 : /// "fragment_ids" (Default: empty vector) : A list of Fragment IDs created by this CommandableFragmentGenerator
102 : /// Note that only one of fragment_ids and fragment_id should be specified in the configuration
103 : fhicl::Sequence<Fragment::fragment_id_t> fragment_ids{fhicl::Name("fragment_ids"), fhicl::Comment("A list of Fragment IDs created by this CommandableFragmentGenerator")};
104 : /// "fragment_id" (Default: -99) : The Fragment ID created by this CommandableFragmentGenerator
105 : /// Note that only one of fragment_ids and fragment_id should be specified in the configuration
106 : fhicl::Atom<int> fragment_id{fhicl::Name{"fragment_id"}, fhicl::Comment{"The Fragment ID created by this CommandableFragmentGenerator"}, -99};
107 : /// "sleep_on_stop_us" (Default: 0) : How long to sleep before returning when stop transition is called
108 : fhicl::Atom<int> sleep_on_stop_us{fhicl::Name{"sleep_on_stop_us"}, fhicl::Comment{"How long to sleep before returning when stop transition is called"}, 0};
109 : };
110 : /// Used for ParameterSet validation (if desired)
111 : using Parameters = fhicl::WrappedTable<Config>;
112 :
113 : /**
114 : * \brief CommandableFragmentGenerator Constructor
115 : * \param ps ParameterSet used to configure CommandableFragmentGenerator. See artdaq::CommandableFragmentGenerator::Config.
116 : */
117 : explicit CommandableFragmentGenerator(const fhicl::ParameterSet& ps);
118 :
119 : /**
120 : * \brief CommandableFragmentGenerator Destructor
121 : *
122 : * Joins all threads before returning
123 : */
124 : virtual ~CommandableFragmentGenerator();
125 :
126 : /**
127 : * \brief Join any data-taking threads. Should be called when destructing CommandableFragmentGenerator
128 : *
129 : * Join any data-taking threads. Should be called when destructing CommandableFragmentGenerator
130 : * Sets flags so that threads stop operations.
131 : */
132 : void joinThreads();
133 :
134 : /**
135 : * \brief getNext calls either applyRequests or getNext_ to get any data that is ready to be sent to the EventBuilders
136 : * \param output FragmentPtrs object containing Fragments ready for transmission
137 : * \return Whether getNext completed without exceptions
138 : */
139 : bool getNext(FragmentPtrs& output) override final;
140 :
141 : /**
142 : * \brief Function that launches the monitoring thread (getMonitoringDataLoop())
143 : */
144 : void startMonitoringThread();
145 :
146 : /**
147 : * \brief This function regularly calls checkHWStatus_(), and sets the isHardwareOK flag accordingly.
148 : */
149 : void getMonitoringDataLoop();
150 :
151 : /**
152 : * \brief Get the list of Fragment IDs handled by this CommandableFragmentGenerator
153 : * \return A std::vector<Fragment::fragment_id_t> containing the Fragment IDs handled by this CommandableFragmentGenerator
154 : */
155 70 : std::vector<Fragment::fragment_id_t> fragmentIDs() override
156 : {
157 70 : std::vector<Fragment::fragment_id_t> output;
158 :
159 144 : for (auto& id : expectedTypes_)
160 : {
161 74 : output.push_back(id.first);
162 : }
163 :
164 70 : return output;
165 0 : }
166 :
167 : /**
168 : * \brief Get the current value of the event counter
169 : * \return The current value of the event counter
170 : */
171 48 : size_t ev_counter() const { return ev_counter_.load(); }
172 :
173 : //
174 : // State-machine related interface below.
175 : //
176 :
177 : /**
178 : * \brief Start the CommandableFragmentGenerator
179 : * \param run Run ID of the new run
180 : * \param timeout Timeout for transition
181 : * \param timestamp Timestamp of transition
182 : *
183 : * After a call to 'StartCmd', all Fragments returned by getNext()
184 : * will be marked as part of a Run with the given run number, and
185 : * with subrun number 1. Calling StartCmd also resets the event
186 : * number to 1. After a call to StartCmd(), and until a call to
187 : * StopCmd, getNext() -- and hence the virtual function it calls,
188 : * getNext_() -- should return true as long as datataking is meant
189 : * to take place, even if a particular call returns no fragments.
190 : */
191 : void StartCmd(int run, uint64_t timeout, uint64_t timestamp);
192 :
193 : /**
194 : * \brief Stop the CommandableFragmentGenerator
195 : * \param timeout Timeout for transition
196 : * \param timestamp Timestamp of transition
197 : *
198 : * After a call to StopCmd(), getNext() will eventually return
199 : * false. This may not happen for several calls, if the
200 : * implementation has data to be 'drained' from the system.
201 : */
202 : void StopCmd(uint64_t timeout, uint64_t timestamp);
203 :
204 : /**
205 : * \brief Pause the CommandableFragmentGenerator
206 : * \param timeout Timeout for transition
207 : * \param timestamp Timestamp of transition
208 : *
209 : * A call to PauseCmd() is advisory. It is an indication that the
210 : * BoardReader should stop the incoming flow of data, if it can do
211 : * so.
212 : */
213 : void PauseCmd(uint64_t timeout, uint64_t timestamp);
214 :
215 : /**
216 : * \brief Resume the CommandableFragmentGenerator
217 : * \param timeout Timeout for transition
218 : * \param timestamp Timestamp of transition
219 : *
220 : * After a call to ResumeCmd(), the next Fragments returned from
221 : * getNext() will be part of a new SubRun.
222 : */
223 : void ResumeCmd(uint64_t timeout, uint64_t timestamp);
224 :
225 : /**
226 : * \brief Get a report about a user-specified run-time quantity
227 : * \param which Which quantity to report
228 : * \return The report about the specified quantity
229 : *
230 : * CommandableFragmentGenerator only implements "latest_exception",
231 : * a report on the last exception received. However, child classes
232 : * can override the reportSpecific function to provide additional
233 : * reports.
234 : */
235 : std::string ReportCmd(std::string const& which = "");
236 :
237 : /**
238 : * \brief Get the name used when reporting metrics
239 : * \return The name used when reporting metrics
240 : */
241 0 : virtual std::string metricsReportingInstanceName() const
242 : {
243 0 : return instance_name_for_metrics_;
244 : }
245 :
246 : // The following functions are not yet implemented, and their
247 : // signatures may be subject to change.
248 :
249 : // John F., 12/6/13 -- do we want Reset and Shutdown commands?
250 : // Kurt B., 15-Feb-2014. For the moment, I suspect that we don't
251 : // want a Shutdown command. FragmentGenerator instances are
252 : // Constructed at Initialization time, and they are destructed
253 : // at Shutdown time. So, any shutdown operations that need to be
254 : // done should be put in the FragmentGenerator child class
255 : // destructors. If we find that want shutdown (or initialization)
256 : // operations that are different from destruction (construction),
257 : // then we'll have to add InitCmd and ShutdownCmd methods.
258 :
259 : // virtual void ResetCmd() final {}
260 : // virtual void ShutdownCmd() final {}
261 :
262 : /**
263 : * \brief Get the current value of the exception flag
264 : * \return The current value of the exception flag
265 : */
266 38 : bool exception() const { return exception_.load(); }
267 :
268 : /**
269 : * \brief The meta-command is used for implementing user-specific commands in a CommandableFragmentGenerator
270 : * \param command Name of the command to run
271 : * \param arg Argument(s) for command
272 : * \return true if command succeeded or if command not supported
273 : */
274 : virtual bool metaCommand(std::string const& command, std::string const& arg);
275 :
276 : /**
277 : * @brief Set the shared_ptr to the RequestBuffer
278 : * @param buffer shared_ptr to the RequestBuffer
279 : */
280 2 : void SetRequestBuffer(std::shared_ptr<RequestBuffer> buffer) { requestBuffer_ = buffer; }
281 :
282 0 : void SetFragmentBuffer(std::shared_ptr<FragmentBuffer> buffer) { fragmentBuffer_ = buffer; }
283 :
284 : protected:
285 : // John F., 12/6/13 -- need to figure out which of these getter
286 : // functions should be promoted to "public"
287 :
288 : // John F., 1/21/15 -- after more than a year, there hasn't been a
289 : // single complaint that a CommandableFragmentGenerator-derived
290 : // class hasn't allowed its users to access these quantities, so
291 : // they're probably fine as is
292 :
293 : /**
294 : * \brief Get the current Run number
295 : * \return The current Run number
296 : */
297 0 : int run_number() const { return run_number_; }
298 : /**
299 : * \brief Get the current Subrun number
300 : * \return The current Subrun number
301 : */
302 : int subrun_number() const { return subrun_number_; }
303 : /**
304 : * \brief Timeout of last command
305 : * \return Timeout of last command
306 : */
307 0 : uint64_t timeout() const { return timeout_; }
308 : /**
309 : * \brief Timestamp of last command
310 : * \return Timestamp of last command
311 : */
312 0 : uint64_t timestamp() const { return timestamp_; }
313 :
314 : /**
315 : * \brief Get the Fragment ID of this Fragment generator
316 : * \throws cet::exception("FragmentID") if there is more that one Fragment ID configured for this Fragment Generator
317 : * \return Fragment ID for the Fragment Generator
318 : */
319 0 : artdaq::Fragment::fragment_id_t fragment_id() const
320 : {
321 0 : if (expectedTypes_.size() > 1) throw cet::exception("FragmentID") << "fragment_id() was called, indicating that Fragment Generator was expecting one and only one Fragment ID, but " << expectedTypes_.size() << " were declared!"; // NOLINT(cert-err60-cpp)
322 0 : return (*expectedTypes_.begin()).first;
323 : }
324 :
325 : /**
326 : * \brief Get the current value of the should_stop flag
327 : * \return The current value of the should_stop flag
328 : */
329 67 : bool should_stop() const { return should_stop_.load(); }
330 :
331 : /**
332 : * \brief Routine used by applyRequests to make sure that all outstanding requests have been fulfilled before returning
333 : * \return The logical AND of should_stop, mode is not Ignored, and requests list size equal to 0
334 : */
335 : bool check_stop();
336 :
337 : /**
338 : * \brief Increment the event counter
339 : * \param step Amount to increment the event counter by
340 : * \return The previous value of the event counter
341 : */
342 : size_t ev_counter_inc(size_t step = 1); // returns the prev value
343 :
344 : /**
345 : * \brief Control the exception flag
346 : * \param exception Whether an excpetion has occurred
347 : */
348 0 : void set_exception(bool exception) { exception_.store(exception); }
349 :
350 : /**
351 : * \brief Sets the name for metrics reporting
352 : * \param name The new name for metrics reporting
353 : */
354 0 : void metricsReportingInstanceName(std::string const& name)
355 : {
356 0 : instance_name_for_metrics_ = name;
357 0 : }
358 :
359 : // John F., 12/10/13
360 : // Is there a better way to handle mutex_ than leaving it a protected variable?
361 :
362 : // John F., 1/21/15
363 : // Translation above is "should mutex_ be a private variable,
364 : // accessible via a getter function". Probably, but at this point
365 : // it's not worth breaking code by implementing this.
366 :
367 : std::mutex mutex_; ///< Mutex used to ensure that multiple transition commands do not run at the same time
368 :
369 : /**
370 : * @brief Get the shared_ptr to the RequestBuffer
371 : * @return shared_ptr to the RequestBuffer
372 : */
373 0 : std::shared_ptr<RequestBuffer> GetRequestBuffer() { return requestBuffer_; }
374 :
375 : std::shared_ptr<FragmentBuffer> GetFragmentBuffer() { return fragmentBuffer_; }
376 :
377 : private:
378 : CommandableFragmentGenerator(CommandableFragmentGenerator const&) = delete;
379 : CommandableFragmentGenerator(CommandableFragmentGenerator&&) = delete;
380 : CommandableFragmentGenerator& operator=(CommandableFragmentGenerator const&) = delete;
381 : CommandableFragmentGenerator& operator=(CommandableFragmentGenerator&&) = delete;
382 :
383 : // FHiCL-configurable variables. Note that the C++ variable names
384 : // are the FHiCL variable names with a "_" appended
385 :
386 : // Socket parameters
387 :
388 : std::map<Fragment::fragment_id_t, Fragment::type_t> expectedTypes_;
389 :
390 : bool useMonitoringThread_;
391 : boost::thread monitoringThread_;
392 : int64_t monitoringInterval_; // Microseconds
393 : std::chrono::steady_clock::time_point lastMonitoringCall_;
394 : std::atomic<bool> isHardwareOK_;
395 :
396 : // In order to support the state-machine related behavior, all
397 : // CommandableFragmentGenerators must be able to remember a run number and a
398 : // subrun number.
399 : int run_number_, subrun_number_;
400 :
401 : // JCF, 8/28/14
402 :
403 : // Provide a user-adjustable timeout for the start transition
404 : uint64_t timeout_;
405 :
406 : // JCF, 8/21/14
407 :
408 : // In response to a need to synchronize various components using
409 : // different fragment generators in an experiment, keep a record
410 : // of a timestamp (see Redmine Issue #6783 for more)
411 :
412 : uint64_t timestamp_;
413 :
414 : std::atomic<bool> should_stop_, exception_;
415 : std::string latest_exception_report_;
416 : std::atomic<size_t> ev_counter_;
417 :
418 : std::string instance_name_for_metrics_;
419 :
420 : // Depending on what sleep_on_stop_us_ is set to, this gives the
421 : // stopping thread the chance to gather the required lock
422 :
423 : int sleep_on_stop_us_;
424 :
425 : // So that derived classes can access information about requests
426 : std::shared_ptr<RequestBuffer> requestBuffer_;
427 : std::shared_ptr<FragmentBuffer> fragmentBuffer_;
428 :
429 : protected:
430 : /// <summary>
431 : /// Obtain the next group of Fragments, if any are available. Return false if readout cannot continue,
432 : /// if we are 'stopped', or if we are not running in state-machine mode.
433 : /// Note that getNext_() must return n of each fragmentID declared by fragmentIDs_().
434 : /// </summary>
435 : /// <param name="output">Reference to list of Fragment pointers to which additional Fragments should be added</param>
436 : /// <returns>True if readout should continue, false otherwise</returns>
437 : virtual bool getNext_(FragmentPtrs& output) = 0;
438 :
439 : /// <summary>
440 : /// Check any relavent hardware status registers. Return false if
441 : /// an error condition exists that should halt data-taking.
442 : /// This function should probably make MetricManager calls.
443 : /// </summary>
444 : /// <returns>False if a condition exists that should halt all data-taking, true otherwise</returns>
445 : virtual bool checkHWStatus_();
446 :
447 : //
448 : // State-machine related implementor interface below.
449 : //
450 :
451 : /// <summary>
452 : /// If a CommandableFragmentGenerator subclass is reading from a
453 : /// file, and start() is called, any run-, subrun-, and
454 : /// event-numbers in the data read from the file must be
455 : /// over-written by the specified run number, etc. After a call to
456 : /// StartCmd(), and until a call to StopCmd(), getNext_() is
457 : /// expected to return true as long as datataking is intended.
458 : ///
459 : /// This is a pure virtual function, and must be overriden by Fragment Generator implementations
460 : /// </summary>
461 : virtual void start() = 0;
462 :
463 : /// <summary>
464 : /// On call to StopCmd, stopNoMutex() is called prior to StopCmd
465 : /// acquiring the mutex
466 : ///
467 : /// This is a pure virtual function, and must be overriden by Fragment Generator implementations
468 : /// </summary>
469 : virtual void stopNoMutex() = 0;
470 :
471 : /// <summary>
472 : /// If a CommandableFragmentGenerator subclass is reading from a file, calling
473 : /// stop() should arrange that the next call to getNext_() returns
474 : /// false, rather than allowing getNext_() to read to the end of the
475 : /// file.
476 : ///
477 : /// This is a pure virtual function, and must be overriden by Fragment Generator implementations
478 : /// </summary>
479 : virtual void stop() = 0;
480 :
481 : /// <summary>
482 : /// On call to PauseCmd, pauseNoMutex() is called prior to PauseCmd
483 : /// acquiring the mutex
484 : /// </summary>
485 : virtual void pauseNoMutex();
486 :
487 : /// <summary>
488 : /// If a CommandableFragmentGenerator subclass is reading from hardware, the
489 : /// implementation of pause() should tell the hardware to stop
490 : /// sending data.
491 : /// </summary>
492 : virtual void pause();
493 :
494 : /// <summary>
495 : /// The subrun number will be incremented *before* a call to resume.
496 : /// </summary>
497 : virtual void resume();
498 :
499 : /// <summary>
500 : /// Let's say that the contract with the report() functions is that they
501 : /// return a non-empty string if they have something useful to report,
502 : /// but if they don't know how to handle a given request, they simply
503 : /// return an empty string and the ReportCmd() takes care of saying
504 : /// "the xyz command is not currently supported".
505 : /// For backward compatibility, we keep the report function that takes
506 : /// no arguments and add one that takes a "which" argument. In the
507 : /// ReportCmd function, we'll call the more specific one first.
508 : /// </summary>
509 : /// <returns>Default report from the FragmentGenerator</returns>
510 : virtual std::string report();
511 :
512 : /// <summary>
513 : /// Report the status of a specific quantity
514 : /// </summary>
515 : /// <param name="what">Name of the quantity to report</param>
516 : /// <returns>Value of requested quantity. Null string if what is unsupported.</returns>
517 : virtual std::string reportSpecific(std::string const& what);
518 : };
519 : } // namespace artdaq
520 :
521 : #endif /* artdaq_Application_CommandableFragmentGenerator_hh */
|