otsdaq  3.03.00
ARTDAQSupervisor.cc
1 
2 
3 #define TRACEMF_USE_VERBATIM 1 // for trace longer path filenames
4 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
5 
6 #include "artdaq-core/Utilities/configureMessageFacility.hh"
7 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
8 #include "artdaq/DAQdata/Globals.hh"
9 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
10 #include "cetlib_except/exception.h"
11 #include "fhiclcpp/make_ParameterSet.h"
12 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisorTRACEController.h"
13 
14 #include "artdaq-core/Utilities/ExceptionHandler.hh" /*for artdaq::ExceptionHandler*/
15 
16 #include <boost/exception/all.hpp>
17 #include <boost/filesystem.hpp>
18 
19 #include <signal.h>
20 #include <regex>
21 
22 using namespace ots;
23 
24 XDAQ_INSTANTIATOR_IMPL(ARTDAQSupervisor)
25 
26 #define FAKE_CONFIG_NAME "ots_config"
27 #define DAQINTERFACE_PORT \
28  std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + \
29  (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION")))
30 
31 static ARTDAQSupervisor* instance = nullptr;
32 static std::unordered_map<int, struct sigaction> old_actions =
33  std::unordered_map<int, struct sigaction>();
34 static bool sighandler_init = false;
35 static void signal_handler(int signum)
36 {
37  // Messagefacility may already be gone at this point, TRACE ONLY!
38 #if TRACE_REVNUM < 1459
39  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
40 #else
41  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
42 #endif
43  << "A signal of type " << signum
44  << " was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
45  "then proceeding with default handlers!";
46 
47  if(instance)
48  instance->destroy();
49 
50  sigset_t set;
51  pthread_sigmask(SIG_UNBLOCK, NULL, &set);
52  pthread_sigmask(SIG_UNBLOCK, &set, NULL);
53 
54 #if TRACE_REVNUM < 1459
55  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
56 #else
57  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
58 #endif
59  << "Calling default signal handler";
60  if(signum != SIGUSR2)
61  {
62  sigaction(signum, &old_actions[signum], NULL);
63  kill(getpid(), signum); // Only send signal to self
64  }
65  else
66  {
67  // Send Interrupt signal if parsing SIGUSR2 (i.e. user-defined exception that
68  // should tear down ARTDAQ)
69  sigaction(SIGINT, &old_actions[SIGINT], NULL);
70  kill(getpid(), SIGINT); // Only send signal to self
71  }
72 }
73 
74 static void init_sighandler(ARTDAQSupervisor* inst)
75 {
76  static std::mutex sighandler_mutex;
77  std::unique_lock<std::mutex> lk(sighandler_mutex);
78 
79  if(!sighandler_init)
80  {
81  sighandler_init = true;
82  instance = inst;
83  std::vector<int> signals = {
84  SIGINT,
85  SIGILL,
86  SIGABRT,
87  SIGFPE,
88  SIGSEGV,
89  SIGPIPE,
90  SIGALRM,
91  SIGTERM,
92  SIGUSR2,
93  SIGHUP}; // SIGQUIT is used by art in normal operation
94  for(auto signal : signals)
95  {
96  struct sigaction old_action;
97  sigaction(signal, NULL, &old_action);
98 
99  // If the old handler wasn't SIG_IGN (it's a handler that just
100  // "ignore" the signal)
101  if(old_action.sa_handler != SIG_IGN)
102  {
103  struct sigaction action;
104  action.sa_handler = signal_handler;
105  sigemptyset(&action.sa_mask);
106  for(auto sigblk : signals)
107  {
108  sigaddset(&action.sa_mask, sigblk);
109  }
110  action.sa_flags = 0;
111 
112  // Replace the signal handler of SIGINT with the one described by
113  // new_action
114  sigaction(signal, &action, NULL);
115  old_actions[signal] = old_action;
116  }
117  }
118  }
119 }
120 
121 //==============================================================================
122 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
123  : CoreSupervisorBase(stub)
124  , daqinterface_ptr_(NULL)
125  , partition_(getSupervisorProperty("partition", 0))
126  , daqinterface_state_("notrunning")
127  , runner_thread_(nullptr)
128 {
129  __SUP_COUT__ << "Constructor." << __E__;
130 
131  INIT_MF("." /*directory used is USER_DATA/LOG/.*/);
132  init_sighandler(this);
133 
134  // Only use system Python
135  // unsetenv("PYTHONPATH");
136  // unsetenv("PYTHONHOME");
137 
138  // Write out settings file
139  auto settings_file = __ENV__("DAQINTERFACE_SETTINGS");
140  std::ofstream o(settings_file, std::ios::trunc);
141 
142  setenv("DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
143  auto logfileName = std::string(__ENV__("OTSDAQ_LOG_DIR")) +
144  "/DAQInteface/DAQInterface_partition" +
145  std::to_string(partition_) + ".log";
146  setenv("DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
147 
148  o << "log_directory: "
149  << getSupervisorProperty("log_directory", std::string(__ENV__("OTSDAQ_LOG_DIR")))
150  << std::endl;
151 
152  {
153  const std::string record_directory = getSupervisorProperty(
154  "record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH + "/run_records/");
155  mkdir(record_directory.c_str(), 0755);
156  o << "record_directory: " << record_directory << std::endl;
157  }
158 
159  o << "package_hashes_to_save: "
160  << getSupervisorProperty("package_hashes_to_save", "[artdaq]") << std::endl;
161 
162  o << "spack_root_for_bash_scripts: "
163  << getSupervisorProperty("spack_root_for_bash_scripts",
164  std::string(__ENV__("SPACK_ROOT")))
165  << std::endl;
166  o << "boardreader timeout: " << getSupervisorProperty("boardreader_timeout", 30)
167  << std::endl;
168  o << "eventbuilder timeout: " << getSupervisorProperty("eventbuilder_timeout", 30)
169  << std::endl;
170  o << "datalogger timeout: " << getSupervisorProperty("datalogger_timeout", 30)
171  << std::endl;
172  o << "dispatcher timeout: " << getSupervisorProperty("dispatcher_timeout", 30)
173  << std::endl;
174  // Only put max_fragment_size_bytes into DAQInterface settings file if advanced_memory_usage is disabled
175  if(!getSupervisorProperty("advanced_memory_usage", false))
176  {
177  o << "max_fragment_size_bytes: "
178  << getSupervisorProperty("max_fragment_size_bytes", 1048576) << std::endl;
179  }
180  o << "transfer_plugin_to_use: "
181  << getSupervisorProperty("transfer_plugin_to_use", "TCPSocket") << std::endl;
182  if(getSupervisorProperty("transfer_plugin_from_brs", "") != "")
183  {
184  o << "transfer_plugin_from_brs: "
185  << getSupervisorProperty("transfer_plugin_from_brs", "") << std::endl;
186  }
187  if(getSupervisorProperty("transfer_plugin_from_ebs", "") != "")
188  {
189  o << "transfer_plugin_from_ebs: "
190  << getSupervisorProperty("transfer_plugin_from_ebs", "") << std::endl;
191  }
192  if(getSupervisorProperty("transfer_plugin_from_dls", "") != "")
193  {
194  o << "transfer_plugin_from_dls: "
195  << getSupervisorProperty("transfer_plugin_from_dls", "") << std::endl;
196  }
197  o << "all_events_to_all_dispatchers: " << std::boolalpha
198  << getSupervisorProperty("all_events_to_all_dispatchers", true) << std::endl;
199  o << "data_directory_override: "
200  << getSupervisorProperty("data_directory_override",
201  std::string(__ENV__("ARTDAQ_OUTPUT_DIR")))
202  << std::endl;
203  o << "max_configurations_to_list: "
204  << getSupervisorProperty("max_configurations_to_list", 10) << std::endl;
205  o << "disable_unique_rootfile_labels: "
206  << getSupervisorProperty("disable_unique_rootfile_labels", false) << std::endl;
207  o << "use_messageviewer: " << std::boolalpha
208  << getSupervisorProperty("use_messageviewer", false) << std::endl;
209  o << "use_messagefacility: " << std::boolalpha
210  << getSupervisorProperty("use_messagefacility", true) << std::endl;
211  o << "fake_messagefacility: " << std::boolalpha
212  << getSupervisorProperty("fake_messagefacility", false) << std::endl;
213  o << "kill_existing_processes: " << std::boolalpha
214  << getSupervisorProperty("kill_existing_processes", true) << std::endl;
215  o << "advanced_memory_usage: " << std::boolalpha
216  << getSupervisorProperty("advanced_memory_usage", false) << std::endl;
217  o << "strict_fragment_id_mode: " << std::boolalpha
218  << getSupervisorProperty("strict_fragment_id_mode", false) << std::endl;
219  o << "disable_private_network_bookkeeping: " << std::boolalpha
220  << getSupervisorProperty("disable_private_network_bookkeeping", false) << std::endl;
221  o << "allowed_processors: " << getSupervisorProperty("allowed_processors", "0-255")
222  << std::
223  endl; // Note this sets a taskset for ALL processes, on all nodes (ex. "1,2,5-7")
224 
225  o.close();
226 
227  // destroy current TRACEController and instantiate ARTDAQSupervisorTRACEController
228  if(CorePropertySupervisorBase::theTRACEController_)
229  {
230  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
231  delete CorePropertySupervisorBase::
232  theTRACEController_; // destruct current TRACEController
233  CorePropertySupervisorBase::theTRACEController_ = nullptr;
234  }
235  CorePropertySupervisorBase::theTRACEController_ =
237  ((ARTDAQSupervisorTRACEController*)CorePropertySupervisorBase::theTRACEController_)
238  ->setSupervisorPtr(this);
239 
240  __SUP_COUT__ << "Constructed." << __E__;
241 } // end constructor()
242 
243 //==============================================================================
244 ARTDAQSupervisor::~ARTDAQSupervisor(void)
245 {
246  __SUP_COUT__ << "Destructor." << __E__;
247  destroy();
248  __SUP_COUT__ << "Destructed." << __E__;
249 } // end destructor()
250 
251 //==============================================================================
252 void ARTDAQSupervisor::destroy(void)
253 {
254  __SUP_COUT__ << "Destroying..." << __E__;
255 
256  if(daqinterface_ptr_ != NULL)
257  {
258  __SUP_COUT__ << "Calling recover transition" << __E__;
259  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
260  PyObject* pName = PyUnicode_FromString("do_recover");
261  /*PyObject* res =*/PyObject_CallMethodObjArgs(
262  daqinterface_ptr_, pName, NULL);
263 
264  __SUP_COUT__ << "Making sure that correct state has been reached" << __E__;
265  getDAQState_();
266  while(daqinterface_state_ != "stopped")
267  {
268  getDAQState_();
269  __SUP_COUT__ << "State is " << daqinterface_state_
270  << ", waiting 1s and retrying..." << __E__;
271  usleep(1000000);
272  }
273 
274  Py_XDECREF(daqinterface_ptr_);
275  daqinterface_ptr_ = NULL;
276  }
277 
278  Py_Finalize();
279 
280  // CorePropertySupervisorBase would destroy, but since it was created here, attempt to destroy
282  {
283  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
286  }
287 
288  __SUP_COUT__ << "Destroyed." << __E__;
289 } // end destroy()
290 
291 //==============================================================================
292 void ARTDAQSupervisor::init(void)
293 {
294  stop_runner_();
295 
296  __SUP_COUT__ << "Initializing..." << __E__;
297  {
298  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
299 
300  // allSupervisorInfo_.init(getApplicationContext());
301  artdaq::configureMessageFacility("ARTDAQSupervisor");
302  __SUP_COUT__ << "artdaq MF configured." << __E__;
303 
304  // initialization
305  char* daqinterface_dir = getenv("ARTDAQ_DAQINTERFACE_DIR");
306  if(daqinterface_dir == NULL)
307  {
308  __SS__ << "ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
309  "means that DAQInterface has not been setup!"
310  << __E__;
311  __SUP_SS_THROW__;
312  }
313  else
314  {
315  __SUP_COUT__ << "Initializing Python" << __E__;
316  Py_Initialize();
317 
318  __SUP_COUT__ << "Adding DAQInterface directory to PYTHON_PATH" << __E__;
319  PyObject* sysPath = PySys_GetObject((char*)"path");
320  PyObject* programName = PyUnicode_FromString(daqinterface_dir);
321  PyList_Append(sysPath, programName);
322  Py_DECREF(programName);
323 
324  __SUP_COUT__ << "Creating Module name" << __E__;
325  PyObject* pName = PyUnicode_FromString("rc.control.daqinterface");
326  /* Error checking of pName left out */
327 
328  __SUP_COUT__ << "Importing module" << __E__;
329  PyObject* pModule = PyImport_Import(pName);
330  Py_DECREF(pName);
331 
332  if(pModule == NULL)
333  {
334  PyErr_Print();
335  __SS__ << "Failed to load rc.control.daqinterface" << __E__;
336  __SUP_SS_THROW__;
337  }
338  else
339  {
340  __SUP_COUT__ << "Loading python module dictionary" << __E__;
341  PyObject* pDict = PyModule_GetDict(pModule);
342  if(pDict == NULL)
343  {
344  PyErr_Print();
345  __SS__ << "Unable to load module dictionary" << __E__;
346  __SUP_SS_THROW__;
347  }
348  else
349  {
350  Py_DECREF(pModule);
351 
352  __SUP_COUT__ << "Getting DAQInterface object pointer" << __E__;
353  PyObject* di_obj_ptr = PyDict_GetItemString(pDict, "DAQInterface");
354 
355  __SUP_COUT__ << "Filling out DAQInterface args struct" << __E__;
356  PyObject* pArgs = PyTuple_New(0);
357 
358  PyObject* kwargs = Py_BuildValue("{s:s, s:s, s:i, s:i, s:s, s:s}",
359  "logpath",
360  ".daqint.log",
361  "name",
362  "DAQInterface",
363  "partition_number",
364  partition_,
365  "rpc_port",
366  DAQINTERFACE_PORT,
367  "rpc_host",
368  "localhost",
369  "control_host",
370  "localhost");
371 
372  __SUP_COUT__ << "Calling DAQInterface Object Constructor" << __E__;
373  daqinterface_ptr_ = PyObject_Call(di_obj_ptr, pArgs, kwargs);
374 
375  Py_DECREF(di_obj_ptr);
376  }
377  }
378  }
379 
380  getDAQState_();
381 
382  // { //attempt to cleanup old artdaq processes DOES NOT WORK because artdaq interface knows it hasn't started
383  // __SUP_COUT__ << "Attempting artdaq stale cleanup..." << __E__;
384  // std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
385  // getDAQState_();
386  // __SUP_COUT__ << "Status before cleanup: " << daqinterface_state_ << __E__;
387 
388  // PyObject* pName = PyUnicode_FromString("do_recover");
389  // PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
390 
391  // if(res == NULL)
392  // {
393  // PyErr_Print();
394  // __SS__ << "Error with clean up calling do_recover" << __E__;
395  // __SUP_SS_THROW__;
396  // }
397  // getDAQState_();
398  // __SUP_COUT__ << "Status after cleanup: " << daqinterface_state_ << __E__;
399  // __SUP_COUT__ << "cleanup DONE." << __E__;
400  // }
401  }
402  start_runner_();
403  __SUP_COUT__ << "Initialized." << __E__;
404 } // end init()
405 
406 //==============================================================================
407 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference /*event*/)
408 {
409  __SUP_COUT__ << "transitionConfiguring" << __E__;
410 
411  // activate the configuration tree (the first iteration)
412  if(RunControlStateMachine::getIterationIndex() == 0 &&
413  RunControlStateMachine::getSubIterationIndex() == 0)
414  {
415  thread_error_message_ = "";
416  thread_progress_bar_.resetProgressBar(0);
417  last_thread_progress_update_ = time(0); // initialize timeout timer
418 
419  std::pair<std::string /*group name*/, TableGroupKey> theGroup(
420  SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
421  .getParameters()
422  .getValue("ConfigurationTableGroupName"),
423  TableGroupKey(SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
424  .getParameters()
425  .getValue("ConfigurationTableGroupKey")));
426 
427  __SUP_COUT__ << "Configuration table group name: " << theGroup.first
428  << " key: " << theGroup.second << __E__;
429 
430  try
431  {
432  // disable version tracking to accept untracked versions to be selected by the FSM transition source
433  theConfigurationManager_->loadTableGroup(
434  theGroup.first,
435  theGroup.second,
436  true /*doActivate*/,
437  0,
438  0,
439  0,
440  0,
441  0,
442  0,
443  false,
444  0,
445  0,
446  ConfigurationManager::LoadGroupType::ALL_TYPES,
447  true /*ignoreVersionTracking*/);
448  }
449  catch(const std::runtime_error& e)
450  {
451  __SS__ << "Error loading table group '" << theGroup.first << "("
452  << theGroup.second << ")! \n"
453  << e.what() << __E__;
454  __SUP_COUT_ERR__ << ss.str();
455  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
456 
457  //__SS_THROW_ONLY__;
458  theStateMachine_.setErrorMessage(ss.str());
459  throw toolbox::fsm::exception::Exception(
460  "Transition Error" /*name*/,
461  ss.str() /* message*/,
462  "ARTDAQSupervisor::transitionConfiguring" /*module*/,
463  __LINE__ /*line*/,
464  __FUNCTION__ /*function*/
465  );
466  }
467  catch(...)
468  {
469  __SS__ << "Unknown error loading table group '" << theGroup.first << "("
470  << theGroup.second << ")!" << __E__;
471  __SUP_COUT_ERR__ << ss.str();
472  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
473 
474  //__SS_THROW_ONLY__;
475  theStateMachine_.setErrorMessage(ss.str());
476  throw toolbox::fsm::exception::Exception(
477  "Transition Error" /*name*/,
478  ss.str() /* message*/,
479  "ARTDAQSupervisor::transitionConfiguring" /*module*/,
480  __LINE__ /*line*/,
481  __FUNCTION__ /*function*/
482  );
483  }
484 
485  // start configuring thread
486  std::thread(&ARTDAQSupervisor::configuringThread, this).detach();
487 
488  __SUP_COUT__ << "Configuring thread started." << __E__;
489 
490  RunControlStateMachine::
491  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
492  }
493  else // not first time
494  {
495  std::string errorMessage;
496  {
497  std::lock_guard<std::mutex> lock(
498  thread_mutex_); // lock out for remainder of scope
499  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
500  }
501  int progress = thread_progress_bar_.read();
502  __SUP_COUTV__(errorMessage);
503  __SUP_COUTV__(progress);
504  __SUP_COUTV__(thread_progress_bar_.isComplete());
505 
506  // check for done and error messages
507  if(errorMessage == "" && // if no update in 600 seconds, give up
508  time(0) - last_thread_progress_update_ > 600)
509  {
510  __SUP_SS__ << "There has been no update from the configuration thread for "
511  << (time(0) - last_thread_progress_update_)
512  << " seconds, assuming something is wrong and giving up! "
513  << "Last progress received was " << progress << __E__;
514  errorMessage = ss.str();
515  }
516 
517  if(errorMessage != "")
518  {
519  __SUP_SS__ << "Error was caught in configuring thread: " << errorMessage
520  << __E__;
521  __SUP_COUT_ERR__ << "\n" << ss.str();
522 
523  theStateMachine_.setErrorMessage(ss.str());
524  throw toolbox::fsm::exception::Exception(
525  "Transition Error" /*name*/,
526  ss.str() /* message*/,
527  "CoreSupervisorBase::transitionConfiguring" /*module*/,
528  __LINE__ /*line*/,
529  __FUNCTION__ /*function*/
530  );
531  }
532 
533  if(!thread_progress_bar_.isComplete())
534  {
535  RunControlStateMachine::
536  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
537 
538  if(last_thread_progress_read_ != progress)
539  {
540  last_thread_progress_read_ = progress;
541  last_thread_progress_update_ = time(0);
542  }
543 
544  sleep(1 /*seconds*/);
545  }
546  else
547  {
548  __SUP_COUT_INFO__ << "Complete configuring transition!" << __E__;
549  __SUP_COUTV__(getProcessInfo_());
550  }
551  }
552 
553  return;
554 } // end transitionConfiguring()
555 
556 //==============================================================================
557 void ARTDAQSupervisor::configuringThread()
558 try
559 {
560  std::string uid = theConfigurationManager_
561  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
562  "/" + CorePropertySupervisorBase::getSupervisorUID() +
563  "/" + "LinkToSupervisorTable")
564  .getValueAsString();
565 
566  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
567 
568  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
569 
570  ConfigurationTree theSupervisorNode = getSupervisorTableNode();
571 
572  thread_progress_bar_.step();
573 
574  set_thread_message_("ConfigGen");
575 
576  auto info = ARTDAQTableBase::extractARTDAQInfo(
577  theSupervisorNode,
578  false /*getStatusFalseNodes*/,
579  true /*doWriteFHiCL*/,
580  getSupervisorProperty("max_fragment_size_bytes", 8888),
581  getSupervisorProperty("routing_timeout_ms", 1999),
582  getSupervisorProperty("routing_retry_count", 12),
583  &thread_progress_bar_);
584 
585  // Check lists
586  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
587  {
588  __GEN_SS__ << "There must be at least one enabled BoardReader!" << __E__;
589  __GEN_SS_THROW__;
590  return;
591  }
592  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
593  {
594  __GEN_SS__ << "There must be at least one enabled EventBuilder!" << __E__;
595  __GEN_SS_THROW__;
596  return;
597  }
598 
599  thread_progress_bar_.step();
600  set_thread_message_("Writing boot.txt");
601 
602  __GEN_COUT__ << "Writing boot.txt" << __E__;
603 
604  int debugLevel = theSupervisorNode.getNode("DAQInterfaceDebugLevel").getValue<int>();
605  std::string setupScript = theSupervisorNode.getNode("DAQSetupScript").getValue();
606 
607  std::ofstream o(ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt", std::ios::trunc);
608  o << "DAQ setup script: " << setupScript << std::endl;
609  o << "debug level: " << debugLevel << std::endl;
610  o << std::endl;
611 
612  if(info.subsystems.size() > 1)
613  {
614  for(auto& ss : info.subsystems)
615  {
616  if(ss.first == 0)
617  continue;
618  o << "Subsystem id: " << ss.first << std::endl;
619  if(ss.second.destination != 0)
620  {
621  o << "Subsystem destination: " << ss.second.destination << std::endl;
622  }
623  for(auto& sss : ss.second.sources)
624  {
625  o << "Subsystem source: " << sss << std::endl;
626  }
627  if(ss.second.eventMode)
628  {
629  o << "Subsystem fragmentMode: False" << std::endl;
630  }
631  o << std::endl;
632  }
633  }
634 
635  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
636  {
637  o << "EventBuilder host: " << builder.hostname << std::endl;
638  o << "EventBuilder label: " << builder.label << std::endl;
639  label_to_proc_type_map_[builder.label] = "EventBuilder";
640  if(builder.subsystem != 1)
641  {
642  o << "EventBuilder subsystem: " << builder.subsystem << std::endl;
643  }
644  if(builder.allowed_processors != "")
645  {
646  o << "EventBuilder allowed_processors: " << builder.allowed_processors
647  << std::endl;
648  }
649  o << std::endl;
650  }
651  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
652  {
653  o << "DataLogger host: " << logger.hostname << std::endl;
654  o << "DataLogger label: " << logger.label << std::endl;
655  label_to_proc_type_map_[logger.label] = "DataLogger";
656  if(logger.subsystem != 1)
657  {
658  o << "DataLogger subsystem: " << logger.subsystem << std::endl;
659  }
660  if(logger.allowed_processors != "")
661  {
662  o << "DataLogger allowed_processors: " << logger.allowed_processors
663  << std::endl;
664  }
665  o << std::endl;
666  }
667  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
668  {
669  o << "Dispatcher host: " << dispatcher.hostname << std::endl;
670  o << "Dispatcher label: " << dispatcher.label << std::endl;
671  o << "Dispatcher port: " << dispatcher.port << std::endl;
672  label_to_proc_type_map_[dispatcher.label] = "Dispatcher";
673  if(dispatcher.subsystem != 1)
674  {
675  o << "Dispatcher subsystem: " << dispatcher.subsystem << std::endl;
676  }
677  if(dispatcher.allowed_processors != "")
678  {
679  o << "Dispatcher allowed_processors: " << dispatcher.allowed_processors
680  << std::endl;
681  }
682  o << std::endl;
683  }
684  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
685  {
686  o << "RoutingManager host: " << rmanager.hostname << std::endl;
687  o << "RoutingManager label: " << rmanager.label << std::endl;
688  label_to_proc_type_map_[rmanager.label] = "RoutingManager";
689  if(rmanager.subsystem != 1)
690  {
691  o << "RoutingManager subsystem: " << rmanager.subsystem << std::endl;
692  }
693  if(rmanager.allowed_processors != "")
694  {
695  o << "RoutingManager allowed_processors: " << rmanager.allowed_processors
696  << std::endl;
697  }
698  o << std::endl;
699  }
700  o.close();
701 
702  thread_progress_bar_.step();
703  set_thread_message_("Writing Fhicl Files");
704 
705  __GEN_COUT__ << "Building configuration directory" << __E__;
706 
707  boost::system::error_code ignored;
708  boost::filesystem::remove_all(ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME,
709  ignored);
710  mkdir((ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME).c_str(), 0755);
711 
712  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
713  {
714  symlink(ARTDAQTableBase::getFlatFHICLFilename(
715  ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label)
716  .c_str(),
717  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
718  reader.label + ".fcl")
719  .c_str());
720  }
721  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
722  {
723  symlink(ARTDAQTableBase::getFlatFHICLFilename(
724  ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label)
725  .c_str(),
726  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
727  builder.label + ".fcl")
728  .c_str());
729  }
730  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
731  {
732  symlink(ARTDAQTableBase::getFlatFHICLFilename(
733  ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label)
734  .c_str(),
735  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
736  logger.label + ".fcl")
737  .c_str());
738  }
739  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
740  {
741  symlink(ARTDAQTableBase::getFlatFHICLFilename(
742  ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label)
743  .c_str(),
744  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
745  dispatcher.label + ".fcl")
746  .c_str());
747  }
748  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
749  {
750  symlink(ARTDAQTableBase::getFlatFHICLFilename(
751  ARTDAQTableBase::ARTDAQAppType::RoutingManager, rmanager.label)
752  .c_str(),
753  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
754  rmanager.label + ".fcl")
755  .c_str());
756  }
757 
758  thread_progress_bar_.step();
759 
760  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
761  getDAQState_();
762  if(daqinterface_state_ != "stopped" && daqinterface_state_ != "")
763  {
764  __GEN_SS__ << "Cannot configure DAQInterface because it is in the wrong state"
765  << " (" << daqinterface_state_ << " != stopped)!" << __E__;
766  __GEN_SS_THROW__
767  }
768 
769  set_thread_message_("Calling setdaqcomps");
770  __GEN_COUT__ << "Calling setdaqcomps" << __E__;
771  __GEN_COUT__ << "Status before setdaqcomps: " << daqinterface_state_ << __E__;
772  PyObject* pName1 = PyUnicode_FromString("setdaqcomps");
773 
774  PyObject* readerDict = PyDict_New();
775  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
776  {
777  label_to_proc_type_map_[reader.label] = "BoardReader";
778  PyObject* readerName = PyUnicode_FromString(reader.label.c_str());
779 
780  int list_size = reader.allowed_processors != "" ? 4 : 3;
781 
782  PyObject* readerData = PyList_New(list_size);
783  PyObject* readerHost = PyUnicode_FromString(reader.hostname.c_str());
784  PyObject* readerPort = PyUnicode_FromString("-1");
785  PyObject* readerSubsystem =
786  PyUnicode_FromString(std::to_string(reader.subsystem).c_str());
787  PyList_SetItem(readerData, 0, readerHost);
788  PyList_SetItem(readerData, 1, readerPort);
789  PyList_SetItem(readerData, 2, readerSubsystem);
790  if(reader.allowed_processors != "")
791  {
792  PyObject* readerAllowedProcessors =
793  PyUnicode_FromString(reader.allowed_processors.c_str());
794  PyList_SetItem(readerData, 3, readerAllowedProcessors);
795  }
796  PyDict_SetItem(readerDict, readerName, readerData);
797  }
798  PyObject* res1 =
799  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName1, readerDict, NULL);
800  Py_DECREF(readerDict);
801 
802  if(res1 == NULL)
803  {
804  PyErr_Print();
805  __GEN_SS__ << "Error calling setdaqcomps transition" << __E__;
806  __GEN_SS_THROW__;
807  }
808  getDAQState_();
809  __GEN_COUT__ << "Status after setdaqcomps: " << daqinterface_state_ << __E__;
810 
811  thread_progress_bar_.step();
812  set_thread_message_("Calling do_boot");
813  __GEN_COUT__ << "Calling do_boot" << __E__;
814  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
815  PyObject* pName2 = PyUnicode_FromString("do_boot");
816  PyObject* pStateArgs1 =
817  PyUnicode_FromString((ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt").c_str());
818  PyObject* res2 =
819  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
820 
821  if(res2 == NULL)
822  {
823  PyErr_Print();
824  __GEN_COUT__ << "Error on first boost attempt, recovering and retrying" << __E__;
825 
826  PyObject* pName = PyUnicode_FromString("do_recover");
827  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
828 
829  if(res == NULL)
830  {
831  PyErr_Print();
832  __GEN_SS__ << "Error calling recover transition!!!!" << __E__;
833  __GEN_SS_THROW__;
834  }
835 
836  thread_progress_bar_.step();
837  set_thread_message_("Calling do_boot (retry)");
838  __GEN_COUT__ << "Calling do_boot again" << __E__;
839  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
840  PyObject* res3 =
841  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
842 
843  if(res3 == NULL)
844  {
845  PyErr_Print();
846  __GEN_SS__ << "Error calling boot transition (2nd try)" << __E__;
847  __GEN_SS_THROW__;
848  }
849  }
850 
851  getDAQState_();
852  if(daqinterface_state_ != "booted")
853  {
854  __GEN_SS__ << "DAQInterface boot transition failed! "
855  << "Status after boot attempt: " << daqinterface_state_ << __E__;
856  __GEN_SS_THROW__;
857  }
858  __GEN_COUT__ << "Status after boot: " << daqinterface_state_ << __E__;
859 
860  thread_progress_bar_.step();
861  set_thread_message_("Calling do_config");
862  __GEN_COUT__ << "Calling do_config" << __E__;
863  __GEN_COUT__ << "Status before config: " << daqinterface_state_ << __E__;
864  PyObject* pName3 = PyUnicode_FromString("do_config");
865  PyObject* pStateArgs2 = Py_BuildValue("[s]", FAKE_CONFIG_NAME);
866  PyObject* res3 =
867  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName3, pStateArgs2, NULL);
868 
869  if(res3 == NULL)
870  {
871  PyErr_Print();
872  __GEN_SS__ << "Error calling config transition" << __E__;
873  __GEN_SS_THROW__;
874  }
875  getDAQState_();
876  if(daqinterface_state_ != "ready")
877  {
878  __GEN_SS__ << "DAQInterface config transition failed!" << __E__
879  << "Supervisor state: \"" << daqinterface_state_ << "\" != \"ready\" "
880  << __E__;
881  __GEN_SS_THROW__;
882  }
883  __GEN_COUT__ << "Status after config: " << daqinterface_state_ << __E__;
884  thread_progress_bar_.complete();
885  set_thread_message_("Configured");
886  __GEN_COUT__ << "Configured." << __E__;
887 
888 } // end configuringThread()
889 catch(const std::runtime_error& e)
890 {
891  set_thread_message_("ERROR");
892  __SS__ << "Error was caught while configuring: " << e.what() << __E__;
893  __COUT_ERR__ << "\n" << ss.str();
894  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
895  thread_error_message_ = ss.str();
896 }
897 catch(...)
898 {
899  set_thread_message_("ERROR");
900  __SS__ << "Unknown error was caught while configuring. Please checked the logs."
901  << __E__;
902  __COUT_ERR__ << "\n" << ss.str();
903 
904  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
905 
906  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
907  thread_error_message_ = ss.str();
908 } // end configuringThread() error handling
909 
910 //==============================================================================
911 void ARTDAQSupervisor::transitionHalting(toolbox::Event::Reference /*event*/)
912 try
913 {
914  set_thread_message_("Halting");
915  __SUP_COUT__ << "Halting..." << __E__;
916  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
917  getDAQState_();
918  __SUP_COUT__ << "Status before halt: " << daqinterface_state_ << __E__;
919 
920  if(daqinterface_state_ == "running")
921  {
922  // First stop before halting
923  PyObject* pName = PyUnicode_FromString("do_stop_running");
924  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
925 
926  if(res == NULL)
927  {
928  PyErr_Print();
929  __SS__ << "Error calling DAQ Interface stop transition." << __E__;
930  __SUP_SS_THROW__;
931  }
932  }
933 
934  PyObject* pName = PyUnicode_FromString("do_command");
935  PyObject* pArg = PyUnicode_FromString("Shutdown");
936  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
937 
938  if(res == NULL)
939  {
940  PyErr_Print();
941  __SS__ << "Error calling DAQ Interface halt transition." << __E__;
942  __SUP_SS_THROW__;
943  }
944 
945  getDAQState_();
946  __SUP_COUT__ << "Status after halt: " << daqinterface_state_ << __E__;
947  __SUP_COUT__ << "Halted." << __E__;
948  set_thread_message_("Halted");
949 } // end transitionHalting()
950 catch(const std::runtime_error& e)
951 {
952  const std::string transitionName = "Halting";
953  // if halting from Failed state, then ignore errors
954  if(theStateMachine_.getProvenanceStateName() ==
955  RunControlStateMachine::FAILED_STATE_NAME ||
956  theStateMachine_.getProvenanceStateName() ==
957  RunControlStateMachine::HALTED_STATE_NAME)
958  {
959  __SUP_COUT_INFO__ << "Error was caught while halting (but ignoring because "
960  "previous state was '"
961  << RunControlStateMachine::FAILED_STATE_NAME
962  << "'): " << e.what() << __E__;
963  }
964  else // if not previously in Failed state, then fail
965  {
966  __SUP_SS__ << "Error was caught while " << transitionName << ": " << e.what()
967  << __E__;
968  __SUP_COUT_ERR__ << "\n" << ss.str();
969  theStateMachine_.setErrorMessage(ss.str());
970  throw toolbox::fsm::exception::Exception(
971  "Transition Error" /*name*/,
972  ss.str() /* message*/,
973  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
974  __LINE__ /*line*/,
975  __FUNCTION__ /*function*/
976  );
977  }
978 } // end transitionHalting() std::runtime_error exception handling
979 catch(...)
980 {
981  const std::string transitionName = "Halting";
982  // if halting from Failed state, then ignore errors
983  if(theStateMachine_.getProvenanceStateName() ==
984  RunControlStateMachine::FAILED_STATE_NAME ||
985  theStateMachine_.getProvenanceStateName() ==
986  RunControlStateMachine::HALTED_STATE_NAME)
987  {
988  __SUP_COUT_INFO__ << "Unknown error was caught while halting (but ignoring "
989  "because previous state was '"
990  << RunControlStateMachine::FAILED_STATE_NAME << "')." << __E__;
991  }
992  else // if not previously in Failed state, then fail
993  {
994  __SUP_SS__ << "Unknown error was caught while " << transitionName
995  << ". Please checked the logs." << __E__;
996  __SUP_COUT_ERR__ << "\n" << ss.str();
997  theStateMachine_.setErrorMessage(ss.str());
998 
999  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1000 
1001  throw toolbox::fsm::exception::Exception(
1002  "Transition Error" /*name*/,
1003  ss.str() /* message*/,
1004  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1005  __LINE__ /*line*/,
1006  __FUNCTION__ /*function*/
1007  );
1008  }
1009 } // end transitionHalting() exception handling
1010 
1011 //==============================================================================
1012 void ARTDAQSupervisor::transitionInitializing(toolbox::Event::Reference /*event*/)
1013 try
1014 {
1015  set_thread_message_("Initializing");
1016  __SUP_COUT__ << "Initializing..." << __E__;
1017  init();
1018  __SUP_COUT__ << "Initialized." << __E__;
1019  set_thread_message_("Initialized");
1020 } // end transitionInitializing()
1021 catch(const std::runtime_error& e)
1022 {
1023  __SS__ << "Error was caught while Initializing: " << e.what() << __E__;
1024  __SS_THROW__;
1025 }
1026 catch(...)
1027 {
1028  __SS__ << "Unknown error was caught while Initializing. Please checked the logs."
1029  << __E__;
1030  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1031  __SS_THROW__;
1032 } // end transitionInitializing() error handling
1033 
1034 //==============================================================================
1035 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference /*event*/)
1036 try
1037 {
1038  set_thread_message_("Pausing");
1039  __SUP_COUT__ << "Pausing..." << __E__;
1040  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1041 
1042  getDAQState_();
1043  __SUP_COUT__ << "Status before pause: " << daqinterface_state_ << __E__;
1044 
1045  PyObject* pName = PyUnicode_FromString("do_command");
1046  PyObject* pArg = PyUnicode_FromString("Pause");
1047  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1048 
1049  if(res == NULL)
1050  {
1051  PyErr_Print();
1052  __SS__ << "Error calling DAQ Interface Pause transition." << __E__;
1053  __SUP_SS_THROW__;
1054  }
1055 
1056  getDAQState_();
1057  __SUP_COUT__ << "Status after pause: " << daqinterface_state_ << __E__;
1058 
1059  __SUP_COUT__ << "Paused." << __E__;
1060  set_thread_message_("Paused");
1061 } // end transitionPausing()
1062 catch(const std::runtime_error& e)
1063 {
1064  __SS__ << "Error was caught while Pausing: " << e.what() << __E__;
1065  __SS_THROW__;
1066 }
1067 catch(...)
1068 {
1069  __SS__ << "Unknown error was caught while Pausing. Please checked the logs." << __E__;
1070  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1071  __SS_THROW__;
1072 } // end transitionPausing() error handling
1073 
1074 //==============================================================================
1075 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference /*event*/)
1076 try
1077 {
1078  set_thread_message_("Resuming");
1079  __SUP_COUT__ << "Resuming..." << __E__;
1080  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1081 
1082  getDAQState_();
1083  __SUP_COUT__ << "Status before resume: " << daqinterface_state_ << __E__;
1084  PyObject* pName = PyUnicode_FromString("do_command");
1085  PyObject* pArg = PyUnicode_FromString("Resume");
1086  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1087 
1088  if(res == NULL)
1089  {
1090  PyErr_Print();
1091  __SS__ << "Error calling DAQ Interface Resume transition." << __E__;
1092  __SUP_SS_THROW__;
1093  }
1094  getDAQState_();
1095  __SUP_COUT__ << "Status after resume: " << daqinterface_state_ << __E__;
1096  __SUP_COUT__ << "Resumed." << __E__;
1097  set_thread_message_("Resumed");
1098 } // end transitionResuming()
1099 catch(const std::runtime_error& e)
1100 {
1101  __SS__ << "Error was caught while Resuming: " << e.what() << __E__;
1102  __SS_THROW__;
1103 }
1104 catch(...)
1105 {
1106  __SS__ << "Unknown error was caught while Resuming. Please checked the logs."
1107  << __E__;
1108  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1109  __SS_THROW__;
1110 } // end transitionResuming() error handling
1111 
1112 //==============================================================================
1113 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference /*event*/)
1114 try
1115 {
1116  __SUP_COUT__ << "transitionStarting" << __E__;
1117 
1118  // first time launch thread because artdaq Supervisor may take a while
1119  if(RunControlStateMachine::getIterationIndex() == 0 &&
1120  RunControlStateMachine::getSubIterationIndex() == 0)
1121  {
1122  thread_error_message_ = "";
1123  thread_progress_bar_.resetProgressBar(0);
1124  last_thread_progress_update_ = time(0); // initialize timeout timer
1125 
1126  // start configuring thread
1127  std::thread(&ARTDAQSupervisor::startingThread, this).detach();
1128 
1129  __SUP_COUT__ << "Starting thread started." << __E__;
1130 
1131  RunControlStateMachine::
1132  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1133  }
1134  else // not first time
1135  {
1136  std::string errorMessage;
1137  {
1138  std::lock_guard<std::mutex> lock(
1139  thread_mutex_); // lock out for remainder of scope
1140  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
1141  }
1142  int progress = thread_progress_bar_.read();
1143  __SUP_COUTV__(errorMessage);
1144  __SUP_COUTV__(progress);
1145  __SUP_COUTV__(thread_progress_bar_.isComplete());
1146 
1147  // check for done and error messages
1148  if(errorMessage == "" && // if no update in 600 seconds, give up
1149  time(0) - last_thread_progress_update_ > 600)
1150  {
1151  __SUP_SS__ << "There has been no update from the start thread for "
1152  << (time(0) - last_thread_progress_update_)
1153  << " seconds, assuming something is wrong and giving up! "
1154  << "Last progress received was " << progress << __E__;
1155  errorMessage = ss.str();
1156  }
1157 
1158  if(errorMessage != "")
1159  {
1160  __SUP_SS__ << "Error was caught in starting thread: " << errorMessage
1161  << __E__;
1162  __SUP_COUT_ERR__ << "\n" << ss.str();
1163 
1164  theStateMachine_.setErrorMessage(ss.str());
1165  throw toolbox::fsm::exception::Exception(
1166  "Transition Error" /*name*/,
1167  ss.str() /* message*/,
1168  "CoreSupervisorBase::transitionStarting" /*module*/,
1169  __LINE__ /*line*/,
1170  __FUNCTION__ /*function*/
1171  );
1172  }
1173 
1174  if(!thread_progress_bar_.isComplete())
1175  {
1176  RunControlStateMachine::
1177  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1178 
1179  if(last_thread_progress_read_ != progress)
1180  {
1181  last_thread_progress_read_ = progress;
1182  last_thread_progress_update_ = time(0);
1183  }
1184 
1185  sleep(1 /*seconds*/);
1186  }
1187  else
1188  {
1189  __SUP_COUT_INFO__ << "Complete starting transition!" << __E__;
1190  __SUP_COUTV__(getProcessInfo_());
1191  }
1192  }
1193 
1194  return;
1195 
1196 } // end transitionStarting()
1197 catch(const std::runtime_error& e)
1198 {
1199  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1200  __SS_THROW__;
1201 }
1202 catch(...)
1203 {
1204  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1205  << __E__;
1206  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1207  __SS_THROW__;
1208 } // end transitionStarting() error handling
1209 
1210 //==============================================================================
1211 void ARTDAQSupervisor::startingThread()
1212 try
1213 {
1214  std::string uid = theConfigurationManager_
1215  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
1216  "/" + CorePropertySupervisorBase::getSupervisorUID() +
1217  "/" + "LinkToSupervisorTable")
1218  .getValueAsString();
1219 
1220  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
1221  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
1222  __GEN_COUT__ << "Starting..." << __E__;
1223  set_thread_message_("Starting");
1224 
1225  thread_progress_bar_.step();
1226  stop_runner_();
1227  {
1228  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1229  getDAQState_();
1230  __GEN_COUT__ << "Status before start: " << daqinterface_state_ << __E__;
1231  auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
1232  .getParameters()
1233  .getValue("RunNumber");
1234 
1235  thread_progress_bar_.step();
1236 
1237  PyObject* pName = PyUnicode_FromString("do_start_running");
1238  int run_number = std::stoi(runNumber);
1239  PyObject* pStateArgs = PyLong_FromLong(run_number);
1240  PyObject* res =
1241  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pStateArgs, NULL);
1242 
1243  thread_progress_bar_.step();
1244 
1245  if(res == NULL)
1246  {
1247  PyErr_Print();
1248  __SS__ << "Error calling start transition" << __E__;
1249  __GEN_SS_THROW__;
1250  }
1251  getDAQState_();
1252 
1253  thread_progress_bar_.step();
1254 
1255  __GEN_COUT__ << "Status after start: " << daqinterface_state_ << __E__;
1256  if(daqinterface_state_ != "running")
1257  {
1258  __SS__ << "DAQInterface start transition failed!" << __E__;
1259  __GEN_SS_THROW__;
1260  }
1261 
1262  thread_progress_bar_.step();
1263  }
1264  start_runner_();
1265  set_thread_message_("Started");
1266  thread_progress_bar_.step();
1267 
1268  __GEN_COUT__ << "Started." << __E__;
1269  thread_progress_bar_.complete();
1270 
1271 } // end startingThread()
1272 catch(const std::runtime_error& e)
1273 {
1274  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1275  __COUT_ERR__ << "\n" << ss.str();
1276  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1277  thread_error_message_ = ss.str();
1278 }
1279 catch(...)
1280 {
1281  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1282  << __E__;
1283  __COUT_ERR__ << "\n" << ss.str();
1284 
1285  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1286 
1287  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1288  thread_error_message_ = ss.str();
1289 } // end startingThread() error handling
1290 
1291 //==============================================================================
1292 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference /*event*/)
1293 try
1294 {
1295  __SUP_COUT__ << "Stopping..." << __E__;
1296  set_thread_message_("Stopping");
1297  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1298  getDAQState_();
1299  __SUP_COUT__ << "Status before stop: " << daqinterface_state_ << __E__;
1300  PyObject* pName = PyUnicode_FromString("do_stop_running");
1301  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1302 
1303  if(res == NULL)
1304  {
1305  PyErr_Print();
1306  __SS__ << "Error calling DAQ Interface stop transition." << __E__;
1307  __SUP_SS_THROW__;
1308  }
1309  getDAQState_();
1310  __SUP_COUT__ << "Status after stop: " << daqinterface_state_ << __E__;
1311  __SUP_COUT__ << "Stopped." << __E__;
1312  set_thread_message_("Stopped");
1313 } // end transitionStopping()
1314 catch(const std::runtime_error& e)
1315 {
1316  __SS__ << "Error was caught while Stopping: " << e.what() << __E__;
1317  __SS_THROW__;
1318 }
1319 catch(...)
1320 {
1321  __SS__ << "Unknown error was caught while Stopping. Please checked the logs."
1322  << __E__;
1323  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1324  __SS_THROW__;
1325 } // end transitionStopping() error handling
1326 
1327 //==============================================================================
1328 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference /*event*/)
1329 {
1330  __SUP_COUT__ << "Entering error recovery state" << __E__;
1331  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1332  getDAQState_();
1333  __SUP_COUT__ << "Status before error: " << daqinterface_state_ << __E__;
1334 
1335  PyObject* pName = PyUnicode_FromString("do_recover");
1336  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1337 
1338  if(res == NULL)
1339  {
1340  PyErr_Print();
1341  __SS__ << "Error calling DAQ Interface recover transition." << __E__;
1342  __SUP_SS_THROW__;
1343  }
1344  getDAQState_();
1345  __SUP_COUT__ << "Status after error: " << daqinterface_state_ << __E__;
1346  __SUP_COUT__ << "EnteringError DONE." << __E__;
1347 
1348 } // end enteringError()
1349 
1350 std::vector<SupervisorInfo::SubappInfo> ots::ARTDAQSupervisor::getSubappInfo(void)
1351 {
1352  auto apps = getAndParseProcessInfo_();
1353  std::vector<SupervisorInfo::SubappInfo> output;
1354  for(auto& app : apps)
1355  {
1357 
1358  info.name = app.label;
1359  info.detail = "Rank " + std::to_string(app.rank) + ", subsystem " +
1360  std::to_string(app.subsystem);
1361  info.lastStatusTime = time(0);
1362  info.progress = 100;
1363  info.status = artdaqStateToOtsState(app.state);
1364  info.url = "http://" + app.host + ":" + std::to_string(app.port) + "/RPC2";
1365  info.class_name = "ARTDAQ " + labelToProcType_(app.label);
1366 
1367  output.push_back(info);
1368  }
1369  return output;
1370 }
1371 
1372 //==============================================================================
1373 void ots::ARTDAQSupervisor::getDAQState_()
1374 {
1375  //__SUP_COUT__ << "Getting DAQInterface state" << __E__;
1376  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1377 
1378  if(daqinterface_ptr_ == nullptr)
1379  {
1380  daqinterface_state_ = "";
1381  return;
1382  }
1383 
1384  PyObject* pName = PyUnicode_FromString("state");
1385  PyObject* pArg = PyUnicode_FromString("DAQInterface");
1386  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1387 
1388  if(res == NULL)
1389  {
1390  PyErr_Print();
1391  __SS__ << "Error calling state function" << __E__;
1392  __SUP_SS_THROW__;
1393  return;
1394  }
1395  daqinterface_state_ = std::string(PyUnicode_AsUTF8(res));
1396  //__SUP_COUT__ << "getDAQState_ DONE: state=" << result << __E__;
1397 } // end getDAQState_()
1398 
1399 //==============================================================================
1400 std::string ots::ARTDAQSupervisor::getProcessInfo_(void)
1401 {
1402  //__SUP_COUT__ << "Getting DAQInterface state" << __E__;
1403  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1404 
1405  if(daqinterface_ptr_ == nullptr)
1406  {
1407  return "";
1408  }
1409 
1410  PyObject* pName = PyUnicode_FromString("artdaq_process_info");
1411  PyObject* pArg = PyUnicode_FromString("DAQInterface");
1412  PyObject* pArg2 = PyBool_FromLong(true);
1413  PyObject* res =
1414  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, pArg2, NULL);
1415 
1416  if(res == NULL)
1417  {
1418  PyErr_Print();
1419  __SS__ << "Error calling artdaq_process_info function" << __E__;
1420  __SUP_SS_THROW__;
1421  return "";
1422  }
1423  return std::string(PyUnicode_AsUTF8(res));
1424  //__SUP_COUT__ << "getDAQState_ DONE: state=" << result << __E__;
1425 } // end getProcessInfo_()
1426 
1427 std::string ots::ARTDAQSupervisor::artdaqStateToOtsState(std::string state)
1428 {
1429  if(state == "nonexistant")
1430  return RunControlStateMachine::INITIAL_STATE_NAME;
1431  if(state == "Ready")
1432  return "Configured";
1433  if(state == "Running")
1434  return RunControlStateMachine::RUNNING_STATE_NAME;
1435  if(state == "Paused")
1436  return RunControlStateMachine::PAUSED_STATE_NAME;
1437  if(state == "Stopped")
1438  return RunControlStateMachine::HALTED_STATE_NAME;
1439 
1440  TLOG(TLVL_WARNING) << "Unrecognized state name " << state;
1441  return RunControlStateMachine::FAILED_STATE_NAME;
1442 }
1443 
1444 std::string ots::ARTDAQSupervisor::labelToProcType_(std::string label)
1445 {
1446  if(label_to_proc_type_map_.count(label))
1447  {
1448  return label_to_proc_type_map_[label];
1449  }
1450  return "UNKNOWN";
1451 }
1452 
1453 //==============================================================================
1454 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo>
1455 ots::ARTDAQSupervisor::getAndParseProcessInfo_()
1456 {
1457  std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
1458  auto info = getProcessInfo_();
1459  auto procs = tokenize_(info);
1460 
1461  // 0: Whole string
1462  // 1: Process Label
1463  // 2: Process host
1464  // 3: Process port
1465  // 4: Process subsystem
1466  // 5: Process Rank
1467  // 6: Process state
1468  std::regex re("(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
1469 
1470  for(auto& proc : procs)
1471  {
1472  std::smatch match;
1473  if(std::regex_match(proc, match, re))
1474  {
1475  DAQInterfaceProcessInfo info;
1476 
1477  info.label = match[1];
1478  info.host = match[2];
1479  info.port = std::stoi(match[3]);
1480  info.subsystem = std::stoi(match[4]);
1481  info.rank = std::stoi(match[5]);
1482  info.state = match[6];
1483 
1484  output.push_back(info);
1485  }
1486  }
1487  return output;
1488 } // end getAndParseProcessInfo_()
1489 
1490 //==============================================================================
1492  std::unique_ptr<artdaq::CommanderInterface>>>
1493 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
1494 {
1495  std::list<
1496  std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
1497  output;
1498  auto infos = getAndParseProcessInfo_();
1499 
1500  for(auto& info : infos)
1501  {
1502  artdaq::Commandable cm;
1503  fhicl::ParameterSet ps;
1504 
1505  ps.put<std::string>("commanderPluginType", "xmlrpc");
1506  ps.put<int>("id", info.port);
1507  ps.put<std::string>("server_url", info.host);
1508 
1509  output.emplace_back(std::make_pair<DAQInterfaceProcessInfo,
1510  std::unique_ptr<artdaq::CommanderInterface>>(
1511  std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
1512  }
1513 
1514  return output;
1515 } // end makeCommandersFromProcessInfo()
1516 
1517 //==============================================================================
1518 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string const& input)
1519 {
1520  size_t pos = 0;
1521  std::list<std::string> output;
1522 
1523  while(pos != std::string::npos && pos < input.size())
1524  {
1525  auto newpos = input.find('\n', pos);
1526  if(newpos != std::string::npos)
1527  {
1528  output.emplace_back(input, pos, newpos - pos);
1529  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1530  pos = newpos + 1;
1531  }
1532  else
1533  {
1534  output.emplace_back(input, pos);
1535  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1536  pos = newpos;
1537  }
1538  }
1539  return output;
1540 } // end tokenize_()
1541 
1542 //==============================================================================
1543 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1544 {
1545  TLOG(TLVL_TRACE) << "Runner thread starting";
1546  runner_running_ = true;
1547  while(runner_running_)
1548  {
1549  if(daqinterface_ptr_ != NULL)
1550  {
1551  std::unique_lock<std::recursive_mutex> lk(daqinterface_mutex_);
1552  getDAQState_();
1553  std::string state_before = daqinterface_state_;
1554 
1555  if(daqinterface_state_ == "running" || daqinterface_state_ == "ready" ||
1556  daqinterface_state_ == "booted")
1557  {
1558  try
1559  {
1560  TLOG(TLVL_TRACE) << "Calling DAQInterface::check_proc_heartbeats";
1561  PyObject* pName = PyUnicode_FromString("check_proc_heartbeats");
1562  PyObject* res =
1563  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1564  TLOG(TLVL_TRACE)
1565  << "Done with DAQInterface::check_proc_heartbeats call";
1566 
1567  if(res == NULL)
1568  {
1569  runner_running_ = false;
1570  PyErr_Print();
1571  __SS__ << "Error calling check_proc_heartbeats function" << __E__;
1572  __SUP_SS_THROW__;
1573  break;
1574  }
1575  }
1576  catch(cet::exception& ex)
1577  {
1578  runner_running_ = false;
1579  PyErr_Print();
1580  __SS__ << "An cet::exception occurred while calling "
1581  "check_proc_heartbeats function: "
1582  << ex.explain_self() << __E__;
1583  __SUP_SS_THROW__;
1584  break;
1585  }
1586  catch(std::exception& ex)
1587  {
1588  runner_running_ = false;
1589  PyErr_Print();
1590  __SS__ << "An std::exception occurred while calling "
1591  "check_proc_heartbeats function: "
1592  << ex.what() << __E__;
1593  __SUP_SS_THROW__;
1594  break;
1595  }
1596  catch(...)
1597  {
1598  runner_running_ = false;
1599  PyErr_Print();
1600  __SS__ << "An unknown Error occurred while calling runner function"
1601  << __E__;
1602  __SUP_SS_THROW__;
1603  break;
1604  }
1605 
1606  lk.unlock();
1607  getDAQState_();
1608  if(daqinterface_state_ != state_before)
1609  {
1610  runner_running_ = false;
1611  lk.unlock();
1612  __SS__ << "DAQInterface state unexpectedly changed from "
1613  << state_before << " to " << daqinterface_state_
1614  << ". Check supervisor log file for more info!" << __E__;
1615  __SUP_SS_THROW__;
1616  break;
1617  }
1618  }
1619  }
1620  else
1621  {
1622  break;
1623  }
1624  usleep(1000000);
1625  }
1626  runner_running_ = false;
1627  TLOG(TLVL_TRACE) << "Runner thread complete";
1628 } // end daqinterfaceRunner_()
1629 
1630 //==============================================================================
1631 void ots::ARTDAQSupervisor::stop_runner_()
1632 {
1633  runner_running_ = false;
1634  if(runner_thread_ && runner_thread_->joinable())
1635  {
1636  runner_thread_->join();
1637  runner_thread_.reset(nullptr);
1638  }
1639 } // end stop_runner_()
1640 
1641 //==============================================================================
1642 void ots::ARTDAQSupervisor::start_runner_()
1643 {
1644  stop_runner_();
1645  runner_thread_ =
1646  std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_, this);
1647 } // end start_runner_()
virtual void transitionHalting(toolbox::Event::Reference event) override
virtual void transitionInitializing(toolbox::Event::Reference event) override
void loadTableGroup(const std::string &tableGroupName, const TableGroupKey &tableGroupKey, bool doActivate=false, std::map< std::string, TableVersion > *groupMembers=0, ProgressBar *progressBar=0, std::string *accumulateWarnings=0, std::string *groupComment=0, std::string *groupAuthor=0, std::string *groupCreateTime=0, bool doNotLoadMember=false, std::string *groupTypeString=0, std::map< std::string, std::string > *groupAliases=0, ConfigurationManager::LoadGroupType onlyLoadIfBackboneOrContext=ConfigurationManager::LoadGroupType::ALL_TYPES, bool ignoreVersionTracking=false)
ConfigurationTree getNode(const std::string &nodeString, bool doNotThrowOnBrokenUIDLinks=false) const
"root/parent/parent/"
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
const std::string & getValueAsString(bool returnLinkTableValue=false) const
void getValue(T &value) const
ITRACEController * theTRACEController_
only define for an app that receives a command
bool isComplete()
get functions
Definition: ProgressBar.cc:88
void step()
thread safe
Definition: ProgressBar.cc:74
int read()
if stepsToComplete==0, then define any progress as 50%, thread safe
Definition: ProgressBar.cc:120
void complete()
declare complete, thread safe
Definition: ProgressBar.cc:95
void INIT_MF(const char *name)
std::string name
Also key in map.