Line data Source code
1 : ////////////////////////////////////////////////////////////////////////
2 : // genToArt
3 : //
4 : // This application is intended to invoke a configurable set of fragment
5 : // generators, and funnel the result to an invocation of art. This is
6 : // not an MPI application (see caveat below), so is not intended for
7 : // high performance production DAQ scenarios -- for that, see the pmt
8 : // application driver and its associated applications boardreader and
9 : // eventbuilder.
10 : ////////////////////////////////////////////////////////////////////////
11 :
12 : #include "TRACE/tracemf.h"
13 : #define TRACE_NAME "genToArt"
14 :
15 : #include "artdaq-core/Data/Fragment.hh"
16 : #include "artdaq-core/Data/detail/RawFragmentHeader.hh"
17 : #include "artdaq-core/Plugins/FragmentGenerator.hh"
18 : #include "artdaq-core/Plugins/makeFragmentGenerator.hh"
19 : #include "artdaq-core/Utilities/SimpleLookupPolicy.hh"
20 : #include "artdaq/DAQdata/GenericFragmentSimulator.hh"
21 : #include "artdaq/DAQrate/SharedMemoryEventManager.hh"
22 : #include "artdaq/Generators/CommandableFragmentGenerator.hh"
23 : #include "canvas/Utilities/Exception.h"
24 : #include "cetlib/container_algorithms.h"
25 : #include "fhiclcpp/ParameterSet.h"
26 :
27 : #include <boost/program_options.hpp>
28 :
29 : #include <deque>
30 : #include <iostream>
31 : #include <map>
32 : #include <memory>
33 : #include <string>
34 : #include <utility>
35 :
36 : namespace bpo = boost::program_options;
37 :
38 : namespace {
39 : /**
40 : * \brief Process the command line
41 : * \param argc Number of arguments
42 : * \param argv Array of arguments as strings
43 : * \param[out] vm Output boost::program_options::variables_map
44 : * \return 0 if success, -1 if excpetion, 1 if help was requested, and 2 if missing required arguments
45 : */
46 3 : int process_cmd_line(int argc, char** argv,
47 : bpo::variables_map& vm)
48 : {
49 3 : std::ostringstream descstr;
50 : descstr << *argv
51 3 : << " <-c <config-file>> <other-options> [<source-file>]+";
52 3 : bpo::options_description desc(descstr.str());
53 3 : desc.add_options()("config,c", bpo::value<std::string>(), "Configuration file.")("help,h", "produce help message");
54 : try
55 : {
56 3 : bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
57 3 : bpo::notify(vm);
58 : }
59 0 : catch (bpo::error const& e)
60 : {
61 0 : TLOG(TLVL_ERROR) << "Exception from command line processing in " << *argv
62 0 : << ": " << e.what();
63 0 : return -1;
64 0 : }
65 9 : if (vm.count("help") != 0u)
66 : {
67 0 : std::cout << desc << std::endl;
68 0 : return 1;
69 : }
70 9 : if (vm.count("config") == 0u)
71 : {
72 0 : TLOG(TLVL_ERROR) << "Exception from command line processing in " << *argv
73 0 : << ": no configuration file given.\n"
74 0 : << "For usage and an options list, please do '"
75 0 : << *argv << " --help"
76 0 : << "'.";
77 0 : return 2;
78 : }
79 3 : return 0;
80 3 : }
81 :
82 : /**
83 : * \brief ThrottledGenerator: ensure that we only get one fragment per type
84 : * at a time from the generator.
85 : */
86 : class ThrottledGenerator
87 : {
88 : public:
89 : /**
90 : * \brief ThrottledGenerator Constructor
91 : * \param generator Name of the generator plugin to load
92 : * \param ps ParameterSet for configuring the FragmentGenerator
93 : */
94 : ThrottledGenerator(std::string const& generator,
95 : fhicl::ParameterSet const& ps);
96 :
97 : /**
98 : * \brief Get the next fragment from the generator
99 : * \param[out] newFrags New Fragment objects are added to this list
100 : * \return Whether there is more data forthcoming
101 : */
102 : bool getNext(artdaq::FragmentPtrs& newFrags);
103 :
104 : /**
105 : * \brief Get the number of Fragment IDs handled by this generator
106 : * \return
107 : */
108 : size_t numFragIDs() const;
109 :
110 : /**
111 : * \brief Send start signal to FragmentGenerator, if it's a CommandableFragmentGenerator
112 : * \param run Run number to pass to StartCmd
113 : * \param timeout Timeout to pass to StartCmd
114 : * \param timestamp Timestamp to pass to StartCmd
115 : */
116 3 : void start(int run, uint64_t timeout, uint64_t timestamp) const
117 : {
118 3 : auto gen_ptr = dynamic_cast<artdaq::CommandableFragmentGenerator*>(generator_.get());
119 3 : if (gen_ptr != nullptr)
120 : {
121 1 : gen_ptr->StartCmd(run, timeout, timestamp);
122 : }
123 3 : }
124 : /**
125 : * \brief Send stop signal to FragmentGenerator, if it's a CommandableFragmentGenerator
126 : * \param timeout Timeout to pass to StopCmd
127 : * \param timestamp Timestamp to pass to StopCmd
128 : */
129 3 : void stop(uint64_t timeout, uint64_t timestamp) const
130 : {
131 3 : auto gen_ptr = dynamic_cast<artdaq::CommandableFragmentGenerator*>(generator_.get());
132 3 : if (gen_ptr != nullptr)
133 : {
134 1 : gen_ptr->StopCmd(timeout, timestamp);
135 : }
136 3 : }
137 :
138 : private:
139 : bool generateFragments_();
140 :
141 : std::unique_ptr<artdaq::FragmentGenerator> generator_;
142 : size_t const numFragIDs_;
143 : std::map<artdaq::Fragment::fragment_id_t,
144 : std::deque<artdaq::FragmentPtr>>
145 : frags_;
146 : };
147 :
148 3 : ThrottledGenerator::
149 : ThrottledGenerator(std::string const& generator,
150 3 : fhicl::ParameterSet const& ps)
151 3 : : generator_(artdaq::makeFragmentGenerator(generator, ps))
152 3 : , numFragIDs_(generator_->fragmentIDs().size())
153 :
154 : {
155 3 : assert(generator_);
156 3 : }
157 :
158 60 : bool ThrottledGenerator::
159 : getNext(artdaq::FragmentPtrs& newFrags)
160 : {
161 60 : if ((!frags_.empty()) && (!frags_.begin()->second.empty()))
162 : { // Something stored.
163 100 : for (auto& fQp : frags_)
164 : {
165 70 : assert(fQp.second.size());
166 70 : newFrags.emplace_back(std::move(fQp.second.front()));
167 70 : fQp.second.pop_front();
168 : }
169 : }
170 : else
171 : { // Need fresh fragments.
172 30 : return generateFragments_() && getNext(newFrags);
173 : }
174 30 : return true;
175 : }
176 :
177 30 : bool ThrottledGenerator::
178 : generateFragments_()
179 : {
180 30 : artdaq::FragmentPtrs incomingFrags;
181 30 : bool result{false};
182 60 : while ((result = generator_->getNext(incomingFrags)) &&
183 30 : incomingFrags.empty())
184 : {
185 : }
186 100 : for (auto&& frag : incomingFrags)
187 : {
188 70 : frags_[frag->fragmentID()].emplace_back(std::move(frag));
189 : }
190 30 : return result;
191 30 : }
192 :
193 : size_t
194 3 : ThrottledGenerator::
195 : numFragIDs() const
196 : {
197 3 : return numFragIDs_;
198 : }
199 :
200 : // artdaq::FragmentGenerator &
201 : // ThrottledGenerator::
202 : // generator() const
203 : // {
204 : // return *generator_;
205 : // }
206 :
207 : /**
208 : * \brief Run the test, instantiating configured generators and an EventStore
209 : * \param pset ParameterSet used to configure genToArt
210 : * \return Art return code, of 15 if EventStore::endOfData fails
211 : *
212 : * \verbatim
213 : * genToArt accepts the following Parameters:
214 : * "reset_sequenceID" (Default: true): Set the sequence IDs on generated Fragment objects to the expected value
215 : * "genToArt" (REQUIRED): FHiCL table containing genToArt parameters
216 : * "fragment_receivers" (REQUIRED): List of FHiCL tables configuring the Fragment receivers
217 : * Each table should contain parameter "generator", the FragmentGenerator plugin to load, and any other parameters that generator requires
218 : * "event_builder" (Default: {}): ParameterSet for EventStore. See documentation for configuration parameters.
219 : * "run_number" (REQUIRED): Run number to use
220 : * "events_to_generate" (Default: -1): Number of events to generate
221 : *
222 : * \endverbatim
223 : */
224 3 : int process_data(fhicl::ParameterSet const& pset)
225 : {
226 3 : auto const gta_pset = pset.get<fhicl::ParameterSet>("genToArt");
227 :
228 : // Make the generators based on the configuration.
229 3 : std::vector<ThrottledGenerator> generators;
230 :
231 3 : auto const fr_pset = gta_pset.get<std::vector<fhicl::ParameterSet>>("fragment_receivers");
232 3 : generators.reserve(fr_pset.size());
233 6 : for (auto const& gen_ps : fr_pset)
234 : {
235 6 : generators.emplace_back(gen_ps.get<std::string>("generator"),
236 : gen_ps);
237 : }
238 :
239 3 : artdaq::FragmentPtrs frags;
240 9 : auto eb_pset = gta_pset.get<fhicl::ParameterSet>("event_builder", {});
241 3 : size_t expected_frags_per_event = 0;
242 6 : for (auto& gen : generators)
243 : {
244 3 : gen.start(1000, 0, 0);
245 3 : expected_frags_per_event += gen.numFragIDs();
246 : }
247 3 : eb_pset.put_or_replace<size_t>("expected_fragments_per_event", expected_frags_per_event);
248 :
249 3 : artdaq::SharedMemoryEventManager store(eb_pset, pset);
250 6 : store.startRun(gta_pset.get<int>("run_number", 1000));
251 :
252 : auto const events_to_generate =
253 6 : gta_pset.get<artdaq::Fragment::sequence_id_t>("events_to_generate", 0xFFFFFFFFFFFFFFFF);
254 6 : auto const reset_sequenceID = pset.get<bool>("reset_sequenceID", true);
255 3 : bool done = false;
256 33 : for (artdaq::Fragment::sequence_id_t event_count = 1;
257 33 : (events_to_generate == 0xFFFFFFFFFFFFFFFF || event_count <= events_to_generate) && (!done);
258 : ++event_count)
259 : {
260 60 : for (auto& gen : generators)
261 : {
262 30 : done |= !gen.getNext(frags);
263 : }
264 60 : TLOG(TLVL_DEBUG + 33) << "There are " << frags.size() << " Fragments in event " << event_count << ".";
265 30 : artdaq::Fragment::sequence_id_t current_sequence_id = -1;
266 100 : for (auto& val : frags)
267 : {
268 70 : if (reset_sequenceID)
269 : {
270 140 : TLOG(TLVL_DEBUG + 32) << "Setting fragment sequence id to " << event_count;
271 70 : val->setSequenceID(event_count);
272 : }
273 70 : if (current_sequence_id ==
274 : static_cast<artdaq::Fragment::sequence_id_t>(-1))
275 : {
276 30 : current_sequence_id = val->sequenceID();
277 : }
278 40 : else if (val->sequenceID() != current_sequence_id)
279 : {
280 0 : throw art::Exception(art::errors::DataCorruption) // NOLINT(cert-err60-cpp)
281 : << "Data corruption: apparently related fragments have "
282 : << " different sequence IDs: "
283 0 : << val->sequenceID()
284 : << " and "
285 : << current_sequence_id
286 0 : << ".\n";
287 : }
288 :
289 70 : auto start_time = std::chrono::steady_clock::now();
290 70 : bool sts = false;
291 70 : auto loop_count = 0;
292 140 : while (!sts)
293 : {
294 70 : artdaq::FragmentPtr tempFrag;
295 70 : sts = store.AddFragment(std::move(val), 1000000, tempFrag);
296 70 : if (!sts && event_count <= 10 && loop_count > 100)
297 : {
298 0 : TLOG(TLVL_ERROR) << "Fragment was not added after " << artdaq::TimeUtils::GetElapsedTime(start_time) << " s. Check art thread status!";
299 0 : store.endOfData();
300 0 : exit(1);
301 : }
302 70 : val = std::move(tempFrag);
303 70 : if (!sts)
304 : {
305 0 : loop_count++;
306 0 : usleep(10000);
307 : }
308 70 : }
309 : }
310 30 : frags.clear();
311 60 : TLOG(TLVL_DEBUG + 33) << "Event " << event_count << " END";
312 : }
313 6 : for (auto& gen : generators)
314 : {
315 3 : gen.stop(0, 0);
316 : }
317 :
318 3 : bool endSucceeded = store.endOfData();
319 3 : if (endSucceeded)
320 : {
321 3 : return 0;
322 : }
323 :
324 0 : return 15;
325 3 : }
326 : } // namespace
327 :
328 3 : int main(int argc, char* argv[])
329 : try
330 : {
331 3 : artdaq::configureMessageFacility("genToArt");
332 : // Command line handling.
333 3 : bpo::variables_map vm;
334 3 : auto result = process_cmd_line(argc, argv, vm);
335 3 : if (result != 0)
336 : {
337 0 : return (result);
338 : }
339 : // Read FHiCL configuration file.
340 3 : if (getenv("FHICL_FILE_PATH") == nullptr)
341 : {
342 0 : TLOG(TLVL_ERROR)
343 0 : << "INFO: environment variable FHICL_FILE_PATH was not set. Using \".\"\n";
344 0 : setenv("FHICL_FILE_PATH", ".", 0);
345 : }
346 6 : artdaq::SimpleLookupPolicy lookup_policy("FHICL_FILE_PATH");
347 3 : auto pset = fhicl::ParameterSet::make(vm["config"].as<std::string>(), lookup_policy);
348 3 : return process_data(pset);
349 3 : }
350 0 : catch (std::exception& x)
351 : {
352 0 : TLOG(TLVL_ERROR) << "Exception (type std::exception) caught in genToArt: " << x.what() << '\n';
353 0 : return 1;
354 0 : }
355 0 : catch (...)
356 : {
357 0 : return -1;
358 0 : }
|