Line data Source code
1 : //
2 : // artdaqDriver is a program for testing the behavior of the generic
3 : // RawInput source. Run 'artdaqDriver --help' to get a description of the
4 : // expected command-line parameters.
5 : //
6 : //
7 : // The current version generates simple data fragments, for testing
8 : // that data are transmitted without corruption from the
9 : // artdaq::Eventevent_manager through to the artdaq::RawInput source.
10 : //
11 : #include "TRACE/tracemf.h"
12 : #define TRACE_NAME "artdaqDriver"
13 : #include "artdaq-core/Data/Fragment.hh"
14 :
15 : #include "artdaq-core/Plugins/FragmentGenerator.hh"
16 : #include "artdaq-core/Plugins/makeFragmentGenerator.hh"
17 : #include "artdaq-core/Utilities/ExceptionHandler.hh"
18 : #include "artdaq-utilities/Plugins/MetricManager.hh"
19 : #include "artdaq/Application/LoadParameterSet.hh"
20 : #include "artdaq/ArtModules/detail/ArtConfig.hh"
21 : #include "artdaq/DAQdata/GenericFragmentSimulator.hh"
22 : #include "artdaq/DAQdata/Globals.hh"
23 : #include "artdaq/DAQrate/SharedMemoryEventManager.hh"
24 : #include "artdaq/Generators/makeCommandableFragmentGenerator.hh"
25 :
26 : #include "fhiclcpp/ParameterSet.h"
27 :
28 : #include <boost/program_options.hpp>
29 :
30 : #include <csignal>
31 : #include <iostream>
32 : #include <memory>
33 : #include <utility>
34 :
35 : volatile int events_to_generate;
36 0 : void sig_handler(int /*unused*/) { events_to_generate = -1; }
37 :
38 : template<typename B, typename D>
39 : std::unique_ptr<D>
40 : dynamic_unique_ptr_cast(std::unique_ptr<B>& p);
41 :
42 0 : int main(int argc, char* argv[])
43 : try
44 : {
45 : struct Config
46 : {
47 : fhicl::Atom<int> run_number{fhicl::Name{"run_number"}, fhicl::Comment{"Run number to use for output file"}, 1};
48 : fhicl::Atom<bool> debug_cout{fhicl::Name{"debug_cout"}, fhicl::Comment{"Whether to print debug messages to console"}, false};
49 : fhicl::Atom<uint64_t> transition_timeout{fhicl::Name{"transition_timeout"}, fhicl::Comment{"Timeout to use (in seconds) for automatic transitions"}, 30};
50 : fhicl::Table<artdaq::CommandableFragmentGenerator::Config> generator{fhicl::Name{"fragment_receiver"}};
51 : fhicl::Table<artdaq::MetricManager::Config> metrics{fhicl::Name{"metrics"}};
52 : fhicl::Table<artdaq::SharedMemoryEventManager::Config> event_builder{fhicl::Name{"event_builder"}};
53 : fhicl::Atom<int> events_to_generate{fhicl::Name{"events_to_generate"}, fhicl::Comment{"Number of events to generate and process"}, 0};
54 : fhicl::TableFragment<art::Config> art_config;
55 : };
56 0 : auto pset = LoadParameterSet<Config>(argc, argv, "driver", "The artdaqDriver executable runs a Fragment Generator and an art process, acting as a \"unit integration\" test for a data source");
57 :
58 0 : int run = pset.get<int>("run_number", 1);
59 0 : bool debug = pset.get<bool>("debug_cout", false);
60 0 : auto timeout = pset.get<uint64_t>("transition_timeout", 30);
61 0 : uint64_t timestamp = 0;
62 :
63 0 : app_name = "artdaqDriver";
64 :
65 0 : artdaq::configureMessageFacility(app_name.c_str(), true, debug);
66 :
67 0 : auto fragment_receiver_pset = pset.get<fhicl::ParameterSet>("fragment_receiver");
68 :
69 : std::unique_ptr<artdaq::FragmentGenerator>
70 0 : gen(artdaq::makeFragmentGenerator(fragment_receiver_pset.get<std::string>("generator"),
71 0 : fragment_receiver_pset));
72 :
73 : std::unique_ptr<artdaq::CommandableFragmentGenerator> commandable_gen =
74 0 : dynamic_unique_ptr_cast<artdaq::FragmentGenerator, artdaq::CommandableFragmentGenerator>(gen);
75 :
76 0 : my_rank = 0;
77 : // pull out the Metric part of the ParameterSet
78 0 : fhicl::ParameterSet metric_pset;
79 : try
80 : {
81 0 : metric_pset = pset.get<fhicl::ParameterSet>("metrics");
82 : }
83 0 : catch (...)
84 0 : {} // OK if there's no metrics table defined in the FHiCL
85 :
86 0 : if (metric_pset.is_empty())
87 : {
88 0 : TLOG(TLVL_INFO) << "No metric plugins appear to be defined";
89 : }
90 : try
91 : {
92 0 : metricMan->initialize(metric_pset, app_name);
93 0 : metricMan->do_start();
94 : }
95 0 : catch (...)
96 : {
97 0 : }
98 0 : artdaq::FragmentPtrs frags;
99 : //////////////////////////////////////////////////////////////////////
100 : // Note: we are constrained to doing all this here rather than
101 : // encapsulated neatly in a function due to the lifetime issues
102 : // associated with async threads and std::string::c_str().
103 0 : auto event_builder_pset = pset.get<fhicl::ParameterSet>("event_builder");
104 0 : auto art_pset = pset.get<fhicl::ParameterSet>("art", pset);
105 :
106 0 : artdaq::SharedMemoryEventManager event_manager(event_builder_pset, art_pset);
107 : //////////////////////////////////////////////////////////////////////
108 :
109 0 : int events_to_generate = pset.get<int>("events_to_generate", 0);
110 0 : int event_count = 0;
111 0 : artdaq::Fragment::sequence_id_t previous_sequence_id = -1;
112 :
113 0 : if (commandable_gen)
114 : {
115 0 : commandable_gen->StartCmd(run, timeout, timestamp);
116 : }
117 :
118 0 : TLOG(50) << "driver main before event_manager.startRun";
119 0 : event_manager.startRun(run);
120 :
121 : // Read or generate fragments as rapidly as possible, and feed them
122 : // into the Eventevent_manager. The throughput resulting from this design
123 : // choice is likely to have the fragment reading (or generation)
124 : // speed as the limiting factor
125 0 : while ((commandable_gen && commandable_gen->getNext(frags)) ||
126 0 : (gen && gen->getNext(frags)))
127 : {
128 0 : TLOG(50) << "driver main: getNext returned frags.size()=" << frags.size() << " current event_count=" << event_count;
129 0 : for (auto& val : frags)
130 : {
131 0 : if (val->sequenceID() != previous_sequence_id)
132 : {
133 0 : ++event_count;
134 0 : previous_sequence_id = val->sequenceID();
135 : }
136 0 : if (events_to_generate != 0 && event_count > events_to_generate)
137 : {
138 0 : if (commandable_gen)
139 : {
140 0 : commandable_gen->StopCmd(timeout, timestamp);
141 : }
142 0 : break;
143 : }
144 :
145 0 : auto start_time = std::chrono::steady_clock::now();
146 0 : bool sts = false;
147 0 : auto loop_count = 0;
148 0 : while (!sts)
149 : {
150 0 : artdaq::FragmentPtr tempFrag;
151 0 : sts = event_manager.AddFragment(std::move(val), 1000000, tempFrag);
152 0 : if (!sts && event_count <= 10 && loop_count > 100)
153 : {
154 0 : TLOG(TLVL_ERROR) << "Fragment was not added after " << artdaq::TimeUtils::GetElapsedTime(start_time) << " s. Check art thread status!";
155 0 : event_manager.endOfData();
156 0 : exit(1);
157 : }
158 0 : val = std::move(tempFrag);
159 0 : if (!sts)
160 : {
161 0 : loop_count++;
162 : // usleep(10000);
163 : }
164 0 : }
165 : }
166 0 : frags.clear();
167 :
168 0 : if (events_to_generate != 0 && event_count >= events_to_generate)
169 : {
170 0 : if (commandable_gen)
171 : {
172 0 : commandable_gen->StopCmd(timeout, timestamp);
173 : }
174 0 : break;
175 : }
176 : }
177 :
178 0 : if (commandable_gen)
179 : {
180 0 : commandable_gen->joinThreads();
181 : }
182 :
183 : #if 0
184 : volatile bool keep_looping = true;
185 : while (keep_looping)
186 : {
187 : usleep(10000);
188 : }
189 : #endif
190 :
191 0 : TLOG(TLVL_INFO) << "Fragments generated, waiting for art to process them.";
192 0 : auto art_wait_start_time = std::chrono::steady_clock::now();
193 0 : auto last_delta_time = std::chrono::steady_clock::now();
194 0 : auto last_count = event_manager.size() - event_manager.WriteReadyCount(false);
195 :
196 0 : while (last_count > 0 && artdaq::TimeUtils::GetElapsedTime(last_delta_time) < 5.0)
197 : {
198 0 : auto this_count = event_manager.size() - event_manager.WriteReadyCount(false);
199 0 : if (this_count != last_count)
200 : {
201 0 : last_delta_time = std::chrono::steady_clock::now();
202 0 : last_count = this_count;
203 : }
204 0 : usleep(1000);
205 : }
206 :
207 0 : TLOG(TLVL_INFO) << "Ending Run, waited " << std::setprecision(2) << artdaq::TimeUtils::GetElapsedTime(art_wait_start_time) << " seconds for art to process events. (" << last_count << " buffers remain).";
208 0 : event_manager.endRun();
209 0 : usleep(artdaq::TimeUtils::GetElapsedTimeMicroseconds(art_wait_start_time)); // Wait as long again for EndRun message to go through
210 :
211 0 : TLOG(TLVL_INFO) << "Shutting down art";
212 0 : bool endSucceeded = false;
213 0 : int attemptsToEnd = 1;
214 0 : endSucceeded = event_manager.endOfData();
215 0 : while (!endSucceeded && attemptsToEnd < 3)
216 : {
217 0 : ++attemptsToEnd;
218 0 : endSucceeded = event_manager.endOfData();
219 : }
220 0 : if (!endSucceeded)
221 : {
222 0 : TLOG(TLVL_ERROR) << "Failed to shut down the reader and the SharedMemoryEventManager "
223 0 : << "because the endOfData marker could not be pushed "
224 0 : << "onto the queue.";
225 : }
226 :
227 0 : metricMan->do_stop();
228 :
229 0 : artdaq::Globals::CleanUpGlobals();
230 0 : return 0;
231 0 : }
232 0 : catch (std::string& x)
233 : {
234 0 : std::cerr << "Exception (type string) caught in artdaqDriver: " << x << '\n';
235 0 : return 1;
236 0 : }
237 0 : catch (char const* m)
238 : {
239 0 : std::cerr << "Exception (type char const*) caught in artdaqDriver: ";
240 0 : if (m != nullptr)
241 : {
242 0 : std::cerr << m;
243 : }
244 : else
245 : {
246 0 : std::cerr << "[the value was a null pointer, so no message is available]";
247 : }
248 0 : std::cerr << '\n';
249 0 : }
250 0 : catch (...)
251 : {
252 0 : artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no,
253 : "Exception caught in artdaqDriver");
254 0 : exit(-2);
255 0 : }
256 :
257 : template<typename B, typename D>
258 : std::unique_ptr<D>
259 0 : dynamic_unique_ptr_cast(std::unique_ptr<B>& p)
260 : {
261 0 : D* result = dynamic_cast<D*>(p.release());
262 :
263 0 : if (result)
264 : {
265 0 : return std::unique_ptr<D>(result);
266 : }
267 0 : return nullptr;
268 : }
|