otsdaq  3.03.00
ARTDAQSupervisor.cc
1 
2 
3 #define TRACEMF_USE_VERBATIM 1 // for trace longer path filenames
4 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
5 
6 #include "artdaq-core/Utilities/configureMessageFacility.hh"
7 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
8 #include "artdaq/DAQdata/Globals.hh"
9 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
10 #include "cetlib_except/exception.h"
11 #include "fhiclcpp/make_ParameterSet.h"
12 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisorTRACEController.h"
13 
14 #include "artdaq-core/Utilities/ExceptionHandler.hh" /*for artdaq::ExceptionHandler*/
15 
16 #include <boost/exception/all.hpp>
17 #include <boost/filesystem.hpp>
18 
19 #include <signal.h>
20 #include <regex>
21 
22 #define OUT_ON_ERR_SIZE 1000 //tail size of output to include on error
23 
24 using namespace ots;
25 
26 XDAQ_INSTANTIATOR_IMPL(ARTDAQSupervisor)
27 
28 #define FAKE_CONFIG_NAME "ots_config"
29 #define DAQINTERFACE_PORT \
30  std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + \
31  (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION")))
32 
33 static ARTDAQSupervisor* instance = nullptr;
34 static std::unordered_map<int, struct sigaction> old_actions =
35  std::unordered_map<int, struct sigaction>();
36 static bool sighandler_init = false;
37 static void signal_handler(int signum)
38 {
39  // Messagefacility may already be gone at this point, TRACE ONLY!
40 #if TRACE_REVNUM < 1459
41  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
42 #else
43  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
44 #endif
45  << "A signal of type " << signum
46  << " was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
47  "then proceeding with default handlers!";
48 
49  if(instance)
50  instance->destroy();
51 
52  sigset_t set;
53  pthread_sigmask(SIG_UNBLOCK, NULL, &set);
54  pthread_sigmask(SIG_UNBLOCK, &set, NULL);
55 
56 #if TRACE_REVNUM < 1459
57  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
58 #else
59  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
60 #endif
61  << "Calling default signal handler";
62  if(signum != SIGUSR2)
63  {
64  sigaction(signum, &old_actions[signum], NULL);
65  kill(getpid(), signum); // Only send signal to self
66  }
67  else
68  {
69  // Send Interrupt signal if parsing SIGUSR2 (i.e. user-defined exception that
70  // should tear down ARTDAQ)
71  sigaction(SIGINT, &old_actions[SIGINT], NULL);
72  kill(getpid(), SIGINT); // Only send signal to self
73  }
74 }
75 
76 static void init_sighandler(ARTDAQSupervisor* inst)
77 {
78  static std::mutex sighandler_mutex;
79  std::unique_lock<std::mutex> lk(sighandler_mutex);
80 
81  if(!sighandler_init)
82  {
83  sighandler_init = true;
84  instance = inst;
85  std::vector<int> signals = {
86  SIGINT,
87  SIGILL,
88  SIGABRT,
89  SIGFPE,
90  SIGSEGV,
91  SIGPIPE,
92  SIGALRM,
93  SIGTERM,
94  SIGUSR2,
95  SIGHUP}; // SIGQUIT is used by art in normal operation
96  for(auto signal : signals)
97  {
98  struct sigaction old_action;
99  sigaction(signal, NULL, &old_action);
100 
101  // If the old handler wasn't SIG_IGN (it's a handler that just
102  // "ignore" the signal)
103  if(old_action.sa_handler != SIG_IGN)
104  {
105  struct sigaction action;
106  action.sa_handler = signal_handler;
107  sigemptyset(&action.sa_mask);
108  for(auto sigblk : signals)
109  {
110  sigaddset(&action.sa_mask, sigblk);
111  }
112  action.sa_flags = 0;
113 
114  // Replace the signal handler of SIGINT with the one described by
115  // new_action
116  sigaction(signal, &action, NULL);
117  old_actions[signal] = old_action;
118  }
119  }
120  }
121 }
122 
123 //==============================================================================
124 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
125  : CoreSupervisorBase(stub)
126  , daqinterface_ptr_(NULL)
127  , partition_(getSupervisorProperty("partition", 0))
128  , daqinterface_state_("notrunning")
129  , runner_thread_(nullptr)
130 {
131  __SUP_COUT__ << "Constructor." << __E__;
132 
133  INIT_MF("." /*directory used is USER_DATA/LOG/.*/);
134  init_sighandler(this);
135 
136  // Only use system Python
137  // unsetenv("PYTHONPATH");
138  // unsetenv("PYTHONHOME");
139 
140  // Write out settings file
141  auto settings_file = __ENV__("DAQINTERFACE_SETTINGS");
142  std::ofstream o(settings_file, std::ios::trunc);
143 
144  setenv("DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
145  auto logfileName = std::string(__ENV__("OTSDAQ_LOG_DIR")) +
146  "/DAQInteface/DAQInterface_partition" +
147  std::to_string(partition_) + ".log";
148  setenv("DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
149 
150  o << "log_directory: "
151  << getSupervisorProperty("log_directory", std::string(__ENV__("OTSDAQ_LOG_DIR")))
152  << std::endl;
153 
154  {
155  const std::string record_directory = getSupervisorProperty(
156  "record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH + "/run_records/");
157  mkdir(record_directory.c_str(), 0755);
158  o << "record_directory: " << record_directory << std::endl;
159  }
160 
161  o << "package_hashes_to_save: "
162  << getSupervisorProperty("package_hashes_to_save", "[artdaq]") << std::endl;
163 
164  o << "spack_root_for_bash_scripts: "
165  << getSupervisorProperty("spack_root_for_bash_scripts",
166  std::string(__ENV__("SPACK_ROOT")))
167  << std::endl;
168  o << "boardreader timeout: " << getSupervisorProperty("boardreader_timeout", 30)
169  << std::endl;
170  o << "eventbuilder timeout: " << getSupervisorProperty("eventbuilder_timeout", 30)
171  << std::endl;
172  o << "datalogger timeout: " << getSupervisorProperty("datalogger_timeout", 30)
173  << std::endl;
174  o << "dispatcher timeout: " << getSupervisorProperty("dispatcher_timeout", 30)
175  << std::endl;
176  // Only put max_fragment_size_bytes into DAQInterface settings file if advanced_memory_usage is disabled
177  if(!getSupervisorProperty("advanced_memory_usage", false))
178  {
179  o << "max_fragment_size_bytes: "
180  << getSupervisorProperty("max_fragment_size_bytes", 1048576) << std::endl;
181  }
182  o << "transfer_plugin_to_use: "
183  << getSupervisorProperty("transfer_plugin_to_use", "TCPSocket") << std::endl;
184  if(getSupervisorProperty("transfer_plugin_from_brs", "") != "")
185  {
186  o << "transfer_plugin_from_brs: "
187  << getSupervisorProperty("transfer_plugin_from_brs", "") << std::endl;
188  }
189  if(getSupervisorProperty("transfer_plugin_from_ebs", "") != "")
190  {
191  o << "transfer_plugin_from_ebs: "
192  << getSupervisorProperty("transfer_plugin_from_ebs", "") << std::endl;
193  }
194  if(getSupervisorProperty("transfer_plugin_from_dls", "") != "")
195  {
196  o << "transfer_plugin_from_dls: "
197  << getSupervisorProperty("transfer_plugin_from_dls", "") << std::endl;
198  }
199  o << "all_events_to_all_dispatchers: " << std::boolalpha
200  << getSupervisorProperty("all_events_to_all_dispatchers", true) << std::endl;
201  o << "data_directory_override: "
202  << getSupervisorProperty("data_directory_override",
203  std::string(__ENV__("ARTDAQ_OUTPUT_DIR")))
204  << std::endl;
205  o << "max_configurations_to_list: "
206  << getSupervisorProperty("max_configurations_to_list", 10) << std::endl;
207  o << "disable_unique_rootfile_labels: "
208  << getSupervisorProperty("disable_unique_rootfile_labels", false) << std::endl;
209  o << "use_messageviewer: " << std::boolalpha
210  << getSupervisorProperty("use_messageviewer", false) << std::endl;
211  o << "use_messagefacility: " << std::boolalpha
212  << getSupervisorProperty("use_messagefacility", true) << std::endl;
213  o << "fake_messagefacility: " << std::boolalpha
214  << getSupervisorProperty("fake_messagefacility", false) << std::endl;
215  o << "kill_existing_processes: " << std::boolalpha
216  << getSupervisorProperty("kill_existing_processes", true) << std::endl;
217  o << "advanced_memory_usage: " << std::boolalpha
218  << getSupervisorProperty("advanced_memory_usage", false) << std::endl;
219  o << "strict_fragment_id_mode: " << std::boolalpha
220  << getSupervisorProperty("strict_fragment_id_mode", false) << std::endl;
221  o << "disable_private_network_bookkeeping: " << std::boolalpha
222  << getSupervisorProperty("disable_private_network_bookkeeping", false) << std::endl;
223  o << "allowed_processors: " << getSupervisorProperty("allowed_processors", "0-255")
224  << std::
225  endl; // Note this sets a taskset for ALL processes, on all nodes (ex. "1,2,5-7")
226 
227  o.close();
228 
229  // destroy current TRACEController and instantiate ARTDAQSupervisorTRACEController
230  if(CorePropertySupervisorBase::theTRACEController_)
231  {
232  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
233  delete CorePropertySupervisorBase::
234  theTRACEController_; // destruct current TRACEController
235  CorePropertySupervisorBase::theTRACEController_ = nullptr;
236  }
237  CorePropertySupervisorBase::theTRACEController_ =
239  ((ARTDAQSupervisorTRACEController*)CorePropertySupervisorBase::theTRACEController_)
240  ->setSupervisorPtr(this);
241 
242  __SUP_COUT__ << "Constructed." << __E__;
243 } // end constructor()
244 
245 //==============================================================================
246 ARTDAQSupervisor::~ARTDAQSupervisor(void)
247 {
248  __SUP_COUT__ << "Destructor." << __E__;
249  destroy();
250  __SUP_COUT__ << "Destructed." << __E__;
251 } // end destructor()
252 
253 //==============================================================================
254 void ARTDAQSupervisor::destroy(void)
255 {
256  __SUP_COUT__ << "Destroying..." << __E__;
257 
258  if(daqinterface_ptr_ != NULL)
259  {
260  __SUP_COUT__ << "Calling recover transition" << __E__;
261  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
262  PyObject* pName = PyUnicode_FromString("do_recover");
263  /*PyObject* res =*/PyObject_CallMethodObjArgs(
264  daqinterface_ptr_, pName, NULL);
265 
266  __SUP_COUT__ << "Making sure that correct state has been reached" << __E__;
267  getDAQState_();
268  while(daqinterface_state_ != "stopped")
269  {
270  getDAQState_();
271  __SUP_COUT__ << "State is " << daqinterface_state_
272  << ", waiting 1s and retrying..." << __E__;
273  usleep(1000000);
274  }
275 
276  // Cleanup
277  Py_XDECREF(daqinterface_ptr_);
278  // Py_XDECREF(pStateArgs2);
279  // Py_XDECREF(out_text);
280  // Py_XDECREF(err_text);
281  // Py_XDECREF(sys_stdout);
282  // Py_XDECREF(sys_stderr);
283  // Py_XDECREF(stringIO_out);
284  // Py_XDECREF(stringIO_err);
285  // Py_XDECREF(io);
286  // Py_XDECREF(sys);
287  daqinterface_ptr_ = NULL;
288  }
289 
290  __SUP_COUT__ << "Flusing printouts" << __E__;
291 
292  //make sure to flush printouts
293  PyRun_SimpleString(R"(
294 import sys
295 sys.stdout = sys.__stdout__
296 sys.stderr = sys.__stderr__
297 )");
298  Py_XDECREF(stringIO_out);
299  Py_XDECREF(stringIO_err);
300 
301  __SUP_COUT__ << "Thread and garbage cleanup" << __E__;
302  //force python thread cleanup:
303  PyRun_SimpleString(
304  "import threading; [t.join() for t in threading.enumerate() if t is not "
305  "threading.main_thread()]");
306  PyRun_SimpleString("import gc; gc.collect()");
307  Py_Finalize();
308 
309  // CorePropertySupervisorBase would destroy, but since it was created here, attempt to destroy
311  {
312  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
315  }
316 
317  __SUP_COUT__ << "Destroyed." << __E__;
318 } // end destroy()
319 
320 //==============================================================================
321 void ARTDAQSupervisor::init(void)
322 {
323  stop_runner_();
324 
325  __SUP_COUT__ << "Initializing..." << __E__;
326  {
327  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
328 
329  // allSupervisorInfo_.init(getApplicationContext());
330  artdaq::configureMessageFacility("ARTDAQSupervisor");
331  __SUP_COUT__ << "artdaq MF configured." << __E__;
332 
333  // initialization
334  char* daqinterface_dir = getenv("ARTDAQ_DAQINTERFACE_DIR");
335  if(daqinterface_dir == NULL)
336  {
337  __SS__ << "ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
338  "means that DAQInterface has not been setup!"
339  << __E__;
340  __SUP_SS_THROW__;
341  }
342  else
343  {
344  __SUP_COUT__ << "Initializing Python" << __E__;
345  Py_Initialize();
346 
347  __SUP_COUT__ << "Adding DAQInterface directory to PYTHON_PATH" << __E__;
348  PyObject* sysPath = PySys_GetObject((char*)"path");
349  PyObject* programName = PyUnicode_FromString(daqinterface_dir);
350  PyList_Append(sysPath, programName);
351  Py_DECREF(programName);
352 
353  __SUP_COUT__ << "Creating Module name" << __E__;
354  PyObject* pName = PyUnicode_FromString("rc.control.daqinterface");
355  /* Error checking of pName left out */
356 
357  __SUP_COUT__ << "Importing module" << __E__;
358  PyObject* pModule = PyImport_Import(pName);
359  Py_DECREF(pName);
360 
361  if(pModule == NULL)
362  {
363  PyErr_Print();
364  __SS__ << "Failed to load rc.control.daqinterface" << __E__;
365  __SUP_SS_THROW__;
366  }
367  else
368  {
369  __SUP_COUT__ << "Loading python module dictionary" << __E__;
370  PyObject* pDict = PyModule_GetDict(pModule);
371  if(pDict == NULL)
372  {
373  PyErr_Print();
374  __SS__ << "Unable to load module dictionary" << __E__;
375  __SUP_SS_THROW__;
376  }
377  else
378  {
379  Py_DECREF(pModule);
380 
381  __SUP_COUT__ << "Getting DAQInterface object pointer" << __E__;
382  PyObject* di_obj_ptr = PyDict_GetItemString(pDict, "DAQInterface");
383 
384  __SUP_COUT__ << "Filling out DAQInterface args struct" << __E__;
385  PyObject* pArgs = PyTuple_New(0);
386 
387  PyObject* kwargs = Py_BuildValue("{s:s, s:s, s:i, s:i, s:s, s:s}",
388  "logpath",
389  ".daqint.log",
390  "name",
391  "DAQInterface",
392  "partition_number",
393  partition_,
394  "rpc_port",
395  DAQINTERFACE_PORT,
396  "rpc_host",
397  "localhost",
398  "control_host",
399  "localhost");
400 
401  __SUP_COUT__ << "Calling DAQInterface Object Constructor" << __E__;
402 
403  //------------- redirect stdout to string
404  // Get sys and io
405  PyObject* sys = PyImport_ImportModule("sys");
406  PyObject* io = PyImport_ImportModule("io");
407 
408  // Create StringIO objects for stdout and stderr
409  stringIO_out = PyObject_CallMethod(io, "StringIO", NULL);
410  stringIO_err = PyObject_CallMethod(io, "StringIO", NULL);
411 
412  // Save originals (not needed, since just keep the redirection until daqinterface_ptr_ is destructed)
413  // PyObject* sys_stdout = PyObject_GetAttrString(sys, "stdout");
414  // PyObject* sys_stderr = PyObject_GetAttrString(sys, "stderr");
415 
416  // Redirect
417  PyObject_SetAttrString(sys, "stdout", stringIO_out);
418  PyObject_SetAttrString(sys, "stderr", stringIO_err);
419  //------------- end redirect stdout to string
420 
421  daqinterface_ptr_ = PyObject_Call(di_obj_ptr, pArgs, kwargs);
422 
423  if(0) //example printout handling
424  {
425  // Force an error
426  PyObject* bad = PyObject_CallMethod(sys, "does_not_exist", NULL);
427  if(!bad)
428  PyErr_Print(); // <-- this writes into stringIO_err, not the terminal
429 
430  // Grab stderr contents
431  PyObject* err_text =
432  PyObject_CallMethod(stringIO_err, "getvalue", NULL);
433  if(err_text)
434  __COUT__ << "Captured stderr:\n"
435  << PyUnicode_AsUTF8(err_text) << "\n";
436  else
437  __COUT__ << "Capture of stderr failed.";
438  } //end example printout handling
439 
440  // Cleanup
441  Py_DECREF(di_obj_ptr);
442  Py_XDECREF(sys);
443  Py_XDECREF(io);
444  }
445  }
446  }
447 
448  getDAQState_();
449 
450  // { //attempt to cleanup old artdaq processes DOES NOT WORK because artdaq interface knows it hasn't started
451  // __SUP_COUT__ << "Attempting artdaq stale cleanup..." << __E__;
452  // std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
453  // getDAQState_();
454  // __SUP_COUT__ << "Status before cleanup: " << daqinterface_state_ << __E__;
455 
456  // PyObject* pName = PyUnicode_FromString("do_recover");
457  // PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
458  // __COUT_MULTI_LBL__(0,captureStderrAndStdout_("do_recover"),"do_recover");
459 
460  // if(res == NULL)
461  // {
462  // std::string err = capturePyErr("do_recover");
463  // __SS__ << "Error with clean up calling do_recover: " << err << __E__;
464  // __SUP_SS_THROW__;
465  // }
466  // getDAQState_();
467  // __SUP_COUT__ << "Status after cleanup: " << daqinterface_state_ << __E__;
468  // __SUP_COUT__ << "cleanup DONE." << __E__;
469  // }
470  }
471  start_runner_();
472  __SUP_COUT__ << "Initialized." << __E__;
473 } // end init()
474 
475 //==============================================================================
476 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference /*event*/)
477 {
478  __SUP_COUT__ << "transitionConfiguring" << __E__;
479 
480  // activate the configuration tree (the first iteration)
481  if(RunControlStateMachine::getIterationIndex() == 0 &&
482  RunControlStateMachine::getSubIterationIndex() == 0)
483  {
484  thread_error_message_ = "";
485  thread_progress_bar_.resetProgressBar(0);
486  last_thread_progress_update_ = time(0); // initialize timeout timer
487 
488  std::pair<std::string /*group name*/, TableGroupKey> theGroup(
489  SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
490  .getParameters()
491  .getValue("ConfigurationTableGroupName"),
492  TableGroupKey(SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
493  .getParameters()
494  .getValue("ConfigurationTableGroupKey")));
495 
496  __SUP_COUT__ << "Configuration table group name: " << theGroup.first
497  << " key: " << theGroup.second << __E__;
498 
499  try
500  {
501  // disable version tracking to accept untracked versions to be selected by the FSM transition source
502  theConfigurationManager_->loadTableGroup(
503  theGroup.first,
504  theGroup.second,
505  true /*doActivate*/,
506  0,
507  0,
508  0,
509  0,
510  0,
511  0,
512  false,
513  0,
514  0,
515  ConfigurationManager::LoadGroupType::ALL_TYPES,
516  true /*ignoreVersionTracking*/);
517  }
518  catch(const std::runtime_error& e)
519  {
520  __SS__ << "Error loading table group '" << theGroup.first << "("
521  << theGroup.second << ")! \n"
522  << e.what() << __E__;
523  __SUP_COUT_ERR__ << ss.str();
524  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
525 
526  //__SS_THROW_ONLY__;
527  theStateMachine_.setErrorMessage(ss.str());
528  throw toolbox::fsm::exception::Exception(
529  "Transition Error" /*name*/,
530  ss.str() /* message*/,
531  "ARTDAQSupervisor::transitionConfiguring" /*module*/,
532  __LINE__ /*line*/,
533  __FUNCTION__ /*function*/
534  );
535  }
536  catch(...)
537  {
538  __SS__ << "Unknown error loading table group '" << theGroup.first << "("
539  << theGroup.second << ")!" << __E__;
540  __SUP_COUT_ERR__ << ss.str();
541  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
542 
543  //__SS_THROW_ONLY__;
544  theStateMachine_.setErrorMessage(ss.str());
545  throw toolbox::fsm::exception::Exception(
546  "Transition Error" /*name*/,
547  ss.str() /* message*/,
548  "ARTDAQSupervisor::transitionConfiguring" /*module*/,
549  __LINE__ /*line*/,
550  __FUNCTION__ /*function*/
551  );
552  }
553 
554  // start configuring thread
555  std::thread(&ARTDAQSupervisor::configuringThread, this).detach();
556 
557  __SUP_COUT__ << "Configuring thread started." << __E__;
558 
559  RunControlStateMachine::
560  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
561  }
562  else // not first time
563  {
564  std::string errorMessage;
565  {
566  std::lock_guard<std::mutex> lock(
567  thread_mutex_); // lock out for remainder of scope
568  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
569  }
570  int progress = thread_progress_bar_.read();
571  __SUP_COUTV__(errorMessage);
572  __SUP_COUTV__(progress);
573  __SUP_COUTV__(thread_progress_bar_.isComplete());
574 
575  // check for done and error messages
576  if(errorMessage == "" && // if no update in 600 seconds, give up
577  time(0) - last_thread_progress_update_ > 600)
578  {
579  __SUP_SS__ << "There has been no update from the configuration thread for "
580  << (time(0) - last_thread_progress_update_)
581  << " seconds, assuming something is wrong and giving up! "
582  << "Last progress received was " << progress << __E__;
583  errorMessage = ss.str();
584  }
585 
586  if(errorMessage != "")
587  {
588  __SUP_SS__ << "Error was caught in configuring thread: " << errorMessage
589  << __E__;
590  __SUP_COUT_ERR__ << "\n" << ss.str();
591 
592  theStateMachine_.setErrorMessage(ss.str());
593  throw toolbox::fsm::exception::Exception(
594  "Transition Error" /*name*/,
595  ss.str() /* message*/,
596  "CoreSupervisorBase::transitionConfiguring" /*module*/,
597  __LINE__ /*line*/,
598  __FUNCTION__ /*function*/
599  );
600  }
601 
602  if(!thread_progress_bar_.isComplete())
603  {
604  RunControlStateMachine::
605  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
606 
607  if(last_thread_progress_read_ != progress)
608  {
609  last_thread_progress_read_ = progress;
610  last_thread_progress_update_ = time(0);
611  }
612 
613  sleep(1 /*seconds*/);
614  }
615  else
616  {
617  __SUP_COUT_INFO__ << "Complete configuring transition!" << __E__;
618  __SUP_COUTV__(getProcessInfo_());
619  }
620  }
621 
622  return;
623 } // end transitionConfiguring()
624 
625 //==============================================================================
626 void ARTDAQSupervisor::configuringThread()
627 try
628 {
629  std::string uid = theConfigurationManager_
630  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
631  "/" + CorePropertySupervisorBase::getSupervisorUID() +
632  "/" + "LinkToSupervisorTable")
633  .getValueAsString();
634 
635  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
636 
637  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
638 
639  ConfigurationTree theSupervisorNode = getSupervisorTableNode();
640 
641  thread_progress_bar_.step();
642 
643  set_thread_message_("ConfigGen");
644 
645  auto info = ARTDAQTableBase::extractARTDAQInfo(
646  theSupervisorNode,
647  false /*getStatusFalseNodes*/,
648  true /*doWriteFHiCL*/,
649  getSupervisorProperty("max_fragment_size_bytes", 8888),
650  getSupervisorProperty("routing_timeout_ms", 1999),
651  getSupervisorProperty("routing_retry_count", 12),
652  &thread_progress_bar_);
653 
654  // Check lists
655  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
656  {
657  __GEN_SS__ << "There must be at least one enabled BoardReader!" << __E__;
658  __GEN_SS_THROW__;
659  return;
660  }
661  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
662  {
663  __GEN_SS__ << "There must be at least one enabled EventBuilder!" << __E__;
664  __GEN_SS_THROW__;
665  return;
666  }
667 
668  thread_progress_bar_.step();
669  set_thread_message_("Writing boot.txt");
670 
671  __GEN_COUT__ << "Writing boot.txt" << __E__;
672 
673  int debugLevel = theSupervisorNode.getNode("DAQInterfaceDebugLevel").getValue<int>();
674  std::string setupScript = theSupervisorNode.getNode("DAQSetupScript").getValue();
675 
676  std::ofstream o(ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt", std::ios::trunc);
677  o << "DAQ setup script: " << setupScript << std::endl;
678  o << "debug level: " << debugLevel << std::endl;
679  o << std::endl;
680 
681  if(info.subsystems.size() > 1)
682  {
683  for(auto& ss : info.subsystems)
684  {
685  if(ss.first == 0)
686  continue;
687  o << "Subsystem id: " << ss.first << std::endl;
688  if(ss.second.destination != 0)
689  {
690  o << "Subsystem destination: " << ss.second.destination << std::endl;
691  }
692  for(auto& sss : ss.second.sources)
693  {
694  o << "Subsystem source: " << sss << std::endl;
695  }
696  if(ss.second.eventMode)
697  {
698  o << "Subsystem fragmentMode: False" << std::endl;
699  }
700  o << std::endl;
701  }
702  }
703 
704  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
705  {
706  o << "EventBuilder host: " << builder.hostname << std::endl;
707  o << "EventBuilder label: " << builder.label << std::endl;
708  label_to_proc_type_map_[builder.label] = "EventBuilder";
709  if(builder.subsystem != 1)
710  {
711  o << "EventBuilder subsystem: " << builder.subsystem << std::endl;
712  }
713  if(builder.allowed_processors != "")
714  {
715  o << "EventBuilder allowed_processors: " << builder.allowed_processors
716  << std::endl;
717  }
718  o << std::endl;
719  }
720  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
721  {
722  o << "DataLogger host: " << logger.hostname << std::endl;
723  o << "DataLogger label: " << logger.label << std::endl;
724  label_to_proc_type_map_[logger.label] = "DataLogger";
725  if(logger.subsystem != 1)
726  {
727  o << "DataLogger subsystem: " << logger.subsystem << std::endl;
728  }
729  if(logger.allowed_processors != "")
730  {
731  o << "DataLogger allowed_processors: " << logger.allowed_processors
732  << std::endl;
733  }
734  o << std::endl;
735  }
736  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
737  {
738  o << "Dispatcher host: " << dispatcher.hostname << std::endl;
739  o << "Dispatcher label: " << dispatcher.label << std::endl;
740  o << "Dispatcher port: " << dispatcher.port << std::endl;
741  label_to_proc_type_map_[dispatcher.label] = "Dispatcher";
742  if(dispatcher.subsystem != 1)
743  {
744  o << "Dispatcher subsystem: " << dispatcher.subsystem << std::endl;
745  }
746  if(dispatcher.allowed_processors != "")
747  {
748  o << "Dispatcher allowed_processors: " << dispatcher.allowed_processors
749  << std::endl;
750  }
751  o << std::endl;
752  }
753  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
754  {
755  o << "RoutingManager host: " << rmanager.hostname << std::endl;
756  o << "RoutingManager label: " << rmanager.label << std::endl;
757  label_to_proc_type_map_[rmanager.label] = "RoutingManager";
758  if(rmanager.subsystem != 1)
759  {
760  o << "RoutingManager subsystem: " << rmanager.subsystem << std::endl;
761  }
762  if(rmanager.allowed_processors != "")
763  {
764  o << "RoutingManager allowed_processors: " << rmanager.allowed_processors
765  << std::endl;
766  }
767  o << std::endl;
768  }
769  o.close();
770 
771  thread_progress_bar_.step();
772  set_thread_message_("Writing Fhicl Files");
773 
774  __GEN_COUT__ << "Building configuration directory" << __E__;
775 
776  boost::system::error_code ignored;
777  boost::filesystem::remove_all(ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME,
778  ignored);
779  mkdir((ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME).c_str(), 0755);
780 
781  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
782  {
783  symlink(ARTDAQTableBase::getFlatFHICLFilename(
784  ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label)
785  .c_str(),
786  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
787  reader.label + ".fcl")
788  .c_str());
789  }
790  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
791  {
792  symlink(ARTDAQTableBase::getFlatFHICLFilename(
793  ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label)
794  .c_str(),
795  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
796  builder.label + ".fcl")
797  .c_str());
798  }
799  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
800  {
801  symlink(ARTDAQTableBase::getFlatFHICLFilename(
802  ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label)
803  .c_str(),
804  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
805  logger.label + ".fcl")
806  .c_str());
807  }
808  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
809  {
810  symlink(ARTDAQTableBase::getFlatFHICLFilename(
811  ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label)
812  .c_str(),
813  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
814  dispatcher.label + ".fcl")
815  .c_str());
816  }
817  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
818  {
819  symlink(ARTDAQTableBase::getFlatFHICLFilename(
820  ARTDAQTableBase::ARTDAQAppType::RoutingManager, rmanager.label)
821  .c_str(),
822  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
823  rmanager.label + ".fcl")
824  .c_str());
825  }
826 
827  thread_progress_bar_.step();
828 
829  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
830  getDAQState_();
831  if(daqinterface_state_ != "stopped" && daqinterface_state_ != "")
832  {
833  __GEN_SS__ << "Cannot configure DAQInterface because it is in the wrong state"
834  << " (" << daqinterface_state_ << " != stopped)!" << __E__;
835  __GEN_SS_THROW__
836  }
837 
838  set_thread_message_("Calling setdaqcomps");
839  __GEN_COUT__ << "Calling setdaqcomps" << __E__;
840  __GEN_COUT__ << "Status before setdaqcomps: " << daqinterface_state_ << __E__;
841  PyObject* pName1 = PyUnicode_FromString("setdaqcomps");
842 
843  PyObject* readerDict = PyDict_New();
844  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
845  {
846  label_to_proc_type_map_[reader.label] = "BoardReader";
847  PyObject* readerName = PyUnicode_FromString(reader.label.c_str());
848 
849  int list_size = reader.allowed_processors != "" ? 4 : 3;
850 
851  PyObject* readerData = PyList_New(list_size);
852  PyObject* readerHost = PyUnicode_FromString(reader.hostname.c_str());
853  PyObject* readerPort = PyUnicode_FromString("-1");
854  PyObject* readerSubsystem =
855  PyUnicode_FromString(std::to_string(reader.subsystem).c_str());
856  PyList_SetItem(readerData, 0, readerHost);
857  PyList_SetItem(readerData, 1, readerPort);
858  PyList_SetItem(readerData, 2, readerSubsystem);
859  if(reader.allowed_processors != "")
860  {
861  PyObject* readerAllowedProcessors =
862  PyUnicode_FromString(reader.allowed_processors.c_str());
863  PyList_SetItem(readerData, 3, readerAllowedProcessors);
864  }
865  PyDict_SetItem(readerDict, readerName, readerData);
866  }
867  PyObject* res1 =
868  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName1, readerDict, NULL);
869  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("setdaqcomps"), "setdaqcomps");
870 
871  Py_DECREF(readerDict);
872 
873  if(res1 == NULL)
874  {
875  std::string err = capturePyErr("setdaqcomps");
876  __GEN_SS__ << "Error calling setdaqcomps transition: " << err << __E__;
877  __GEN_SS_THROW__;
878  }
879 
880  getDAQState_();
881  __GEN_COUT__ << "Status after setdaqcomps: " << daqinterface_state_ << __E__;
882 
883  thread_progress_bar_.step();
884  set_thread_message_("Calling do_boot");
885  __GEN_COUT__ << "Calling do_boot" << __E__;
886  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
887  PyObject* pName2 = PyUnicode_FromString("do_boot");
888  PyObject* pStateArgs1 =
889  PyUnicode_FromString((ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt").c_str());
890  PyObject* res2 =
891  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
892  std::string doBootOutput = captureStderrAndStdout_("do_boot");
893  __COUT_MULTI_LBL__(0, doBootOutput, "do_boot");
894 
895  if(res2 == NULL)
896  {
897  std::string err = capturePyErr();
898  __GEN_COUT__ << "Error on first boost attempt, recovering and retrying: " << err
899  << __E__;
900 
901  PyObject* pName = PyUnicode_FromString("do_recover");
902  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
903  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_recover"), "do_recover");
904 
905  if(res == NULL)
906  {
907  std::string err = capturePyErr();
908  __GEN_SS__ << "Error calling recover transition!!!! " << err << __E__;
909  if(doBootOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
910  ss << "... last " << OUT_ON_ERR_SIZE
911  << " characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
912  else
913  ss << doBootOutput;
914  __GEN_SS_THROW__;
915  }
916 
917  thread_progress_bar_.step();
918  set_thread_message_("Calling do_boot (retry)");
919  __GEN_COUT__ << "Calling do_boot again" << __E__;
920  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
921  PyObject* res3 =
922  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
923  doBootOutput = captureStderrAndStdout_("do_boot (retry)");
924  __COUT_MULTI_LBL__(0, doBootOutput, "do_boot (retry)");
925 
926  if(res3 == NULL)
927  {
928  std::string err = capturePyErr();
929  __GEN_SS__ << "Error calling boot transition (2nd try): " << err << __E__;
930  if(doBootOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
931  ss << "... last " << OUT_ON_ERR_SIZE
932  << " characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
933  else
934  ss << doBootOutput;
935  __GEN_SS_THROW__;
936  }
937  }
938 
939  getDAQState_();
940  if(daqinterface_state_ != "booted")
941  {
942  __GEN_SS__ << "DAQInterface boot transition failed! "
943  << "Status after boot attempt: " << daqinterface_state_ << __E__;
944  if(doBootOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
945  ss << "... last " << OUT_ON_ERR_SIZE
946  << " characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
947  else
948  ss << doBootOutput;
949  __GEN_SS_THROW__;
950  }
951  __GEN_COUT__ << "Status after boot: " << daqinterface_state_ << __E__;
952 
953  thread_progress_bar_.step();
954  set_thread_message_("Calling do_config");
955  __GEN_COUT__ << "Calling do_config" << __E__;
956  __GEN_COUT__ << "Status before config: " << daqinterface_state_ << __E__;
957  std::string doConfigOutput = "";
958  { //do_config call
959  PyObject* pName3 = PyUnicode_FromString("do_config");
960  PyObject* pStateArgs2 = Py_BuildValue("[s]", FAKE_CONFIG_NAME);
961  PyObject* res3 =
962  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName3, pStateArgs2, NULL);
963  doConfigOutput = captureStderrAndStdout_("do_config");
964  __COUT_MULTI_LBL__(0, doConfigOutput, "do_config");
965  if(res3 == NULL)
966  {
967  std::string err = capturePyErr("do_config");
968  __GEN_SS__ << "Error calling config transition: " << err << __E__;
969  __GEN_SS_THROW__;
970  }
971  const char* res_cstr = PyUnicode_AsUTF8(res3);
972  __SUP_COUTT__ << "do_config result=" << (res_cstr ? res_cstr : "") << __E__;
973  } //end do_config call
974 
975  getDAQState_();
976  if(daqinterface_state_ != "ready")
977  {
978  __GEN_SS__ << "DAQInterface config transition failed!" << __E__
979  << "Supervisor state: \"" << daqinterface_state_ << "\" != \"ready\" "
980  << __E__;
981  auto doConfigOutput_recover_i =
982  doConfigOutput.find("RECOVER transition underway");
983  if(doConfigOutput_recover_i == std::string::npos)
984  ss << doConfigOutput;
985  else if(doConfigOutput_recover_i >
986  OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
987  ss << "... tail of " << OUT_ON_ERR_SIZE << " characters before recovery: "
988  << doConfigOutput.substr(
989  doConfigOutput_recover_i - OUT_ON_ERR_SIZE +
990  std::string("RECOVER transition underway").size(),
991  OUT_ON_ERR_SIZE);
992  else
993  ss << doConfigOutput.substr(
994  0,
995  doConfigOutput_recover_i +
996  std::string("RECOVER transition underway").size());
997  __GEN_SS_THROW__;
998  }
999  __GEN_COUT__ << "Status after config: " << daqinterface_state_ << __E__;
1000  thread_progress_bar_.complete();
1001  set_thread_message_("Configured");
1002  __GEN_COUT__ << "Configured." << __E__;
1003 
1004 } // end configuringThread()
1005 catch(const std::runtime_error& e)
1006 {
1007  set_thread_message_("ERROR");
1008  __SS__ << "Error was caught while configuring: " << e.what() << __E__;
1009  __COUT_ERR__ << "\n" << ss.str();
1010  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1011  thread_error_message_ = ss.str();
1012 }
1013 catch(...)
1014 {
1015  set_thread_message_("ERROR");
1016  __SS__ << "Unknown error was caught while configuring. Please checked the logs."
1017  << __E__;
1018  __COUT_ERR__ << "\n" << ss.str();
1019 
1020  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1021 
1022  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1023  thread_error_message_ = ss.str();
1024 } // end configuringThread() error handling
1025 
1026 //==============================================================================
1027 void ARTDAQSupervisor::transitionHalting(toolbox::Event::Reference /*event*/)
1028 try
1029 {
1030  set_thread_message_("Halting");
1031  __SUP_COUT__ << "Halting..." << __E__;
1032  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1033  getDAQState_();
1034  __SUP_COUT__ << "Status before halt: " << daqinterface_state_ << __E__;
1035 
1036  if(daqinterface_state_ == "running")
1037  {
1038  // First stop before halting
1039  PyObject* pName = PyUnicode_FromString("do_stop_running");
1040  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1041  __COUT_MULTI_LBL__(
1042  0, captureStderrAndStdout_("do_stop_running"), "do_stop_running");
1043 
1044  if(res == NULL)
1045  {
1046  std::string err = capturePyErr();
1047  __SS__ << "Error calling DAQ Interface stop transition: " << err << __E__;
1048  __SUP_SS_THROW__;
1049  }
1050  }
1051 
1052  PyObject* pName = PyUnicode_FromString("do_command");
1053  PyObject* pArg = PyUnicode_FromString("Shutdown");
1054  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1055  __COUT_MULTI_LBL__(
1056  0, captureStderrAndStdout_("do_command Shutdown"), "do_command Shutdown");
1057 
1058  if(res == NULL)
1059  {
1060  std::string err = capturePyErr();
1061  __SS__ << "Error calling DAQ Interface halt transition: " << err << __E__;
1062  __SUP_SS_THROW__;
1063  }
1064 
1065  getDAQState_();
1066  __SUP_COUT__ << "Status after halt: " << daqinterface_state_ << __E__;
1067  __SUP_COUT__ << "Halted." << __E__;
1068  set_thread_message_("Halted");
1069 } // end transitionHalting()
1070 catch(const std::runtime_error& e)
1071 {
1072  const std::string transitionName = "Halting";
1073  // if halting from Failed state, then ignore errors
1074  if(theStateMachine_.getProvenanceStateName() ==
1075  RunControlStateMachine::FAILED_STATE_NAME ||
1076  theStateMachine_.getProvenanceStateName() ==
1077  RunControlStateMachine::HALTED_STATE_NAME)
1078  {
1079  __SUP_COUT_INFO__ << "Error was caught while halting (but ignoring because "
1080  "previous state was '"
1081  << RunControlStateMachine::FAILED_STATE_NAME
1082  << "'): " << e.what() << __E__;
1083  }
1084  else // if not previously in Failed state, then fail
1085  {
1086  __SUP_SS__ << "Error was caught while " << transitionName << ": " << e.what()
1087  << __E__;
1088  __SUP_COUT_ERR__ << "\n" << ss.str();
1089  theStateMachine_.setErrorMessage(ss.str());
1090  throw toolbox::fsm::exception::Exception(
1091  "Transition Error" /*name*/,
1092  ss.str() /* message*/,
1093  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1094  __LINE__ /*line*/,
1095  __FUNCTION__ /*function*/
1096  );
1097  }
1098 } // end transitionHalting() std::runtime_error exception handling
1099 catch(...)
1100 {
1101  const std::string transitionName = "Halting";
1102  // if halting from Failed state, then ignore errors
1103  if(theStateMachine_.getProvenanceStateName() ==
1104  RunControlStateMachine::FAILED_STATE_NAME ||
1105  theStateMachine_.getProvenanceStateName() ==
1106  RunControlStateMachine::HALTED_STATE_NAME)
1107  {
1108  __SUP_COUT_INFO__ << "Unknown error was caught while halting (but ignoring "
1109  "because previous state was '"
1110  << RunControlStateMachine::FAILED_STATE_NAME << "')." << __E__;
1111  }
1112  else // if not previously in Failed state, then fail
1113  {
1114  __SUP_SS__ << "Unknown error was caught while " << transitionName
1115  << ". Please checked the logs." << __E__;
1116  __SUP_COUT_ERR__ << "\n" << ss.str();
1117  theStateMachine_.setErrorMessage(ss.str());
1118 
1119  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1120 
1121  throw toolbox::fsm::exception::Exception(
1122  "Transition Error" /*name*/,
1123  ss.str() /* message*/,
1124  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1125  __LINE__ /*line*/,
1126  __FUNCTION__ /*function*/
1127  );
1128  }
1129 } // end transitionHalting() exception handling
1130 
1131 //==============================================================================
1132 void ARTDAQSupervisor::transitionInitializing(toolbox::Event::Reference /*event*/)
1133 try
1134 {
1135  set_thread_message_("Initializing");
1136  __SUP_COUT__ << "Initializing..." << __E__;
1137  init();
1138  __SUP_COUT__ << "Initialized." << __E__;
1139  set_thread_message_("Initialized");
1140 } // end transitionInitializing()
1141 catch(const std::runtime_error& e)
1142 {
1143  __SS__ << "Error was caught while Initializing: " << e.what() << __E__;
1144  __SS_THROW__;
1145 }
1146 catch(...)
1147 {
1148  __SS__ << "Unknown error was caught while Initializing. Please checked the logs."
1149  << __E__;
1150  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1151  __SS_THROW__;
1152 } // end transitionInitializing() error handling
1153 
1154 //==============================================================================
1155 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference /*event*/)
1156 try
1157 {
1158  set_thread_message_("Pausing");
1159  __SUP_COUT__ << "Pausing..." << __E__;
1160  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1161 
1162  getDAQState_();
1163  __SUP_COUT__ << "Status before pause: " << daqinterface_state_ << __E__;
1164 
1165  PyObject* pName = PyUnicode_FromString("do_command");
1166  PyObject* pArg = PyUnicode_FromString("Pause");
1167  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1168  __COUT_MULTI_LBL__(
1169  0, captureStderrAndStdout_("do_command Pause"), "do_command Pause");
1170 
1171  if(res == NULL)
1172  {
1173  std::string err = capturePyErr();
1174  __SS__ << "Error calling DAQ Interface Pause transition: " << err << __E__;
1175  __SUP_SS_THROW__;
1176  }
1177 
1178  getDAQState_();
1179  __SUP_COUT__ << "Status after pause: " << daqinterface_state_ << __E__;
1180 
1181  __SUP_COUT__ << "Paused." << __E__;
1182  set_thread_message_("Paused");
1183 } // end transitionPausing()
1184 catch(const std::runtime_error& e)
1185 {
1186  __SS__ << "Error was caught while Pausing: " << e.what() << __E__;
1187  __SS_THROW__;
1188 }
1189 catch(...)
1190 {
1191  __SS__ << "Unknown error was caught while Pausing. Please checked the logs." << __E__;
1192  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1193  __SS_THROW__;
1194 } // end transitionPausing() error handling
1195 
1196 //==============================================================================
1197 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference /*event*/)
1198 try
1199 {
1200  set_thread_message_("Resuming");
1201  __SUP_COUT__ << "Resuming..." << __E__;
1202  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1203 
1204  getDAQState_();
1205  __SUP_COUT__ << "Status before resume: " << daqinterface_state_ << __E__;
1206  PyObject* pName = PyUnicode_FromString("do_command");
1207  PyObject* pArg = PyUnicode_FromString("Resume");
1208  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1209  __COUT_MULTI_LBL__(
1210  0, captureStderrAndStdout_("do_command Resume"), "do_command Resume");
1211 
1212  if(res == NULL)
1213  {
1214  std::string err = capturePyErr();
1215  __SS__ << "Error calling DAQ Interface Resume transition: " << err << __E__;
1216  __SUP_SS_THROW__;
1217  }
1218  getDAQState_();
1219  __SUP_COUT__ << "Status after resume: " << daqinterface_state_ << __E__;
1220  __SUP_COUT__ << "Resumed." << __E__;
1221  set_thread_message_("Resumed");
1222 } // end transitionResuming()
1223 catch(const std::runtime_error& e)
1224 {
1225  __SS__ << "Error was caught while Resuming: " << e.what() << __E__;
1226  __SS_THROW__;
1227 }
1228 catch(...)
1229 {
1230  __SS__ << "Unknown error was caught while Resuming. Please checked the logs."
1231  << __E__;
1232  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1233  __SS_THROW__;
1234 } // end transitionResuming() error handling
1235 
1236 //==============================================================================
1237 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference /*event*/)
1238 try
1239 {
1240  __SUP_COUT__ << "transitionStarting" << __E__;
1241 
1242  // first time launch thread because artdaq Supervisor may take a while
1243  if(RunControlStateMachine::getIterationIndex() == 0 &&
1244  RunControlStateMachine::getSubIterationIndex() == 0)
1245  {
1246  thread_error_message_ = "";
1247  thread_progress_bar_.resetProgressBar(0);
1248  last_thread_progress_update_ = time(0); // initialize timeout timer
1249 
1250  // start configuring thread
1251  std::thread(&ARTDAQSupervisor::startingThread, this).detach();
1252 
1253  __SUP_COUT__ << "Starting thread started." << __E__;
1254 
1255  RunControlStateMachine::
1256  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1257  }
1258  else // not first time
1259  {
1260  std::string errorMessage;
1261  {
1262  std::lock_guard<std::mutex> lock(
1263  thread_mutex_); // lock out for remainder of scope
1264  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
1265  }
1266  int progress = thread_progress_bar_.read();
1267  __SUP_COUTV__(errorMessage);
1268  __SUP_COUTV__(progress);
1269  __SUP_COUTV__(thread_progress_bar_.isComplete());
1270 
1271  // check for done and error messages
1272  if(errorMessage == "" && // if no update in 600 seconds, give up
1273  time(0) - last_thread_progress_update_ > 600)
1274  {
1275  __SUP_SS__ << "There has been no update from the start thread for "
1276  << (time(0) - last_thread_progress_update_)
1277  << " seconds, assuming something is wrong and giving up! "
1278  << "Last progress received was " << progress << __E__;
1279  errorMessage = ss.str();
1280  }
1281 
1282  if(errorMessage != "")
1283  {
1284  __SUP_SS__ << "Error was caught in starting thread: " << errorMessage
1285  << __E__;
1286  __SUP_COUT_ERR__ << "\n" << ss.str();
1287 
1288  theStateMachine_.setErrorMessage(ss.str());
1289  throw toolbox::fsm::exception::Exception(
1290  "Transition Error" /*name*/,
1291  ss.str() /* message*/,
1292  "CoreSupervisorBase::transitionStarting" /*module*/,
1293  __LINE__ /*line*/,
1294  __FUNCTION__ /*function*/
1295  );
1296  }
1297 
1298  if(!thread_progress_bar_.isComplete())
1299  {
1300  RunControlStateMachine::
1301  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1302 
1303  if(last_thread_progress_read_ != progress)
1304  {
1305  last_thread_progress_read_ = progress;
1306  last_thread_progress_update_ = time(0);
1307  }
1308 
1309  sleep(1 /*seconds*/);
1310  }
1311  else
1312  {
1313  __SUP_COUT_INFO__ << "Complete starting transition!" << __E__;
1314  __SUP_COUTV__(getProcessInfo_());
1315  }
1316  }
1317 
1318  return;
1319 
1320 } // end transitionStarting()
1321 catch(const std::runtime_error& e)
1322 {
1323  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1324  __SS_THROW__;
1325 }
1326 catch(...)
1327 {
1328  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1329  << __E__;
1330  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1331  __SS_THROW__;
1332 } // end transitionStarting() error handling
1333 
1334 //==============================================================================
1335 void ARTDAQSupervisor::startingThread()
1336 try
1337 {
1338  std::string uid = theConfigurationManager_
1339  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
1340  "/" + CorePropertySupervisorBase::getSupervisorUID() +
1341  "/" + "LinkToSupervisorTable")
1342  .getValueAsString();
1343 
1344  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
1345  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
1346  __GEN_COUT__ << "Starting..." << __E__;
1347  set_thread_message_("Starting");
1348 
1349  thread_progress_bar_.step();
1350  stop_runner_();
1351  {
1352  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1353  getDAQState_();
1354  __GEN_COUT__ << "Status before start: " << daqinterface_state_ << __E__;
1355  auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
1356  .getParameters()
1357  .getValue("RunNumber");
1358 
1359  thread_progress_bar_.step();
1360 
1361  PyObject* pName = PyUnicode_FromString("do_start_running");
1362  int run_number = std::stoi(runNumber);
1363  PyObject* pStateArgs = PyLong_FromLong(run_number);
1364  PyObject* res =
1365  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pStateArgs, NULL);
1366  __COUT_MULTI_LBL__(
1367  0, captureStderrAndStdout_("do_start_running"), "do_start_running");
1368 
1369  thread_progress_bar_.step();
1370 
1371  if(res == NULL)
1372  {
1373  std::string err = capturePyErr();
1374  __SS__ << "Error calling start transition: " << err << __E__;
1375  __GEN_SS_THROW__;
1376  }
1377  getDAQState_();
1378 
1379  thread_progress_bar_.step();
1380 
1381  __GEN_COUT__ << "Status after start: " << daqinterface_state_ << __E__;
1382  if(daqinterface_state_ != "running")
1383  {
1384  __SS__ << "DAQInterface start transition failed!" << __E__;
1385  __GEN_SS_THROW__;
1386  }
1387 
1388  thread_progress_bar_.step();
1389  }
1390  start_runner_();
1391  set_thread_message_("Started");
1392  thread_progress_bar_.step();
1393 
1394  __GEN_COUT__ << "Started." << __E__;
1395  thread_progress_bar_.complete();
1396 
1397 } // end startingThread()
1398 catch(const std::runtime_error& e)
1399 {
1400  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1401  __COUT_ERR__ << "\n" << ss.str();
1402  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1403  thread_error_message_ = ss.str();
1404 }
1405 catch(...)
1406 {
1407  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1408  << __E__;
1409  __COUT_ERR__ << "\n" << ss.str();
1410 
1411  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1412 
1413  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1414  thread_error_message_ = ss.str();
1415 } // end startingThread() error handling
1416 
1417 //==============================================================================
1418 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference /*event*/)
1419 try
1420 {
1421  __SUP_COUT__ << "Stopping..." << __E__;
1422  set_thread_message_("Stopping");
1423  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1424  getDAQState_();
1425  __SUP_COUT__ << "Status before stop: " << daqinterface_state_ << __E__;
1426  PyObject* pName = PyUnicode_FromString("do_stop_running");
1427  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1428  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_stop_running"), "do_stop_running");
1429 
1430  if(res == NULL)
1431  {
1432  std::string err = capturePyErr();
1433  __SS__ << "Error calling DAQ Interface stop transition: " << err << __E__;
1434  __SUP_SS_THROW__;
1435  }
1436  getDAQState_();
1437  __SUP_COUT__ << "Status after stop: " << daqinterface_state_ << __E__;
1438  __SUP_COUT__ << "Stopped." << __E__;
1439  set_thread_message_("Stopped");
1440 } // end transitionStopping()
1441 catch(const std::runtime_error& e)
1442 {
1443  __SS__ << "Error was caught while Stopping: " << e.what() << __E__;
1444  __SS_THROW__;
1445 }
1446 catch(...)
1447 {
1448  __SS__ << "Unknown error was caught while Stopping. Please checked the logs."
1449  << __E__;
1450  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1451  __SS_THROW__;
1452 } // end transitionStopping() error handling
1453 
1454 //==============================================================================
1455 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference /*event*/)
1456 {
1457  __SUP_COUT__ << "Entering error recovery state" << __E__;
1458  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1459  getDAQState_();
1460  __SUP_COUT__ << "Status before error: " << daqinterface_state_ << __E__;
1461 
1462  PyObject* pName = PyUnicode_FromString("do_recover");
1463  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1464  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_recover"), "do_recover");
1465 
1466  if(res == NULL)
1467  {
1468  std::string err = capturePyErr();
1469  __SS__ << "Error calling DAQ Interface recover transition: " << err << __E__;
1470  __SUP_SS_THROW__;
1471  }
1472  getDAQState_();
1473  __SUP_COUT__ << "Status after error: " << daqinterface_state_ << __E__;
1474  __SUP_COUT__ << "EnteringError DONE." << __E__;
1475 
1476 } // end enteringError()
1477 
1478 std::vector<SupervisorInfo::SubappInfo> ots::ARTDAQSupervisor::getSubappInfo(void)
1479 {
1480  auto apps = getAndParseProcessInfo_();
1481  std::vector<SupervisorInfo::SubappInfo> output;
1482  for(auto& app : apps)
1483  {
1485 
1486  info.name = app.label;
1487  info.detail = "Rank " + std::to_string(app.rank) + ", subsystem " +
1488  std::to_string(app.subsystem);
1489  info.lastStatusTime = time(0);
1490  info.progress = 100;
1491  info.status = artdaqStateToOtsState(app.state);
1492  info.url = "http://" + app.host + ":" + std::to_string(app.port) + "/RPC2";
1493  info.class_name = "ARTDAQ " + labelToProcType_(app.label);
1494 
1495  output.push_back(info);
1496  }
1497  return output;
1498 }
1499 
1500 //==============================================================================
1501 std::string ots::ARTDAQSupervisor::capturePyErr(std::string label /* = "" */)
1502 {
1503  PyErr_Print(); // dump the Python exception <-- this writes into stringIO_err, not the terminal
1504  if(label.size())
1505  label += ' '; //for nice printing
1506 
1507  PyObject* err_text = PyObject_CallMethod(stringIO_err, "getvalue", NULL);
1508  std::string err = "";
1509  if(!err_text)
1510  err = "Capture of " + label + "PyErr failed.";
1511  else
1512  err = "Capture of " + label + "PyErr: " + std::string(PyUnicode_AsUTF8(err_text));
1513 
1514  //clear buffer for reuse
1515  {
1516  PyObject* r1 = PyObject_CallMethod(stringIO_err, "seek", "i", 0);
1517  Py_XDECREF(r1);
1518  PyObject* r2 = PyObject_CallMethod(stringIO_err, "truncate", NULL);
1519  Py_XDECREF(r2);
1520  }
1521  return err;
1522 } //end captureStderr()
1523 
1524 //==============================================================================
1525 std::string ots::ARTDAQSupervisor::captureStderrAndStdout_(std::string label /* = "" */)
1526 {
1527  if(label.size())
1528  label += ' '; //for nice printing
1529 
1530  std::string outString = "";
1531  //------------- capture stdout and stderr
1532  PyObject* out_text = PyObject_CallMethod(stringIO_out, "getvalue", NULL);
1533  if(!out_text)
1534  PyErr_Print(); // dump the Python exception <-- this writes into stringIO_err, not the terminal
1535  else
1536  {
1537  const char* out_cstr = PyUnicode_AsUTF8(out_text);
1538  if(out_cstr && strlen(out_cstr))
1539  outString = "Captured " + label + "stdout:\n" +
1540  std::string(out_cstr ? out_cstr : "") + "\n";
1541  else
1542  outString = "Captured " + label + "stdout empty.\n";
1543  }
1544 
1545  std::string errString = "";
1546  PyObject* err_text = PyObject_CallMethod(stringIO_err, "getvalue", NULL);
1547  if(!err_text)
1548  __SUP_COUT__ << "Capture of " << label << "stderr failed.";
1549  else
1550  {
1551  const char* err_cstr = PyUnicode_AsUTF8(err_text);
1552  if(err_cstr && strlen(err_cstr))
1553  errString = "Captured " + label + "stderr:\n" +
1554  std::string(err_cstr ? err_cstr : "") + "\n";
1555  else
1556  errString = "Captured " + label + "stderr empty.\n";
1557  }
1558 
1559  //clear buffers for reuse
1560  {
1561  PyObject* r1 = PyObject_CallMethod(stringIO_out, "seek", "i", 0);
1562  Py_XDECREF(r1);
1563  PyObject* r2 = PyObject_CallMethod(stringIO_out, "truncate", NULL);
1564  Py_XDECREF(r2);
1565  }
1566  {
1567  PyObject* r1 = PyObject_CallMethod(stringIO_err, "seek", "i", 0);
1568  Py_XDECREF(r1);
1569  PyObject* r2 = PyObject_CallMethod(stringIO_err, "truncate", NULL);
1570  Py_XDECREF(r2);
1571  }
1572  //------------- end capture stdout and stderr
1573 
1574  return errString + outString;
1575 } //end captureStderrAndStdout_()
1576 
1577 //==============================================================================
1578 void ots::ARTDAQSupervisor::getDAQState_()
1579 {
1580  //__SUP_COUT__ << "Getting DAQInterface state" << __E__;
1581  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1582 
1583  if(daqinterface_ptr_ == nullptr)
1584  {
1585  daqinterface_state_ = "";
1586  return;
1587  }
1588 
1589  int tries = 0;
1590  while(tries < 5)
1591  {
1592  PyObject* pName = PyUnicode_FromString("state");
1593  PyObject* pArg = PyUnicode_FromString("DAQInterface");
1594  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1595 
1596  if(res == NULL)
1597  {
1598  std::string err = capturePyErr("getDAQState_");
1599  __SS__ << "Retry n " << tries
1600  << ". Error calling state function from getDAQState_() - here was the "
1601  "error: "
1602  << err << "\n\n"
1603  << StringMacros::stackTrace() << __E__;
1604  // __SUP_SS_THROW__;
1605  //do not throw, just mark state empty
1606  daqinterface_state_ = "";
1607  __COUT_ERR__ << ss.str();
1608  tries++;
1609  usleep(100000);
1610  continue;
1611  }
1612  daqinterface_state_ = std::string(PyUnicode_AsUTF8(res));
1613  __SUP_COUTS__(2) << "getDAQState_ state=" << daqinterface_state_ << __E__;
1614  break;
1615  }
1616 
1617 } // end getDAQState_()
1618 
1619 //==============================================================================
1620 std::string ots::ARTDAQSupervisor::getProcessInfo_(void)
1621 {
1622  //__SUP_COUT__ << "Getting DAQInterface state" << __E__;
1623  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1624 
1625  if(daqinterface_ptr_ == nullptr)
1626  {
1627  return "";
1628  }
1629 
1630  PyObject* pName = PyUnicode_FromString("artdaq_process_info");
1631  PyObject* pArg = PyUnicode_FromString("DAQInterface");
1632  PyObject* pArg2 = PyBool_FromLong(true);
1633  PyObject* res =
1634  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, pArg2, NULL);
1635 
1636  if(res == NULL)
1637  {
1638  std::string err = capturePyErr();
1639  __SS__ << "Error calling artdaq_process_info function: " << err << __E__;
1640  __SUP_SS_THROW__;
1641  return "";
1642  }
1643  return std::string(PyUnicode_AsUTF8(res));
1644  //__SUP_COUT__ << "getDAQState_ DONE: state=" << result << __E__;
1645 } // end getProcessInfo_()
1646 
1647 std::string ots::ARTDAQSupervisor::artdaqStateToOtsState(std::string state)
1648 {
1649  if(state == "nonexistant")
1650  return RunControlStateMachine::INITIAL_STATE_NAME;
1651  if(state == "Ready")
1652  return "Configured";
1653  if(state == "Running")
1654  return RunControlStateMachine::RUNNING_STATE_NAME;
1655  if(state == "Paused")
1656  return RunControlStateMachine::PAUSED_STATE_NAME;
1657  if(state == "Stopped")
1658  return RunControlStateMachine::HALTED_STATE_NAME;
1659 
1660  TLOG(TLVL_WARNING) << "Unrecognized state name " << state;
1661  return RunControlStateMachine::FAILED_STATE_NAME;
1662 }
1663 
1664 std::string ots::ARTDAQSupervisor::labelToProcType_(std::string label)
1665 {
1666  if(label_to_proc_type_map_.count(label))
1667  {
1668  return label_to_proc_type_map_[label];
1669  }
1670  return "UNKNOWN";
1671 }
1672 
1673 //==============================================================================
1674 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo>
1675 ots::ARTDAQSupervisor::getAndParseProcessInfo_()
1676 {
1677  std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
1678  auto info = getProcessInfo_();
1679  auto procs = tokenize_(info);
1680 
1681  // 0: Whole string
1682  // 1: Process Label
1683  // 2: Process host
1684  // 3: Process port
1685  // 4: Process subsystem
1686  // 5: Process Rank
1687  // 6: Process state
1688  std::regex re("(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
1689 
1690  for(auto& proc : procs)
1691  {
1692  std::smatch match;
1693  if(std::regex_match(proc, match, re))
1694  {
1695  DAQInterfaceProcessInfo info;
1696 
1697  info.label = match[1];
1698  info.host = match[2];
1699  info.port = std::stoi(match[3]);
1700  info.subsystem = std::stoi(match[4]);
1701  info.rank = std::stoi(match[5]);
1702  info.state = match[6];
1703 
1704  output.push_back(info);
1705  }
1706  }
1707  return output;
1708 } // end getAndParseProcessInfo_()
1709 
1710 //==============================================================================
1712  std::unique_ptr<artdaq::CommanderInterface>>>
1713 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
1714 {
1715  std::list<
1716  std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
1717  output;
1718  auto infos = getAndParseProcessInfo_();
1719 
1720  for(auto& info : infos)
1721  {
1722  artdaq::Commandable cm;
1723  fhicl::ParameterSet ps;
1724 
1725  ps.put<std::string>("commanderPluginType", "xmlrpc");
1726  ps.put<int>("id", info.port);
1727  ps.put<std::string>("server_url", info.host);
1728 
1729  output.emplace_back(std::make_pair<DAQInterfaceProcessInfo,
1730  std::unique_ptr<artdaq::CommanderInterface>>(
1731  std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
1732  }
1733 
1734  return output;
1735 } // end makeCommandersFromProcessInfo()
1736 
1737 //==============================================================================
1738 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string const& input)
1739 {
1740  size_t pos = 0;
1741  std::list<std::string> output;
1742 
1743  while(pos != std::string::npos && pos < input.size())
1744  {
1745  auto newpos = input.find('\n', pos);
1746  if(newpos != std::string::npos)
1747  {
1748  output.emplace_back(input, pos, newpos - pos);
1749  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1750  pos = newpos + 1;
1751  }
1752  else
1753  {
1754  output.emplace_back(input, pos);
1755  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1756  pos = newpos;
1757  }
1758  }
1759  return output;
1760 } // end tokenize_()
1761 
1762 //==============================================================================
1763 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1764 {
1765  TLOG(TLVL_TRACE) << "Runner thread starting";
1766  runner_running_ = true;
1767  while(runner_running_)
1768  {
1769  if(daqinterface_ptr_ != NULL)
1770  {
1771  std::unique_lock<std::recursive_mutex> lk(daqinterface_mutex_);
1772  getDAQState_();
1773  std::string state_before = daqinterface_state_;
1774 
1775  if(daqinterface_state_ == "running" || daqinterface_state_ == "ready" ||
1776  daqinterface_state_ == "booted")
1777  {
1778  try
1779  {
1780  TLOG(TLVL_TRACE) << "Calling DAQInterface::check_proc_heartbeats";
1781  PyObject* pName = PyUnicode_FromString("check_proc_heartbeats");
1782  PyObject* res =
1783  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1784  __COUT_MULTI_LBL__(1,
1785  captureStderrAndStdout_("check_proc_heartbeats"),
1786  "check_proc_heartbeats");
1787  TLOG(TLVL_TRACE)
1788  << "Done with DAQInterface::check_proc_heartbeats call";
1789 
1790  if(res == NULL)
1791  {
1792  runner_running_ = false;
1793  std::string err = capturePyErr("check_proc_heartbeats");
1794  __SS__ << "Error calling check_proc_heartbeats function: " << err
1795  << __E__;
1796  __SUP_SS_THROW__;
1797  break;
1798  }
1799  }
1800  catch(cet::exception& ex)
1801  {
1802  runner_running_ = false;
1803  std::string err = capturePyErr("check_proc_heartbeats");
1804  __SS__ << "An cet::exception occurred while calling "
1805  "check_proc_heartbeats function "
1806  << ex.explain_self() << ": " << err << __E__;
1807  __SUP_SS_THROW__;
1808  break;
1809  }
1810  catch(std::exception& ex)
1811  {
1812  runner_running_ = false;
1813  std::string err = capturePyErr("check_proc_heartbeats");
1814  __SS__ << "An std::exception occurred while calling "
1815  "check_proc_heartbeats function: "
1816  << ex.what() << "\n\n"
1817  << err << __E__;
1818  __SUP_SS_THROW__;
1819  break;
1820  }
1821  catch(...)
1822  {
1823  runner_running_ = false;
1824  std::string err = capturePyErr("check_proc_heartbeats");
1825  __SS__ << "An unknown Error occurred while calling "
1826  "check_proc_heartbeats function: "
1827  << err << __E__;
1828  __SUP_SS_THROW__;
1829  break;
1830  }
1831 
1832  lk.unlock();
1833  getDAQState_();
1834  if(daqinterface_state_ != state_before)
1835  {
1836  runner_running_ = false;
1837  lk.unlock();
1838  __SS__ << "DAQInterface state unexpectedly changed from "
1839  << state_before << " to " << daqinterface_state_
1840  << ". Check supervisor log file for more info!" << __E__;
1841  __SUP_SS_THROW__;
1842  break;
1843  }
1844  }
1845  }
1846  else
1847  {
1848  break;
1849  }
1850  usleep(1000000);
1851  }
1852  runner_running_ = false;
1853  TLOG(TLVL_TRACE) << "Runner thread complete";
1854 } // end daqinterfaceRunner_()
1855 
1856 //==============================================================================
1857 void ots::ARTDAQSupervisor::stop_runner_()
1858 {
1859  runner_running_ = false;
1860  if(runner_thread_ && runner_thread_->joinable())
1861  {
1862  runner_thread_->join();
1863  runner_thread_.reset(nullptr);
1864  }
1865 } // end stop_runner_()
1866 
1867 //==============================================================================
1868 void ots::ARTDAQSupervisor::start_runner_()
1869 {
1870  stop_runner_();
1871  runner_thread_ =
1872  std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_, this);
1873 } // end start_runner_()
virtual void transitionHalting(toolbox::Event::Reference event) override
virtual void transitionInitializing(toolbox::Event::Reference event) override
void loadTableGroup(const std::string &tableGroupName, const TableGroupKey &tableGroupKey, bool doActivate=false, std::map< std::string, TableVersion > *groupMembers=0, ProgressBar *progressBar=0, std::string *accumulateWarnings=0, std::string *groupComment=0, std::string *groupAuthor=0, std::string *groupCreateTime=0, bool doNotLoadMember=false, std::string *groupTypeString=0, std::map< std::string, std::string > *groupAliases=0, ConfigurationManager::LoadGroupType groupTypeToLoad=ConfigurationManager::LoadGroupType::ALL_TYPES, bool ignoreVersionTracking=false)
ConfigurationTree getNode(const std::string &nodeString, bool doNotThrowOnBrokenUIDLinks=false) const
"root/parent/parent/"
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
const std::string & getValueAsString(bool returnLinkTableValue=false) const
void getValue(T &value) const
ITRACEController * theTRACEController_
only define for an app that receives a command
bool isComplete()
get functions
Definition: ProgressBar.cc:88
void step()
thread safe
Definition: ProgressBar.cc:74
int read()
if stepsToComplete==0, then define any progress as 50%, thread safe
Definition: ProgressBar.cc:120
void complete()
declare complete, thread safe
Definition: ProgressBar.cc:95
void INIT_MF(const char *name)
static std::string stackTrace(void)
std::string name
Also key in map.