Line data Source code
1 : #include "TRACE/tracemf.h"
2 : #include "artdaq/DAQdata/Globals.hh" // include these 2 first -
3 : #define TRACE_NAME (app_name + "_DispatcherCore").c_str()
4 :
5 : #include "artdaq/Application/DispatcherCore.hh"
6 :
7 : #include "fhiclcpp/ParameterSet.h"
8 :
9 : #include <boost/algorithm/string.hpp>
10 : #include <boost/exception/all.hpp>
11 : #include <boost/throw_exception.hpp>
12 : #include <boost/tokenizer.hpp>
13 :
14 : #include <bitset>
15 : #include <cerrno>
16 : #include <csignal>
17 : #include <iomanip>
18 : #include <set>
19 : #include <sstream>
20 : #include <string>
21 : #include <unordered_map>
22 : #include <utility>
23 : #include <vector>
24 :
25 0 : bool artdaq::DispatcherCore::initialize(fhicl::ParameterSet const& pset)
26 : {
27 0 : TLOG(TLVL_DEBUG + 32) << "initialize method called with DAQ "
28 0 : << "ParameterSet = \"" << pset.to_string() << "\".";
29 :
30 0 : pset_ = pset;
31 : // 04-Apr-2019, KAB: added support for art config params to be in an "art" block
32 0 : if (pset_.has_key("art"))
33 : {
34 0 : pset_ = pset_.get<fhicl::ParameterSet>("art");
35 : }
36 0 : pset_.erase("outputs");
37 0 : pset_.erase("physics");
38 0 : pset_.erase("daq");
39 :
40 0 : TLOG(TLVL_DEBUG + 32) << "Pieces of the input pset that are saved for later: \"" << pset_.to_string() << "\".";
41 :
42 : // pull out the relevant parts of the ParameterSet
43 0 : fhicl::ParameterSet daq_pset;
44 : try
45 : {
46 0 : daq_pset = pset.get<fhicl::ParameterSet>("daq");
47 : }
48 0 : catch (...)
49 : {
50 0 : TLOG(TLVL_ERROR)
51 0 : << "Unable to find the DAQ parameters in the initialization "
52 0 : << "ParameterSet: \"" + pset.to_string() + "\".";
53 0 : return false;
54 0 : }
55 0 : fhicl::ParameterSet agg_pset;
56 : try
57 : {
58 0 : agg_pset = daq_pset.get<fhicl::ParameterSet>("dispatcher", daq_pset.get<fhicl::ParameterSet>("aggregator"));
59 : }
60 0 : catch (...)
61 : {
62 0 : TLOG(TLVL_ERROR)
63 0 : << "Unable to find the Dispatcher parameters in the DAQ "
64 0 : << "initialization ParameterSet: \"" + daq_pset.to_string() + "\".";
65 0 : return false;
66 0 : }
67 :
68 0 : broadcast_mode_ = agg_pset.get<bool>("broadcast_mode", true);
69 0 : allow_label_overwrites_ = agg_pset.get<bool>("allow_label_overwrites", true);
70 0 : if (broadcast_mode_ && !agg_pset.has_key("broadcast_mode"))
71 : {
72 0 : agg_pset.put<bool>("broadcast_mode", true);
73 0 : if (!agg_pset.has_key("non_reliable_mode"))
74 : {
75 0 : agg_pset.put<bool>("non_reliable_mode", true);
76 : }
77 0 : if (!agg_pset.has_key("non_reliable_mode_retry_count"))
78 : {
79 0 : agg_pset.put<int>("non_reliable_mode_retry_count", 2);
80 : }
81 : }
82 :
83 0 : agg_pset.erase("restart_crashed_art_processes");
84 0 : agg_pset.put<bool>("restart_crashed_art_processes", false);
85 :
86 0 : agg_pset.erase("art_analyzer_count");
87 0 : agg_pset.put<int>("art_analyzer_count", 0);
88 :
89 : // initialize the MetricManager and the names of our metrics
90 0 : fhicl::ParameterSet metric_pset = daq_pset.get<fhicl::ParameterSet>("metrics", fhicl::ParameterSet());
91 :
92 0 : return initializeDataReceiver(pset, agg_pset, metric_pset);
93 0 : }
94 :
95 0 : std::string artdaq::DispatcherCore::register_monitor(fhicl::ParameterSet const& pset)
96 : {
97 0 : TLOG(TLVL_DEBUG + 32) << "DispatcherCore::register_monitor called with argument \"" << pset.to_string() << "\"";
98 0 : check_filters_();
99 :
100 0 : if (stop_requested_.load())
101 : {
102 0 : TLOG(TLVL_WARNING) << "DispatcherCore::register_monitor called after stop. NOT registering new monitor!";
103 0 : return "Stopped";
104 : }
105 :
106 : try
107 : {
108 0 : TLOG(TLVL_DEBUG + 32) << "Getting unique_label from input ParameterSet";
109 0 : auto const& label = pset.get<std::string>("unique_label");
110 0 : TLOG(TLVL_DEBUG + 32) << "Unique label is " << label;
111 0 : TLOG(TLVL_INFO) << "Registering monitor with unique_label \"" << label << "\"";
112 :
113 0 : bool overwriting_label = false;
114 :
115 : {
116 0 : std::lock_guard<std::mutex> lock(dispatcher_transfers_mutex_);
117 0 : if (registered_monitors_.count(label) != 0u)
118 : {
119 0 : if (allow_label_overwrites_)
120 : {
121 0 : TLOG(TLVL_WARNING) << "Restarting companion art process with label " << label << "!";
122 0 : overwriting_label = true;
123 : }
124 : else
125 : {
126 0 : throw cet::exception("DispatcherCore") << "Unique label already exists!"; // NOLINT(cert-err60-cpp)
127 : }
128 : }
129 :
130 0 : registered_monitors_[label] = pset;
131 0 : }
132 :
133 0 : if (overwriting_label)
134 : {
135 : // ELF, Jul 21, 2020: This can take a long time, and we don't want to block the XMLRPC thread
136 0 : boost::thread thread([this, label] { restart_art_process_(label); });
137 0 : thread.detach();
138 0 : }
139 : else
140 : {
141 : // ELF, Jul 21, 2020: This can take a long time, and we don't want to block the XMLRPC thread
142 0 : boost::thread thread([this, label] { start_art_process_(label); });
143 0 : thread.detach();
144 0 : }
145 0 : }
146 0 : catch (const cet::exception& e)
147 : {
148 0 : std::stringstream errmsg;
149 0 : errmsg << "Unable to create a Transfer plugin with the FHiCL code \"" << pset.to_string() << "\", a new monitor has not been registered" << std::endl;
150 0 : errmsg << "Exception: " << e.what();
151 0 : TLOG(TLVL_ERROR) << errmsg.str();
152 0 : return errmsg.str();
153 0 : }
154 0 : catch (const boost::exception& e)
155 : {
156 0 : std::stringstream errmsg;
157 0 : errmsg << "Unable to create a Transfer plugin with the FHiCL code \"" << pset.to_string() << "\", a new monitor has not been registered" << std::endl;
158 0 : errmsg << "Exception: " << boost::diagnostic_information(e);
159 0 : TLOG(TLVL_ERROR) << errmsg.str();
160 0 : return errmsg.str();
161 0 : }
162 0 : catch (const std::exception& e)
163 : {
164 0 : std::stringstream errmsg;
165 0 : errmsg << "Unable to create a Transfer plugin with the FHiCL code \"" << pset.to_string() << "\", a new monitor has not been registered" << std::endl;
166 0 : errmsg << "Exception: " << e.what();
167 0 : TLOG(TLVL_ERROR) << errmsg.str();
168 0 : return errmsg.str();
169 0 : }
170 0 : catch (...)
171 : {
172 0 : std::stringstream errmsg;
173 0 : errmsg << "Unable to create a Transfer plugin with the FHiCL code \"" << pset.to_string() << "\", a new monitor has not been registered";
174 0 : TLOG(TLVL_ERROR) << errmsg.str();
175 0 : return errmsg.str();
176 0 : }
177 :
178 0 : TLOG(TLVL_DEBUG + 32) << "Successfully registered monitor";
179 0 : return "Success";
180 : }
181 :
182 0 : std::string artdaq::DispatcherCore::unregister_monitor(std::string const& label)
183 : {
184 0 : TLOG(TLVL_INFO) << "DispatcherCore::unregister_monitor called for unique_label \"" << label << "\"";
185 0 : check_filters_();
186 :
187 : try
188 : {
189 : {
190 0 : std::lock_guard<std::mutex> lock(dispatcher_transfers_mutex_);
191 0 : if (registered_monitors_.count(label) == 0)
192 : {
193 0 : std::stringstream errmsg;
194 : errmsg << "Warning in DispatcherCore::unregister_monitor: unable to find requested transfer plugin with "
195 0 : << "label \"" << label << "\"";
196 0 : TLOG(TLVL_WARNING) << errmsg.str();
197 0 : return errmsg.str();
198 0 : }
199 :
200 0 : registered_monitors_.erase(label);
201 0 : }
202 :
203 : // ELF, Jul 21, 2020: This can take a long time, and we don't want to block the XMLRPC thread
204 0 : boost::thread thread([this, label] { stop_art_process_(label); });
205 0 : thread.detach();
206 0 : }
207 0 : catch (...)
208 : {
209 0 : std::stringstream errmsg;
210 0 : errmsg << "Unable to unregister transfer plugin with label \"" << label << "\"";
211 0 : TLOG(TLVL_ERROR) << errmsg.str();
212 0 : return errmsg.str();
213 0 : }
214 :
215 0 : TLOG(TLVL_DEBUG + 32) << "unregister_monitor completed successfully";
216 0 : return "Success";
217 : }
218 :
219 0 : fhicl::ParameterSet artdaq::DispatcherCore::merge_parameter_sets_(fhicl::ParameterSet const& skel, const std::string& label, const fhicl::ParameterSet& pset)
220 : {
221 0 : fhicl::ParameterSet generated_pset(skel);
222 0 : fhicl::ParameterSet generated_outputs;
223 0 : fhicl::ParameterSet generated_physics;
224 0 : fhicl::ParameterSet generated_physics_analyzers;
225 0 : fhicl::ParameterSet generated_physics_producers;
226 0 : fhicl::ParameterSet generated_physics_filters;
227 0 : std::unordered_map<std::string, std::vector<std::string>> generated_physics_filter_paths;
228 :
229 0 : TLOG(TLVL_DEBUG + 32) << "merge_parameter_sets_: Generating fhicl for monitor " << label;
230 :
231 : try
232 : {
233 0 : auto path = pset.get<std::vector<std::string>>("path");
234 :
235 0 : auto filters = pset.get<std::vector<fhicl::ParameterSet>>("filter_paths", std::vector<fhicl::ParameterSet>());
236 0 : for (auto& filter : filters)
237 : {
238 : try
239 : {
240 0 : auto name = filter.get<std::string>("name");
241 0 : auto path = filter.get<std::vector<std::string>>("path");
242 0 : if (generated_physics_filter_paths.count(name) != 0u)
243 : {
244 0 : bool matched = generated_physics_filter_paths[name].size() == path.size();
245 0 : for (size_t ii = 0; matched && ii < generated_physics_filter_paths[name].size(); ++ii)
246 : {
247 0 : matched = matched && path[ii] == generated_physics_filter_paths[name][ii];
248 : }
249 :
250 0 : if (matched)
251 : {
252 : // Path is already configured
253 0 : continue;
254 : }
255 :
256 0 : auto newname = label + name;
257 0 : generated_physics_filter_paths[newname] = path;
258 0 : }
259 : else
260 : {
261 0 : generated_physics_filter_paths[name] = path;
262 : }
263 0 : }
264 0 : catch (...)
265 : {
266 0 : }
267 : }
268 :
269 : // outputs section
270 0 : auto outputs = pset.get<fhicl::ParameterSet>("outputs");
271 0 : if (outputs.get_pset_names().size() > 1 || outputs.get_pset_names().empty())
272 : {
273 : // Only one output allowed per monitor
274 : }
275 0 : auto output_name = outputs.get_pset_names()[0];
276 0 : auto output_pset = outputs.get<fhicl::ParameterSet>(output_name);
277 0 : generated_outputs.put(label + output_name, output_pset);
278 0 : bool outputInPath = false;
279 0 : for (auto& ii : path)
280 : {
281 0 : if (ii == output_name)
282 : {
283 0 : ii = label + output_name;
284 0 : outputInPath = true;
285 : }
286 : }
287 0 : if (!outputInPath)
288 : {
289 0 : path.push_back(label + output_name);
290 : }
291 :
292 : // physics section
293 0 : auto physics_pset = pset.get<fhicl::ParameterSet>("physics");
294 :
295 0 : if (physics_pset.has_key("analyzers"))
296 : {
297 0 : auto analyzers = physics_pset.get<fhicl::ParameterSet>("analyzers");
298 0 : for (const auto& key : analyzers.get_pset_names())
299 : {
300 0 : if (generated_physics_analyzers.has_key(key) && analyzers.get<fhicl::ParameterSet>(key) == generated_physics_analyzers.get<fhicl::ParameterSet>(key))
301 : {
302 : // Module is already configured
303 0 : continue;
304 : }
305 0 : if (generated_physics_analyzers.has_key(key))
306 : {
307 : // Module already exists with name, rename
308 0 : auto newkey = label + key;
309 0 : generated_physics_analyzers.put<fhicl::ParameterSet>(newkey, analyzers.get<fhicl::ParameterSet>(key));
310 0 : for (auto& ii : path)
311 : {
312 0 : if (ii == key)
313 : {
314 0 : ii = newkey;
315 : }
316 : }
317 0 : }
318 : else
319 : {
320 0 : generated_physics_analyzers.put<fhicl::ParameterSet>(key, analyzers.get<fhicl::ParameterSet>(key));
321 : }
322 0 : }
323 0 : }
324 0 : if (physics_pset.has_key("producers"))
325 : {
326 0 : auto producers = physics_pset.get<fhicl::ParameterSet>("producers");
327 0 : for (const auto& key : producers.get_pset_names())
328 : {
329 0 : if (generated_physics_producers.has_key(key) && producers.get<fhicl::ParameterSet>(key) == generated_physics_producers.get<fhicl::ParameterSet>(key))
330 : {
331 : // Module is already configured
332 0 : continue;
333 : }
334 0 : if (generated_physics_producers.has_key(key))
335 : {
336 : // Module already exists with name, rename
337 0 : auto newkey = label + key;
338 0 : generated_physics_producers.put<fhicl::ParameterSet>(newkey, producers.get<fhicl::ParameterSet>(key));
339 0 : for (auto& ii : path)
340 : {
341 0 : if (ii == key)
342 : {
343 0 : ii = newkey;
344 : }
345 : }
346 0 : }
347 : else
348 : {
349 0 : generated_physics_producers.put<fhicl::ParameterSet>(key, producers.get<fhicl::ParameterSet>(key));
350 : }
351 0 : }
352 0 : }
353 0 : if (physics_pset.has_key("filters"))
354 : {
355 0 : auto filters = physics_pset.get<fhicl::ParameterSet>("filters");
356 0 : for (const auto& key : filters.get_pset_names())
357 : {
358 0 : if (generated_physics_filters.has_key(key) && filters.get<fhicl::ParameterSet>(key) == generated_physics_filters.get<fhicl::ParameterSet>(key))
359 : {
360 : // Module is already configured
361 0 : continue;
362 : }
363 0 : if (generated_physics_filters.has_key(key))
364 : {
365 : // Module already exists with name, rename
366 0 : auto newkey = label + key;
367 0 : generated_physics_filters.put<fhicl::ParameterSet>(newkey, filters.get<fhicl::ParameterSet>(key));
368 0 : for (auto& ii : path)
369 : {
370 0 : if (ii == key)
371 : {
372 0 : ii = newkey;
373 : }
374 : }
375 0 : }
376 : else
377 : {
378 0 : generated_physics_filters.put<fhicl::ParameterSet>(key, filters.get<fhicl::ParameterSet>(key));
379 : }
380 0 : }
381 0 : }
382 0 : generated_physics.put<std::vector<std::string>>(label, path);
383 0 : }
384 0 : catch (cet::exception& e)
385 : {
386 : // Error in parsing input fhicl
387 0 : TLOG(TLVL_ERROR) << "merge_parameter_sets_: Error processing input fhicl: " << e.what();
388 0 : }
389 :
390 0 : TLOG(TLVL_DEBUG + 32) << "merge_parameter_sets_: Building final ParameterSet";
391 0 : generated_pset.put("outputs", generated_outputs);
392 :
393 0 : generated_physics.put("analyzers", generated_physics_analyzers);
394 0 : generated_physics.put("producers", generated_physics_producers);
395 0 : generated_physics.put("filters", generated_physics_filters);
396 :
397 0 : for (auto& path : generated_physics_filter_paths)
398 : {
399 0 : generated_physics.put(path.first, path.second);
400 : }
401 :
402 0 : generated_pset.put("physics", generated_physics);
403 :
404 0 : return generated_pset;
405 0 : }
406 :
407 0 : fhicl::ParameterSet artdaq::DispatcherCore::generate_filter_fhicl_()
408 : {
409 0 : TLOG(TLVL_DEBUG + 32) << "generate_filter_fhicl_ BEGIN";
410 0 : std::lock_guard<std::mutex> lock(dispatcher_transfers_mutex_);
411 0 : fhicl::ParameterSet generated_pset = pset_;
412 :
413 0 : for (auto& monitor : registered_monitors_)
414 : {
415 0 : auto label = monitor.first;
416 0 : auto pset = monitor.second;
417 0 : generated_pset = merge_parameter_sets_(generated_pset, label, pset);
418 0 : }
419 :
420 0 : TLOG(TLVL_DEBUG + 32) << "generate_filter_fhicl_ returning ParameterSet: " << generated_pset.to_string();
421 0 : return generated_pset;
422 0 : }
423 :
424 0 : void artdaq::DispatcherCore::check_filters_()
425 : {
426 0 : std::lock_guard<std::mutex> lock(dispatcher_transfers_mutex_);
427 0 : auto it = registered_monitors_.begin();
428 0 : while (it != registered_monitors_.end())
429 : {
430 0 : if (registered_monitor_pids_.count(it->first) != 0u)
431 : {
432 0 : if (!event_store_ptr_)
433 : {
434 0 : registered_monitor_pids_.erase(it->first);
435 : }
436 : else
437 : {
438 0 : auto pid = registered_monitor_pids_[it->first];
439 0 : auto sts = kill(pid, 0);
440 0 : if (sts < 0)
441 : {
442 0 : registered_monitor_pids_.erase(it->first);
443 : }
444 : }
445 : }
446 0 : ++it;
447 : }
448 0 : }
449 :
450 0 : void artdaq::DispatcherCore::restart_art_process_(std::string const& label)
451 : {
452 0 : stop_art_process_(label);
453 0 : start_art_process_(label);
454 0 : }
455 :
456 0 : void artdaq::DispatcherCore::start_art_process_(std::string const& label)
457 : {
458 0 : if (event_store_ptr_ != nullptr)
459 : {
460 0 : if (broadcast_mode_)
461 : {
462 0 : fhicl::ParameterSet pset;
463 : {
464 0 : std::lock_guard<std::mutex> lock(dispatcher_transfers_mutex_);
465 0 : pset = registered_monitors_[label];
466 0 : }
467 0 : fhicl::ParameterSet ps = merge_parameter_sets_(pset_, label, pset);
468 0 : TLOG(TLVL_DEBUG + 32) << "Starting art process with received fhicl";
469 0 : auto pid = event_store_ptr_->StartArtProcess(ps, registered_monitors_.size() - 1);
470 : {
471 0 : std::lock_guard<std::mutex> lock(dispatcher_transfers_mutex_);
472 0 : registered_monitor_pids_[label] = pid;
473 0 : }
474 0 : }
475 : else
476 : {
477 0 : TLOG(TLVL_DEBUG + 32) << "Calling ReconfigureArt";
478 0 : fhicl::ParameterSet generated_fhicl;
479 : {
480 0 : std::lock_guard<std::mutex> lock(dispatcher_transfers_mutex_);
481 0 : generated_fhicl = generate_filter_fhicl_();
482 0 : }
483 0 : event_store_ptr_->ReconfigureArt(generated_fhicl);
484 0 : TLOG(TLVL_DEBUG + 32) << "Done with ReconfigureArt";
485 0 : }
486 : }
487 : else
488 : {
489 0 : TLOG(TLVL_ERROR) << "Unable to add monitor as there is no SharedMemoryEventManager instance!";
490 : }
491 0 : }
492 :
493 0 : void artdaq::DispatcherCore::stop_art_process_(std::string const& label)
494 : {
495 0 : if (event_store_ptr_ != nullptr)
496 : {
497 0 : if (broadcast_mode_)
498 : {
499 0 : if (registered_monitor_pids_.count(label) != 0u)
500 : {
501 0 : std::set<pid_t> pids;
502 : {
503 0 : std::lock_guard<std::mutex> lock(dispatcher_transfers_mutex_);
504 0 : pids.insert(registered_monitor_pids_[label]);
505 0 : registered_monitor_pids_.erase(label);
506 0 : }
507 0 : TLOG(TLVL_DEBUG + 32) << "Calling ShutdownArtProcesses";
508 0 : event_store_ptr_->ShutdownArtProcesses(pids);
509 0 : TLOG(TLVL_DEBUG + 32) << "Done with ShutdownArtProcesses";
510 0 : }
511 : }
512 : else
513 : {
514 0 : TLOG(TLVL_DEBUG + 32) << "Calling ReconfigureArt";
515 0 : fhicl::ParameterSet generated_fhicl;
516 : {
517 0 : std::lock_guard<std::mutex> lock(dispatcher_transfers_mutex_);
518 0 : generated_fhicl = generate_filter_fhicl_();
519 0 : }
520 0 : event_store_ptr_->ReconfigureArt(generated_fhicl);
521 0 : TLOG(TLVL_DEBUG + 32) << "Done with ReconfigureArt";
522 0 : }
523 : }
524 0 : }
|