otsdaq  3.03.00
ARTDAQSupervisor.cc
1 
2 
3 #define TRACEMF_USE_VERBATIM 1 // for trace longer path filenames
4 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
5 
6 #include "artdaq-core/Utilities/configureMessageFacility.hh"
7 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
8 #include "artdaq/DAQdata/Globals.hh"
9 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
10 #include "cetlib_except/exception.h"
11 #include "fhiclcpp/make_ParameterSet.h"
12 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisorTRACEController.h"
13 
14 #include "artdaq-core/Utilities/ExceptionHandler.hh" /*for artdaq::ExceptionHandler*/
15 
16 #include <boost/exception/all.hpp>
17 #include <boost/filesystem.hpp>
18 
19 #include <signal.h>
20 #include <regex>
21 
22 #define OUT_ON_ERR_SIZE 2000 //tail size of output to include on error
23 
24 using namespace ots;
25 
26 XDAQ_INSTANTIATOR_IMPL(ARTDAQSupervisor)
27 
28 #define FAKE_CONFIG_NAME "ots_config"
29 #define DAQINTERFACE_PORT \
30  std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + \
31  (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION")))
32 
33 static ARTDAQSupervisor* instance = nullptr;
34 static std::unordered_map<int, struct sigaction> old_actions =
35  std::unordered_map<int, struct sigaction>();
36 static bool sighandler_init = false;
37 static void signal_handler(int signum)
38 {
39  // Messagefacility may already be gone at this point, TRACE ONLY!
40 #if TRACE_REVNUM < 1459
41  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
42 #else
43  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
44 #endif
45  << "A signal of type " << signum
46  << " was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
47  "then proceeding with default handlers!";
48 
49  if(instance)
50  instance->destroy();
51 
52  sigset_t set;
53  pthread_sigmask(SIG_UNBLOCK, NULL, &set);
54  pthread_sigmask(SIG_UNBLOCK, &set, NULL);
55 
56 #if TRACE_REVNUM < 1459
57  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
58 #else
59  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
60 #endif
61  << "Calling default signal handler";
62  if(signum != SIGUSR2)
63  {
64  sigaction(signum, &old_actions[signum], NULL);
65  kill(getpid(), signum); // Only send signal to self
66  }
67  else
68  {
69  // Send Interrupt signal if parsing SIGUSR2 (i.e. user-defined exception that
70  // should tear down ARTDAQ)
71  sigaction(SIGINT, &old_actions[SIGINT], NULL);
72  kill(getpid(), SIGINT); // Only send signal to self
73  }
74 }
75 
76 static void init_sighandler(ARTDAQSupervisor* inst)
77 {
78  static std::mutex sighandler_mutex;
79  std::unique_lock<std::mutex> lk(sighandler_mutex);
80 
81  if(!sighandler_init)
82  {
83  sighandler_init = true;
84  instance = inst;
85  std::vector<int> signals = {
86  SIGINT,
87  SIGILL,
88  SIGABRT,
89  SIGFPE,
90  SIGSEGV,
91  SIGPIPE,
92  SIGALRM,
93  SIGTERM,
94  SIGUSR2,
95  SIGHUP}; // SIGQUIT is used by art in normal operation
96  for(auto signal : signals)
97  {
98  struct sigaction old_action;
99  sigaction(signal, NULL, &old_action);
100 
101  // If the old handler wasn't SIG_IGN (it's a handler that just
102  // "ignore" the signal)
103  if(old_action.sa_handler != SIG_IGN)
104  {
105  struct sigaction action;
106  action.sa_handler = signal_handler;
107  sigemptyset(&action.sa_mask);
108  for(auto sigblk : signals)
109  {
110  sigaddset(&action.sa_mask, sigblk);
111  }
112  action.sa_flags = 0;
113 
114  // Replace the signal handler of SIGINT with the one described by
115  // new_action
116  sigaction(signal, &action, NULL);
117  old_actions[signal] = old_action;
118  }
119  }
120  }
121 }
122 
123 //==============================================================================
124 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
125  : CoreSupervisorBase(stub)
126  , daqinterface_ptr_(NULL)
127  , partition_(getSupervisorProperty("partition", 0))
128  , daqinterface_state_("notrunning")
129  , runner_thread_(nullptr)
130 {
131  __SUP_COUT__ << "Constructor." << __E__;
132 
133  INIT_MF("." /*directory used is USER_DATA/LOG/.*/);
134  init_sighandler(this);
135 
136  // Only use system Python
137  // unsetenv("PYTHONPATH");
138  // unsetenv("PYTHONHOME");
139 
140  // Write out settings file
141  auto settings_file = __ENV__("DAQINTERFACE_SETTINGS");
142  std::ofstream o(settings_file, std::ios::trunc);
143 
144  setenv("DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
145  auto logfileName = std::string(__ENV__("OTSDAQ_LOG_DIR")) +
146  "/DAQInteface/DAQInterface_partition" +
147  std::to_string(partition_) + ".log";
148  setenv("DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
149 
150  o << "log_directory: "
151  << getSupervisorProperty("log_directory", std::string(__ENV__("OTSDAQ_LOG_DIR")))
152  << std::endl;
153 
154  {
155  const std::string record_directory = getSupervisorProperty(
156  "record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH + "/run_records/");
157  mkdir(record_directory.c_str(), 0755);
158  o << "record_directory: " << record_directory << std::endl;
159  }
160 
161  o << "package_hashes_to_save: "
162  << getSupervisorProperty("package_hashes_to_save", "[artdaq]") << std::endl;
163 
164  o << "spack_root_for_bash_scripts: "
165  << getSupervisorProperty("spack_root_for_bash_scripts",
166  std::string(__ENV__("SPACK_ROOT")))
167  << std::endl;
168  o << "boardreader timeout: " << getSupervisorProperty("boardreader_timeout", 30)
169  << std::endl;
170  o << "eventbuilder timeout: " << getSupervisorProperty("eventbuilder_timeout", 30)
171  << std::endl;
172  o << "datalogger timeout: " << getSupervisorProperty("datalogger_timeout", 30)
173  << std::endl;
174  o << "dispatcher timeout: " << getSupervisorProperty("dispatcher_timeout", 30)
175  << std::endl;
176  // Only put max_fragment_size_bytes into DAQInterface settings file if advanced_memory_usage is disabled
177  if(!getSupervisorProperty("advanced_memory_usage", false))
178  {
179  o << "max_fragment_size_bytes: "
180  << getSupervisorProperty("max_fragment_size_bytes", 1048576) << std::endl;
181  }
182  o << "transfer_plugin_to_use: "
183  << getSupervisorProperty("transfer_plugin_to_use", "TCPSocket") << std::endl;
184  if(getSupervisorProperty("transfer_plugin_from_brs", "") != "")
185  {
186  o << "transfer_plugin_from_brs: "
187  << getSupervisorProperty("transfer_plugin_from_brs", "") << std::endl;
188  }
189  if(getSupervisorProperty("transfer_plugin_from_ebs", "") != "")
190  {
191  o << "transfer_plugin_from_ebs: "
192  << getSupervisorProperty("transfer_plugin_from_ebs", "") << std::endl;
193  }
194  if(getSupervisorProperty("transfer_plugin_from_dls", "") != "")
195  {
196  o << "transfer_plugin_from_dls: "
197  << getSupervisorProperty("transfer_plugin_from_dls", "") << std::endl;
198  }
199  o << "all_events_to_all_dispatchers: " << std::boolalpha
200  << getSupervisorProperty("all_events_to_all_dispatchers", true) << std::endl;
201  if(getSupervisorProperty("data_directory_override", "") != "")
202  {
203  o << "data_directory_override: "
204  << getSupervisorProperty("data_directory_override", "") << std::endl;
205  }
206  o << "max_configurations_to_list: "
207  << getSupervisorProperty("max_configurations_to_list", 10) << std::endl;
208  o << "disable_unique_rootfile_labels: "
209  << getSupervisorProperty("disable_unique_rootfile_labels", false) << std::endl;
210  o << "use_messageviewer: " << std::boolalpha
211  << getSupervisorProperty("use_messageviewer", false) << std::endl;
212  o << "use_messagefacility: " << std::boolalpha
213  << getSupervisorProperty("use_messagefacility", true) << std::endl;
214  o << "fake_messagefacility: " << std::boolalpha
215  << getSupervisorProperty("fake_messagefacility", false) << std::endl;
216  o << "kill_existing_processes: " << std::boolalpha
217  << getSupervisorProperty("kill_existing_processes", true) << std::endl;
218  o << "advanced_memory_usage: " << std::boolalpha
219  << getSupervisorProperty("advanced_memory_usage", false) << std::endl;
220  o << "strict_fragment_id_mode: " << std::boolalpha
221  << getSupervisorProperty("strict_fragment_id_mode", false) << std::endl;
222  o << "disable_private_network_bookkeeping: " << std::boolalpha
223  << getSupervisorProperty("disable_private_network_bookkeeping", false) << std::endl;
224  o << "allowed_processors: " << getSupervisorProperty("allowed_processors", "0-255")
225  << std::
226  endl; // Note this sets a taskset for ALL processes, on all nodes (ex. "1,2,5-7")
227 
228  o.close();
229 
230  // destroy current TRACEController and instantiate ARTDAQSupervisorTRACEController
231  if(CorePropertySupervisorBase::theTRACEController_)
232  {
233  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
234  delete CorePropertySupervisorBase::
235  theTRACEController_; // destruct current TRACEController
236  CorePropertySupervisorBase::theTRACEController_ = nullptr;
237  }
238  CorePropertySupervisorBase::theTRACEController_ =
240  ((ARTDAQSupervisorTRACEController*)CorePropertySupervisorBase::theTRACEController_)
241  ->setSupervisorPtr(this);
242 
243  __SUP_COUT__ << "Constructed." << __E__;
244 } // end constructor()
245 
246 //==============================================================================
247 ARTDAQSupervisor::~ARTDAQSupervisor(void)
248 {
249  __SUP_COUT__ << "Destructor." << __E__;
250  destroy();
251  __SUP_COUT__ << "Destructed." << __E__;
252 } // end destructor()
253 
254 //==============================================================================
255 void ARTDAQSupervisor::destroy(void)
256 {
257  __SUP_COUT__ << "Destroying..." << __E__;
258 
259  if(daqinterface_ptr_ != NULL)
260  {
261  __SUP_COUT__ << "Calling recover transition" << __E__;
262  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
263  PyObject* pName = PyUnicode_FromString("do_recover");
264  /*PyObject* res =*/PyObject_CallMethodObjArgs(
265  daqinterface_ptr_, pName, NULL);
266 
267  __SUP_COUT__ << "Making sure that correct state has been reached" << __E__;
268  getDAQState_();
269  while(daqinterface_state_ != "stopped")
270  {
271  getDAQState_();
272  __SUP_COUT__ << "State is " << daqinterface_state_
273  << ", waiting 1s and retrying..." << __E__;
274  usleep(1000000);
275  }
276 
277  // Cleanup
278  Py_XDECREF(daqinterface_ptr_);
279  // Py_XDECREF(pStateArgs2);
280  // Py_XDECREF(out_text);
281  // Py_XDECREF(err_text);
282  // Py_XDECREF(sys_stdout);
283  // Py_XDECREF(sys_stderr);
284  // Py_XDECREF(stringIO_out);
285  // Py_XDECREF(stringIO_err);
286  // Py_XDECREF(io);
287  // Py_XDECREF(sys);
288  daqinterface_ptr_ = NULL;
289  }
290 
291  __SUP_COUT__ << "Flusing printouts" << __E__;
292 
293  //make sure to flush printouts
294  PyRun_SimpleString(R"(
295 import sys
296 sys.stdout = sys.__stdout__
297 sys.stderr = sys.__stderr__
298 )");
299  Py_XDECREF(stringIO_out);
300  Py_XDECREF(stringIO_err);
301 
302  __SUP_COUT__ << "Thread and garbage cleanup" << __E__;
303  //force python thread cleanup:
304  PyRun_SimpleString(
305  "import threading; [t.join() for t in threading.enumerate() if t is not "
306  "threading.main_thread()]");
307  PyRun_SimpleString("import gc; gc.collect()");
308  Py_Finalize();
309 
310  // CorePropertySupervisorBase would destroy, but since it was created here, attempt to destroy
312  {
313  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
316  }
317 
318  __SUP_COUT__ << "Destroyed." << __E__;
319 } // end destroy()
320 
321 //==============================================================================
322 void ARTDAQSupervisor::init(void)
323 {
324  stop_runner_();
325 
326  __SUP_COUT__ << "Initializing..." << __E__;
327  {
328  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
329 
330  // allSupervisorInfo_.init(getApplicationContext());
331  artdaq::configureMessageFacility("ARTDAQSupervisor");
332  __SUP_COUT__ << "artdaq MF configured." << __E__;
333 
334  // initialization
335  char* daqinterface_dir = getenv("ARTDAQ_DAQINTERFACE_DIR");
336  if(daqinterface_dir == NULL)
337  {
338  __SS__ << "ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
339  "means that DAQInterface has not been setup!"
340  << __E__;
341  __SUP_SS_THROW__;
342  }
343  else
344  {
345  __SUP_COUT__ << "Initializing Python" << __E__;
346  Py_Initialize();
347 
348  //setup Python output to tee output to stdout/err and to StringIO buffer "tee_buffer"
349  PyRun_SimpleString(
350  "import sys\n"
351  "from io import StringIO\n"
352  "\n"
353  "class TeeOut:\n"
354  " def __init__(self, real, buf):\n"
355  " self.real = real\n"
356  " self.buf = buf\n"
357  " def write(self, data):\n"
358  " self.real.write(data)\n"
359  " self.buf.write(data)\n"
360  " def flush(self):\n"
361  " self.real.flush()\n"
362  " self.buf.flush()\n"
363  "\n"
364  "tee_buffer = StringIO()\n"
365  "sys.stdout = TeeOut(sys.stdout, tee_buffer)\n"
366  "sys.stderr = TeeOut(sys.stderr, tee_buffer)\n");
367 
368  __SUP_COUT__ << "Adding DAQInterface directory to PYTHON_PATH" << __E__;
369  PyObject* sysPath = PySys_GetObject((char*)"path");
370  PyObject* programName = PyUnicode_FromString(daqinterface_dir);
371  PyList_Append(sysPath, programName);
372  Py_DECREF(programName);
373 
374  __SUP_COUT__ << "Creating Module name" << __E__;
375  PyObject* pName = PyUnicode_FromString("rc.control.daqinterface");
376  /* Error checking of pName left out */
377 
378  __SUP_COUT__ << "Importing module" << __E__;
379  PyObject* pModule = PyImport_Import(pName);
380  Py_DECREF(pName);
381 
382  if(pModule == NULL)
383  {
384  PyErr_Print();
385  __SS__ << "Failed to load rc.control.daqinterface" << __E__;
386  __SUP_SS_THROW__;
387  }
388  else
389  {
390  __SUP_COUT__ << "Loading python module dictionary" << __E__;
391  PyObject* pDict = PyModule_GetDict(pModule);
392  if(pDict == NULL)
393  {
394  PyErr_Print();
395  __SS__ << "Unable to load module dictionary" << __E__;
396  __SUP_SS_THROW__;
397  }
398  else
399  {
400  Py_DECREF(pModule);
401 
402  __SUP_COUT__ << "Getting DAQInterface object pointer" << __E__;
403  PyObject* di_obj_ptr = PyDict_GetItemString(pDict, "DAQInterface");
404 
405  __SUP_COUT__ << "Filling out DAQInterface args struct" << __E__;
406  PyObject* pArgs = PyTuple_New(0);
407 
408  PyObject* kwargs = Py_BuildValue("{s:s, s:s, s:i, s:i, s:s, s:s}",
409  "logpath",
410  ".daqint.log",
411  "name",
412  "DAQInterface",
413  "partition_number",
414  partition_,
415  "rpc_port",
416  DAQINTERFACE_PORT,
417  "rpc_host",
418  "localhost",
419  "control_host",
420  "localhost");
421 
422  __SUP_COUT__ << "Calling DAQInterface Object Constructor" << __E__;
423 
424  // Get sys and io
425  PyObject* sys = PyImport_ImportModule("sys");
426  PyObject* io = PyImport_ImportModule("io");
427 
428  if(0)
429  {
430  //------------- redirect stdout to string
431 
432  // Create StringIO objects for stdout and stderr
433  stringIO_out = PyObject_CallMethod(io, "StringIO", NULL);
434  stringIO_err = PyObject_CallMethod(io, "StringIO", NULL);
435 
436  // Save originals (not needed, since just keep the redirection until daqinterface_ptr_ is destructed)
437  // PyObject* sys_stdout = PyObject_GetAttrString(sys, "stdout");
438  // PyObject* sys_stderr = PyObject_GetAttrString(sys, "stderr");
439 
440  // Redirect
441  PyObject_SetAttrString(sys, "stdout", stringIO_out);
442  PyObject_SetAttrString(sys, "stderr", stringIO_err);
443  //------------- end redirect stdout to string
444  }
445  else //capture tee buffer instead so output to console continues
446  {
447  PyObject* mainmod =
448  PyImport_AddModule("__main__"); // borrowed ref
449  PyObject* globals = PyModule_GetDict(mainmod); // borrowed ref
450 
451  stringIO_out =
452  PyDict_GetItemString(globals, "tee_buffer"); // borrowed
453  // Do not Py_DECREF borrowed references.
454  }
455 
456  daqinterface_ptr_ = PyObject_Call(di_obj_ptr, pArgs, kwargs);
457 
458  if(0) //example printout handling
459  {
460  // Force an error
461  PyObject* bad = PyObject_CallMethod(sys, "does_not_exist", NULL);
462  if(!bad)
463  PyErr_Print(); // <-- this writes into stringIO_err, not the terminal
464 
465  // Grab stderr contents
466  PyObject* err_text =
467  PyObject_CallMethod(stringIO_err, "getvalue", NULL);
468  if(err_text)
469  __COUT__ << "Captured stderr:\n"
470  << PyUnicode_AsUTF8(err_text) << "\n";
471  else
472  __COUT__ << "Capture of stderr failed.";
473  } //end example printout handling
474 
475  // Cleanup
476  Py_DECREF(di_obj_ptr);
477  Py_XDECREF(sys);
478  Py_XDECREF(io);
479  }
480  }
481  }
482 
483  getDAQState_();
484 
485  // { //attempt to cleanup old artdaq processes DOES NOT WORK because artdaq interface knows it hasn't started
486  // __SUP_COUT__ << "Attempting artdaq stale cleanup..." << __E__;
487  // std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
488  // getDAQState_();
489  // __SUP_COUT__ << "Status before cleanup: " << daqinterface_state_ << __E__;
490 
491  // PyObject* pName = PyUnicode_FromString("do_recover");
492  // PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
493  // __COUT_MULTI_LBL__(0,captureStderrAndStdout_("do_recover"),"do_recover");
494 
495  // if(res == NULL)
496  // {
497  // std::string err = capturePyErr("do_recover");
498  // __SS__ << "Error with clean up calling do_recover: " << err << __E__;
499  // __SUP_SS_THROW__;
500  // }
501  // getDAQState_();
502  // __SUP_COUT__ << "Status after cleanup: " << daqinterface_state_ << __E__;
503  // __SUP_COUT__ << "cleanup DONE." << __E__;
504  // }
505  }
506  start_runner_();
507  __SUP_COUT__ << "Initialized." << __E__;
508 } // end init()
509 
510 //==============================================================================
511 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference /*event*/)
512 {
513  __SUP_COUT__ << "transitionConfiguring" << __E__;
514 
515  // activate the configuration tree (the first iteration)
516  if(RunControlStateMachine::getIterationIndex() == 0 &&
517  RunControlStateMachine::getSubIterationIndex() == 0)
518  {
519  thread_error_message_ = "";
520  thread_progress_bar_.resetProgressBar(0);
521  last_thread_progress_update_ = time(0); // initialize timeout timer
522 
523  std::pair<std::string /*group name*/, TableGroupKey> theGroup(
524  SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
525  .getParameters()
526  .getValue("ConfigurationTableGroupName"),
527  TableGroupKey(SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
528  .getParameters()
529  .getValue("ConfigurationTableGroupKey")));
530 
531  __SUP_COUT__ << "Configuration table group name: " << theGroup.first
532  << " key: " << theGroup.second << __E__;
533 
534  try
535  {
536  // disable version tracking to accept untracked versions to be selected by the FSM transition source
537  theConfigurationManager_->loadTableGroup(
538  theGroup.first,
539  theGroup.second,
540  true /*doActivate*/,
541  0,
542  0,
543  0,
544  0,
545  0,
546  0,
547  false,
548  0,
549  0,
550  ConfigurationManager::LoadGroupType::ALL_TYPES,
551  true /*ignoreVersionTracking*/);
552  }
553  catch(const std::runtime_error& e)
554  {
555  __SS__ << "Error loading table group '" << theGroup.first << "("
556  << theGroup.second << ")! \n"
557  << e.what() << __E__;
558  __SUP_COUT_ERR__ << ss.str();
559  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
560 
561  //__SS_THROW_ONLY__;
562  theStateMachine_.setErrorMessage(ss.str());
563  throw toolbox::fsm::exception::Exception(
564  "Transition Error" /*name*/,
565  ss.str() /* message*/,
566  "ARTDAQSupervisor::transitionConfiguring" /*module*/,
567  __LINE__ /*line*/,
568  __FUNCTION__ /*function*/
569  );
570  }
571  catch(...)
572  {
573  __SS__ << "Unknown error loading table group '" << theGroup.first << "("
574  << theGroup.second << ")!" << __E__;
575  __SUP_COUT_ERR__ << ss.str();
576  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
577 
578  //__SS_THROW_ONLY__;
579  theStateMachine_.setErrorMessage(ss.str());
580  throw toolbox::fsm::exception::Exception(
581  "Transition Error" /*name*/,
582  ss.str() /* message*/,
583  "ARTDAQSupervisor::transitionConfiguring" /*module*/,
584  __LINE__ /*line*/,
585  __FUNCTION__ /*function*/
586  );
587  }
588 
589  // start configuring thread
590  std::thread(&ARTDAQSupervisor::configuringThread, this).detach();
591 
592  __SUP_COUT__ << "Configuring thread started." << __E__;
593 
594  RunControlStateMachine::
595  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
596  }
597  else // not first time
598  {
599  std::string errorMessage;
600  {
601  std::lock_guard<std::mutex> lock(
602  thread_mutex_); // lock out for remainder of scope
603  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
604  }
605  int progress = thread_progress_bar_.read();
606  __SUP_COUTVS__(2, errorMessage);
607  __SUP_COUTVS__(2, progress);
608  __SUP_COUTVS__(2, thread_progress_bar_.isComplete());
609 
610  // check for done and error messages
611  if(errorMessage == "" && // if no update in 600 seconds, give up
612  time(0) - last_thread_progress_update_ > 600)
613  {
614  __SUP_SS__ << "There has been no update from the configuration thread for "
615  << (time(0) - last_thread_progress_update_)
616  << " seconds, assuming something is wrong and giving up! "
617  << "Last progress received was " << progress << __E__;
618  errorMessage = ss.str();
619  }
620 
621  if(errorMessage != "")
622  {
623  __SUP_SS__ << "Error was caught in configuring thread: " << errorMessage
624  << __E__;
625  __SUP_COUT_ERR__ << "\n" << ss.str();
626 
627  theStateMachine_.setErrorMessage(ss.str());
628  throw toolbox::fsm::exception::Exception(
629  "Transition Error" /*name*/,
630  ss.str() /* message*/,
631  "CoreSupervisorBase::transitionConfiguring" /*module*/,
632  __LINE__ /*line*/,
633  __FUNCTION__ /*function*/
634  );
635  }
636 
637  if(!thread_progress_bar_.isComplete())
638  {
639  __SUP_COUT__ << "Not done yet..." << __E__;
640  //attempt to get live view of python output
641  // __COUT_MULTI_LBL__(0, captureStderrAndStdout_("statuscheck"), "statuscheck");
642 
643  RunControlStateMachine::
644  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
645 
646  if(last_thread_progress_read_ != progress)
647  {
648  last_thread_progress_read_ = progress;
649  last_thread_progress_update_ = time(0);
650  }
651 
652  sleep(1 /*seconds*/);
653  }
654  else
655  {
656  __SUP_COUT_INFO__ << "Complete configuring transition!" << __E__;
657  __SUP_COUTV__(getProcessInfo_());
658  }
659  }
660 
661  return;
662 } // end transitionConfiguring()
663 
664 //==============================================================================
665 void ARTDAQSupervisor::configuringThread()
666 try
667 {
668  std::string uid = theConfigurationManager_
669  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
670  "/" + CorePropertySupervisorBase::getSupervisorUID() +
671  "/" + "LinkToSupervisorTable")
672  .getValueAsString();
673 
674  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
675 
676  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
677 
678  ConfigurationTree theSupervisorNode = getSupervisorTableNode();
679 
680  thread_progress_bar_.step();
681 
682  set_thread_message_("ConfigGen");
683 
684  auto info = ARTDAQTableBase::extractARTDAQInfo(
685  theSupervisorNode,
686  false /*getStatusFalseNodes*/,
687  true /*doWriteFHiCL*/,
688  getSupervisorProperty("max_fragment_size_bytes", 8888),
689  getSupervisorProperty("routing_timeout_ms", 1999),
690  getSupervisorProperty("routing_retry_count", 12),
691  &thread_progress_bar_);
692 
693  // Check lists
694  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
695  {
696  __GEN_SS__ << "There must be at least one enabled BoardReader!" << __E__;
697  __GEN_SS_THROW__;
698  return;
699  }
700  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
701  {
702  __GEN_SS__ << "There must be at least one enabled EventBuilder!" << __E__;
703  __GEN_SS_THROW__;
704  return;
705  }
706 
707  thread_progress_bar_.step();
708  set_thread_message_("Writing boot.txt");
709 
710  __GEN_COUT__ << "Writing boot.txt" << __E__;
711 
712  int debugLevel = theSupervisorNode.getNode("DAQInterfaceDebugLevel").getValue<int>();
713  std::string setupScript = theSupervisorNode.getNode("DAQSetupScript").getValue();
714 
715  std::ofstream o(ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt", std::ios::trunc);
716  o << "DAQ setup script: " << setupScript << std::endl;
717  o << "debug level: " << debugLevel << std::endl;
718  o << std::endl;
719 
720  if(info.subsystems.size() > 1)
721  {
722  for(auto& ss : info.subsystems)
723  {
724  if(ss.first == 0)
725  continue;
726  o << "Subsystem id: " << ss.first << std::endl;
727  if(ss.second.destination != 0)
728  {
729  o << "Subsystem destination: " << ss.second.destination << std::endl;
730  }
731  for(auto& sss : ss.second.sources)
732  {
733  o << "Subsystem source: " << sss << std::endl;
734  }
735  if(ss.second.eventMode)
736  {
737  o << "Subsystem fragmentMode: False" << std::endl;
738  }
739  o << std::endl;
740  }
741  }
742 
743  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
744  {
745  o << "EventBuilder host: " << builder.hostname << std::endl;
746  o << "EventBuilder label: " << builder.label << std::endl;
747  label_to_proc_type_map_[builder.label] = "EventBuilder";
748  if(builder.subsystem != 1)
749  {
750  o << "EventBuilder subsystem: " << builder.subsystem << std::endl;
751  }
752  if(builder.allowed_processors != "")
753  {
754  o << "EventBuilder allowed_processors: " << builder.allowed_processors
755  << std::endl;
756  }
757  o << std::endl;
758  }
759  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
760  {
761  o << "DataLogger host: " << logger.hostname << std::endl;
762  o << "DataLogger label: " << logger.label << std::endl;
763  label_to_proc_type_map_[logger.label] = "DataLogger";
764  if(logger.subsystem != 1)
765  {
766  o << "DataLogger subsystem: " << logger.subsystem << std::endl;
767  }
768  if(logger.allowed_processors != "")
769  {
770  o << "DataLogger allowed_processors: " << logger.allowed_processors
771  << std::endl;
772  }
773  o << std::endl;
774  }
775  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
776  {
777  o << "Dispatcher host: " << dispatcher.hostname << std::endl;
778  o << "Dispatcher label: " << dispatcher.label << std::endl;
779  o << "Dispatcher port: " << dispatcher.port << std::endl;
780  label_to_proc_type_map_[dispatcher.label] = "Dispatcher";
781  if(dispatcher.subsystem != 1)
782  {
783  o << "Dispatcher subsystem: " << dispatcher.subsystem << std::endl;
784  }
785  if(dispatcher.allowed_processors != "")
786  {
787  o << "Dispatcher allowed_processors: " << dispatcher.allowed_processors
788  << std::endl;
789  }
790  o << std::endl;
791  }
792  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
793  {
794  o << "RoutingManager host: " << rmanager.hostname << std::endl;
795  o << "RoutingManager label: " << rmanager.label << std::endl;
796  label_to_proc_type_map_[rmanager.label] = "RoutingManager";
797  if(rmanager.subsystem != 1)
798  {
799  o << "RoutingManager subsystem: " << rmanager.subsystem << std::endl;
800  }
801  if(rmanager.allowed_processors != "")
802  {
803  o << "RoutingManager allowed_processors: " << rmanager.allowed_processors
804  << std::endl;
805  }
806  o << std::endl;
807  }
808  o.close();
809 
810  thread_progress_bar_.step();
811  set_thread_message_("Writing Fhicl Files");
812 
813  __GEN_COUT__ << "Building configuration directory" << __E__;
814 
815  boost::system::error_code ignored;
816  boost::filesystem::remove_all(ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME,
817  ignored);
818  mkdir((ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME).c_str(), 0755);
819 
820  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
821  {
822  symlink(ARTDAQTableBase::getFlatFHICLFilename(
823  ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label)
824  .c_str(),
825  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
826  reader.label + ".fcl")
827  .c_str());
828  }
829  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
830  {
831  symlink(ARTDAQTableBase::getFlatFHICLFilename(
832  ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label)
833  .c_str(),
834  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
835  builder.label + ".fcl")
836  .c_str());
837  }
838  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
839  {
840  symlink(ARTDAQTableBase::getFlatFHICLFilename(
841  ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label)
842  .c_str(),
843  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
844  logger.label + ".fcl")
845  .c_str());
846  }
847  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
848  {
849  symlink(ARTDAQTableBase::getFlatFHICLFilename(
850  ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label)
851  .c_str(),
852  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
853  dispatcher.label + ".fcl")
854  .c_str());
855  }
856  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
857  {
858  symlink(ARTDAQTableBase::getFlatFHICLFilename(
859  ARTDAQTableBase::ARTDAQAppType::RoutingManager, rmanager.label)
860  .c_str(),
861  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
862  rmanager.label + ".fcl")
863  .c_str());
864  }
865 
866  thread_progress_bar_.step();
867 
868  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
869  getDAQState_();
870  if(daqinterface_state_ != "stopped" && daqinterface_state_ != "")
871  {
872  __GEN_SS__ << "Cannot configure DAQInterface because it is in the wrong state"
873  << " (" << daqinterface_state_ << " != stopped)!" << __E__;
874  __GEN_SS_THROW__
875  }
876 
877  set_thread_message_("Calling setdaqcomps");
878  __GEN_COUT__ << "Calling setdaqcomps" << __E__;
879  __GEN_COUT__ << "Status before setdaqcomps: " << daqinterface_state_ << __E__;
880  PyObject* pName1 = PyUnicode_FromString("setdaqcomps");
881 
882  PyObject* readerDict = PyDict_New();
883  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
884  {
885  label_to_proc_type_map_[reader.label] = "BoardReader";
886  PyObject* readerName = PyUnicode_FromString(reader.label.c_str());
887 
888  int list_size = reader.allowed_processors != "" ? 4 : 3;
889 
890  PyObject* readerData = PyList_New(list_size);
891  PyObject* readerHost = PyUnicode_FromString(reader.hostname.c_str());
892  PyObject* readerPort = PyUnicode_FromString("-1");
893  PyObject* readerSubsystem =
894  PyUnicode_FromString(std::to_string(reader.subsystem).c_str());
895  PyList_SetItem(readerData, 0, readerHost);
896  PyList_SetItem(readerData, 1, readerPort);
897  PyList_SetItem(readerData, 2, readerSubsystem);
898  if(reader.allowed_processors != "")
899  {
900  PyObject* readerAllowedProcessors =
901  PyUnicode_FromString(reader.allowed_processors.c_str());
902  PyList_SetItem(readerData, 3, readerAllowedProcessors);
903  }
904  PyDict_SetItem(readerDict, readerName, readerData);
905  }
906  PyObject* res1 =
907  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName1, readerDict, NULL);
908  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("setdaqcomps"), "setdaqcomps");
909 
910  Py_DECREF(readerDict);
911 
912  if(res1 == NULL)
913  {
914  std::string err = capturePyErr("setdaqcomps");
915  __GEN_SS__ << "Error calling setdaqcomps transition: " << err << __E__;
916  __GEN_SS_THROW__;
917  }
918 
919  getDAQState_();
920  __GEN_COUT__ << "Status after setdaqcomps: " << daqinterface_state_ << __E__;
921 
922  thread_progress_bar_.step();
923  set_thread_message_("Calling do_boot");
924  __GEN_COUT_INFO__ << "Calling do_boot" << __E__;
925  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
926  PyObject* pName2 = PyUnicode_FromString("do_boot");
927  PyObject* pStateArgs1 =
928  PyUnicode_FromString((ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt").c_str());
929  PyObject* res2 =
930  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
931  std::string doBootOutput = captureStderrAndStdout_("do_boot");
932  __COUT_MULTI_LBL__(0, doBootOutput, "do_boot");
933 
934  if(res2 == NULL)
935  {
936  std::string err = capturePyErr();
937  __GEN_COUT_INFO__ << "Error on first boost attempt, recovering and retrying: "
938  << err << __E__;
939 
940  PyObject* pName = PyUnicode_FromString("do_recover");
941  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
942  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_recover"), "do_recover");
943 
944  if(res == NULL)
945  {
946  std::string err = capturePyErr();
947  __GEN_SS__ << "Error calling recover transition!!!! " << err << __E__;
948  if(doBootOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
949  ss << "... last " << OUT_ON_ERR_SIZE
950  << " characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
951  else
952  ss << doBootOutput;
953  __GEN_SS_THROW__;
954  }
955 
956  thread_progress_bar_.step();
957  set_thread_message_("Calling do_boot (retry)");
958  __GEN_COUT_INFO__ << "Calling do_boot again" << __E__;
959  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
960  PyObject* res3 =
961  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
962  doBootOutput = captureStderrAndStdout_("do_boot (retry)");
963  __COUT_MULTI_LBL__(0, doBootOutput, "do_boot (retry)");
964 
965  if(res3 == NULL)
966  {
967  std::string err = capturePyErr();
968  __GEN_SS__ << "Error calling boot transition (2nd try): " << err << __E__;
969  if(doBootOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
970  ss << "... last " << OUT_ON_ERR_SIZE
971  << " characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
972  else
973  ss << doBootOutput;
974  __GEN_SS_THROW__;
975  }
976  }
977 
978  getDAQState_();
979  if(daqinterface_state_ != "booted")
980  {
981  std::cout << "Do boot output on error: \n" << doBootOutput << __E__;
982  __GEN_SS__ << "DAQInterface boot transition failed! "
983  << "Status after boot attempt: " << daqinterface_state_ << __E__;
984 
985  if(doBootOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
986  ss << "... last " << OUT_ON_ERR_SIZE
987  << " characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
988  else
989  ss << doBootOutput;
990  __GEN_SS_THROW__;
991  }
992  __GEN_COUT__ << "Status after boot: " << daqinterface_state_ << __E__;
993 
994  thread_progress_bar_.step();
995  set_thread_message_("Calling do_config");
996  __GEN_COUT_INFO__ << "Calling do_config" << __E__;
997  __GEN_COUT__ << "Status before config: " << daqinterface_state_ << __E__;
998  std::string doConfigOutput = "";
999  { //do_config call
1000  PyObject* pName3 = PyUnicode_FromString("do_config");
1001  PyObject* pStateArgs2 = Py_BuildValue("[s]", FAKE_CONFIG_NAME);
1002  PyObject* res3 =
1003  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName3, pStateArgs2, NULL);
1004  doConfigOutput = captureStderrAndStdout_("do_config");
1005  __COUT_MULTI_LBL__(0, doConfigOutput, "do_config");
1006  if(res3 == NULL)
1007  {
1008  std::string err = capturePyErr("do_config");
1009  __GEN_SS__ << "Error calling config transition: " << err << __E__;
1010  __GEN_SS_THROW__;
1011  }
1012  const char* res_cstr = PyUnicode_AsUTF8(res3);
1013  __SUP_COUTT__ << "do_config result=" << (res_cstr ? res_cstr : "") << __E__;
1014  } //end do_config call
1015 
1016  getDAQState_();
1017  if(daqinterface_state_ != "ready")
1018  {
1019  __GEN_SS__ << "DAQInterface config transition failed!" << __E__
1020  << "Supervisor state: \"" << daqinterface_state_ << "\" != \"ready\" "
1021  << __E__;
1022  auto doConfigOutput_recover_i =
1023  doConfigOutput.find("RECOVER transition underway");
1024  if(doConfigOutput_recover_i == std::string::npos)
1025  ss << doConfigOutput;
1026  else if(doConfigOutput_recover_i >
1027  OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
1028  ss << "... tail of " << OUT_ON_ERR_SIZE << " characters before recovery: "
1029  << doConfigOutput.substr(
1030  doConfigOutput_recover_i - OUT_ON_ERR_SIZE +
1031  std::string("RECOVER transition underway").size(),
1032  OUT_ON_ERR_SIZE);
1033  else
1034  ss << doConfigOutput.substr(
1035  0,
1036  doConfigOutput_recover_i +
1037  std::string("RECOVER transition underway").size());
1038  __GEN_SS_THROW__;
1039  }
1040  __GEN_COUT__ << "Status after config: " << daqinterface_state_ << __E__;
1041  thread_progress_bar_.complete();
1042  set_thread_message_("Configured");
1043  __GEN_COUT_INFO__ << "Configured." << __E__;
1044 
1045 } // end configuringThread()
1046 catch(const std::runtime_error& e)
1047 {
1048  set_thread_message_("ERROR");
1049  __SS__ << "Error was caught while configuring: " << e.what() << __E__;
1050  __COUT_ERR__ << "\n" << ss.str();
1051  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1052  thread_error_message_ = ss.str();
1053 }
1054 catch(...)
1055 {
1056  set_thread_message_("ERROR");
1057  __SS__ << "Unknown error was caught while configuring. Please checked the logs."
1058  << __E__;
1059  __COUT_ERR__ << "\n" << ss.str();
1060 
1061  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1062 
1063  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1064  thread_error_message_ = ss.str();
1065 } // end configuringThread() error handling
1066 
1067 //==============================================================================
1068 void ARTDAQSupervisor::transitionHalting(toolbox::Event::Reference /*event*/)
1069 try
1070 {
1071  set_thread_message_("Halting");
1072  __SUP_COUT__ << "Halting..." << __E__;
1073 
1074  int tries = 0;
1075  while(tries++ < 5)
1076  {
1077  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1078  std::try_to_lock);
1079  if(!lk.owns_lock()) //if lock not availabe, just report last status
1080  {
1081  __COUTS__(50) << "Do not have python lock for halt. tries=" << tries << __E__;
1082  sleep(1);
1083  continue;
1084  }
1085  __COUTS__(50) << "Have python lock!" << __E__;
1086 
1087  // std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1088  getDAQState_();
1089  __SUP_COUT__ << "Status before halt: " << daqinterface_state_ << __E__;
1090 
1091  if(daqinterface_state_ == "running")
1092  {
1093  // First stop before halting
1094  PyObject* pName = PyUnicode_FromString("do_stop_running");
1095  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1096  __COUT_MULTI_LBL__(
1097  0, captureStderrAndStdout_("do_stop_running"), "do_stop_running");
1098 
1099  if(res == NULL)
1100  {
1101  std::string err = capturePyErr();
1102  __SS__ << "Error calling DAQ Interface stop transition: " << err
1103  << __E__;
1104  __SUP_SS_THROW__;
1105  }
1106  }
1107 
1108  PyObject* pName = PyUnicode_FromString("do_command");
1109  PyObject* pArg = PyUnicode_FromString("Shutdown");
1110  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1111  __COUT_MULTI_LBL__(
1112  0, captureStderrAndStdout_("do_command Shutdown"), "do_command Shutdown");
1113 
1114  if(res == NULL)
1115  {
1116  std::string err = capturePyErr();
1117  __SS__ << "Error calling DAQ Interface halt transition: " << err << __E__;
1118  __SUP_SS_THROW__;
1119  }
1120 
1121  getDAQState_();
1122  __SUP_COUT__ << "Status after halt: " << daqinterface_state_ << __E__;
1123  break;
1124  } //end retry loop
1125 
1126  __SUP_COUT__ << "Halted." << __E__;
1127  set_thread_message_("Halted");
1128 } // end transitionHalting()
1129 catch(const std::runtime_error& e)
1130 {
1131  const std::string transitionName = "Halting";
1132  // if halting from Failed state, then ignore errors
1133  if(theStateMachine_.getProvenanceStateName() ==
1134  RunControlStateMachine::FAILED_STATE_NAME ||
1135  theStateMachine_.getProvenanceStateName() ==
1136  RunControlStateMachine::HALTED_STATE_NAME)
1137  {
1138  __SUP_COUT_INFO__ << "Error was caught while halting (but ignoring because "
1139  "previous state was '"
1140  << RunControlStateMachine::FAILED_STATE_NAME
1141  << "'): " << e.what() << __E__;
1142  }
1143  else // if not previously in Failed state, then fail
1144  {
1145  __SUP_SS__ << "Error was caught while " << transitionName << ": " << e.what()
1146  << __E__;
1147  __SUP_COUT_ERR__ << "\n" << ss.str();
1148  theStateMachine_.setErrorMessage(ss.str());
1149  throw toolbox::fsm::exception::Exception(
1150  "Transition Error" /*name*/,
1151  ss.str() /* message*/,
1152  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1153  __LINE__ /*line*/,
1154  __FUNCTION__ /*function*/
1155  );
1156  }
1157 } // end transitionHalting() std::runtime_error exception handling
1158 catch(...)
1159 {
1160  const std::string transitionName = "Halting";
1161  // if halting from Failed state, then ignore errors
1162  if(theStateMachine_.getProvenanceStateName() ==
1163  RunControlStateMachine::FAILED_STATE_NAME ||
1164  theStateMachine_.getProvenanceStateName() ==
1165  RunControlStateMachine::HALTED_STATE_NAME)
1166  {
1167  __SUP_COUT_INFO__ << "Unknown error was caught while halting (but ignoring "
1168  "because previous state was '"
1169  << RunControlStateMachine::FAILED_STATE_NAME << "')." << __E__;
1170  }
1171  else // if not previously in Failed state, then fail
1172  {
1173  __SUP_SS__ << "Unknown error was caught while " << transitionName
1174  << ". Please checked the logs." << __E__;
1175  __SUP_COUT_ERR__ << "\n" << ss.str();
1176  theStateMachine_.setErrorMessage(ss.str());
1177 
1178  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1179 
1180  throw toolbox::fsm::exception::Exception(
1181  "Transition Error" /*name*/,
1182  ss.str() /* message*/,
1183  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1184  __LINE__ /*line*/,
1185  __FUNCTION__ /*function*/
1186  );
1187  }
1188 } // end transitionHalting() exception handling
1189 
1190 //==============================================================================
1191 void ARTDAQSupervisor::transitionInitializing(toolbox::Event::Reference /*event*/)
1192 try
1193 {
1194  set_thread_message_("Initializing");
1195  __SUP_COUT__ << "Initializing..." << __E__;
1196  init();
1197  __SUP_COUT__ << "Initialized." << __E__;
1198  set_thread_message_("Initialized");
1199 } // end transitionInitializing()
1200 catch(const std::runtime_error& e)
1201 {
1202  __SS__ << "Error was caught while Initializing: " << e.what() << __E__;
1203  __SS_THROW__;
1204 }
1205 catch(...)
1206 {
1207  __SS__ << "Unknown error was caught while Initializing. Please checked the logs."
1208  << __E__;
1209  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1210  __SS_THROW__;
1211 } // end transitionInitializing() error handling
1212 
1213 //==============================================================================
1214 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference /*event*/)
1215 try
1216 {
1217  set_thread_message_("Pausing");
1218  __SUP_COUT__ << "Pausing..." << __E__;
1219  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1220 
1221  getDAQState_();
1222  __SUP_COUT__ << "Status before pause: " << daqinterface_state_ << __E__;
1223 
1224  PyObject* pName = PyUnicode_FromString("do_command");
1225  PyObject* pArg = PyUnicode_FromString("Pause");
1226  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1227  __COUT_MULTI_LBL__(
1228  0, captureStderrAndStdout_("do_command Pause"), "do_command Pause");
1229 
1230  if(res == NULL)
1231  {
1232  std::string err = capturePyErr();
1233  __SS__ << "Error calling DAQ Interface Pause transition: " << err << __E__;
1234  __SUP_SS_THROW__;
1235  }
1236 
1237  getDAQState_();
1238  __SUP_COUT__ << "Status after pause: " << daqinterface_state_ << __E__;
1239 
1240  __SUP_COUT__ << "Paused." << __E__;
1241  set_thread_message_("Paused");
1242 } // end transitionPausing()
1243 catch(const std::runtime_error& e)
1244 {
1245  __SS__ << "Error was caught while Pausing: " << e.what() << __E__;
1246  __SS_THROW__;
1247 }
1248 catch(...)
1249 {
1250  __SS__ << "Unknown error was caught while Pausing. Please checked the logs." << __E__;
1251  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1252  __SS_THROW__;
1253 } // end transitionPausing() error handling
1254 
1255 //==============================================================================
1256 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference /*event*/)
1257 try
1258 {
1259  set_thread_message_("Resuming");
1260  __SUP_COUT__ << "Resuming..." << __E__;
1261  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1262 
1263  getDAQState_();
1264  __SUP_COUT__ << "Status before resume: " << daqinterface_state_ << __E__;
1265  PyObject* pName = PyUnicode_FromString("do_command");
1266  PyObject* pArg = PyUnicode_FromString("Resume");
1267  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1268  __COUT_MULTI_LBL__(
1269  0, captureStderrAndStdout_("do_command Resume"), "do_command Resume");
1270 
1271  if(res == NULL)
1272  {
1273  std::string err = capturePyErr();
1274  __SS__ << "Error calling DAQ Interface Resume transition: " << err << __E__;
1275  __SUP_SS_THROW__;
1276  }
1277  getDAQState_();
1278  __SUP_COUT__ << "Status after resume: " << daqinterface_state_ << __E__;
1279  __SUP_COUT__ << "Resumed." << __E__;
1280  set_thread_message_("Resumed");
1281 } // end transitionResuming()
1282 catch(const std::runtime_error& e)
1283 {
1284  __SS__ << "Error was caught while Resuming: " << e.what() << __E__;
1285  __SS_THROW__;
1286 }
1287 catch(...)
1288 {
1289  __SS__ << "Unknown error was caught while Resuming. Please checked the logs."
1290  << __E__;
1291  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1292  __SS_THROW__;
1293 } // end transitionResuming() error handling
1294 
1295 //==============================================================================
1296 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference /*event*/)
1297 try
1298 {
1299  __SUP_COUT__ << "transitionStarting" << __E__;
1300 
1301  // first time launch thread because artdaq Supervisor may take a while
1302  if(RunControlStateMachine::getIterationIndex() == 0 &&
1303  RunControlStateMachine::getSubIterationIndex() == 0)
1304  {
1305  thread_error_message_ = "";
1306  thread_progress_bar_.resetProgressBar(0);
1307  last_thread_progress_update_ = time(0); // initialize timeout timer
1308 
1309  // start configuring thread
1310  std::thread(&ARTDAQSupervisor::startingThread, this).detach();
1311 
1312  __SUP_COUT_INFO__ << "Starting thread started." << __E__;
1313 
1314  RunControlStateMachine::
1315  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1316  }
1317  else // not first time
1318  {
1319  std::string errorMessage;
1320  {
1321  std::lock_guard<std::mutex> lock(
1322  thread_mutex_); // lock out for remainder of scope
1323  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
1324  }
1325  int progress = thread_progress_bar_.read();
1326  __SUP_COUTV__(errorMessage);
1327  __SUP_COUTV__(progress);
1328  __SUP_COUTV__(thread_progress_bar_.isComplete());
1329 
1330  // check for done and error messages
1331  if(errorMessage == "" && // if no update in 600 seconds, give up
1332  time(0) - last_thread_progress_update_ > 600)
1333  {
1334  __SUP_SS__ << "There has been no update from the start thread for "
1335  << (time(0) - last_thread_progress_update_)
1336  << " seconds, assuming something is wrong and giving up! "
1337  << "Last progress received was " << progress << __E__;
1338  errorMessage = ss.str();
1339  }
1340 
1341  if(errorMessage != "")
1342  {
1343  __SUP_SS__ << "Error was caught in starting thread: " << errorMessage
1344  << __E__;
1345  __SUP_COUT_ERR__ << "\n" << ss.str();
1346 
1347  theStateMachine_.setErrorMessage(ss.str());
1348  throw toolbox::fsm::exception::Exception(
1349  "Transition Error" /*name*/,
1350  ss.str() /* message*/,
1351  "CoreSupervisorBase::transitionStarting" /*module*/,
1352  __LINE__ /*line*/,
1353  __FUNCTION__ /*function*/
1354  );
1355  }
1356 
1357  if(!thread_progress_bar_.isComplete())
1358  {
1359  __SUP_COUT__ << "Not done yet..." << __E__;
1360  //attempt to get live view of python output (not working and not needed with new Tee Buffer solution)
1361  // __COUT_MULTI_LBL__(0, captureStderrAndStdout_("statuscheck"), "statuscheck");
1362 
1363  RunControlStateMachine::
1364  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1365 
1366  if(last_thread_progress_read_ != progress)
1367  {
1368  last_thread_progress_read_ = progress;
1369  last_thread_progress_update_ = time(0);
1370  }
1371 
1372  sleep(1 /*seconds*/);
1373  }
1374  else
1375  {
1376  __SUP_COUT_INFO__ << "Starting transition completed!" << __E__;
1377  __SUP_COUTV__(getProcessInfo_());
1378  }
1379  }
1380 
1381  return;
1382 
1383 } // end transitionStarting()
1384 catch(const std::runtime_error& e)
1385 {
1386  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1387  __SS_THROW__;
1388 }
1389 catch(...)
1390 {
1391  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1392  << __E__;
1393  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1394  __SS_THROW__;
1395 } // end transitionStarting() error handling
1396 
1397 //==============================================================================
1398 void ARTDAQSupervisor::startingThread()
1399 try
1400 {
1401  std::string uid = theConfigurationManager_
1402  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
1403  "/" + CorePropertySupervisorBase::getSupervisorUID() +
1404  "/" + "LinkToSupervisorTable")
1405  .getValueAsString();
1406 
1407  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
1408  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
1409  __GEN_COUT__ << "Starting..." << __E__;
1410  set_thread_message_("Starting");
1411 
1412  thread_progress_bar_.step();
1413  stop_runner_();
1414  {
1415  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1416  getDAQState_();
1417  __GEN_COUT__ << "Status before start: " << daqinterface_state_ << __E__;
1418  auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
1419  .getParameters()
1420  .getValue("RunNumber");
1421 
1422  thread_progress_bar_.step();
1423 
1424  __GEN_COUT_INFO__ << "Calling do_start_running" << __E__;
1425  PyObject* pName = PyUnicode_FromString("do_start_running");
1426  int run_number = std::stoi(runNumber);
1427  PyObject* pStateArgs = PyLong_FromLong(run_number);
1428  PyObject* res =
1429  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pStateArgs, NULL);
1430  __COUT_MULTI_LBL__(
1431  0, captureStderrAndStdout_("do_start_running"), "do_start_running");
1432 
1433  thread_progress_bar_.step();
1434 
1435  if(res == NULL)
1436  {
1437  std::string err = capturePyErr();
1438  __SS__ << "Error calling start transition: " << err << __E__;
1439  __GEN_SS_THROW__;
1440  }
1441  getDAQState_();
1442 
1443  thread_progress_bar_.step();
1444 
1445  __GEN_COUT__ << "Status after start: " << daqinterface_state_ << __E__;
1446  if(daqinterface_state_ != "running")
1447  {
1448  __SS__ << "DAQInterface start transition failed!" << __E__;
1449  __GEN_SS_THROW__;
1450  }
1451 
1452  thread_progress_bar_.step();
1453  }
1454  start_runner_();
1455  set_thread_message_("Started");
1456  thread_progress_bar_.step();
1457 
1458  __GEN_COUT_INFO__ << "Started." << __E__;
1459  thread_progress_bar_.complete();
1460 
1461 } // end startingThread()
1462 catch(const std::runtime_error& e)
1463 {
1464  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1465  __COUT_ERR__ << "\n" << ss.str();
1466  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1467  thread_error_message_ = ss.str();
1468 }
1469 catch(...)
1470 {
1471  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1472  << __E__;
1473  __COUT_ERR__ << "\n" << ss.str();
1474 
1475  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1476 
1477  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1478  thread_error_message_ = ss.str();
1479 } // end startingThread() error handling
1480 
1481 //==============================================================================
1482 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference /*event*/)
1483 try
1484 {
1485  __SUP_COUT__ << "Stopping..." << __E__;
1486  set_thread_message_("Stopping");
1487  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1488  getDAQState_();
1489  __SUP_COUT__ << "Status before stop: " << daqinterface_state_ << __E__;
1490  PyObject* pName = PyUnicode_FromString("do_stop_running");
1491  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1492  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_stop_running"), "do_stop_running");
1493 
1494  if(res == NULL)
1495  {
1496  std::string err = capturePyErr();
1497  __SS__ << "Error calling DAQ Interface stop transition: " << err << __E__;
1498  __SUP_SS_THROW__;
1499  }
1500  getDAQState_();
1501  __SUP_COUT__ << "Status after stop: " << daqinterface_state_ << __E__;
1502  __SUP_COUT__ << "Stopped." << __E__;
1503  set_thread_message_("Stopped");
1504 } // end transitionStopping()
1505 catch(const std::runtime_error& e)
1506 {
1507  __SS__ << "Error was caught while Stopping: " << e.what() << __E__;
1508  __SS_THROW__;
1509 }
1510 catch(...)
1511 {
1512  __SS__ << "Unknown error was caught while Stopping. Please checked the logs."
1513  << __E__;
1514  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1515  __SS_THROW__;
1516 } // end transitionStopping() error handling
1517 
1518 //==============================================================================
1519 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference /*event*/)
1520 {
1521  __SUP_COUT__ << "Entering error recovery state" << __E__;
1522  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1523  getDAQState_();
1524  __SUP_COUT__ << "Status before error: " << daqinterface_state_ << __E__;
1525 
1526  PyObject* pName = PyUnicode_FromString("do_recover");
1527  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1528  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_recover"), "do_recover");
1529 
1530  if(res == NULL)
1531  {
1532  std::string err = capturePyErr();
1533  //do not throw exception when entering error, because failing DAQ interface could be the reason for error in first place
1534  __SUP_COUT_WARN__ << "Error calling DAQ Interface recover transition: " << err
1535  << __E__;
1536  return;
1537  }
1538  getDAQState_();
1539  __SUP_COUT__ << "Status after error: " << daqinterface_state_ << __E__;
1540  __SUP_COUT__ << "EnteringError DONE." << __E__;
1541 
1542 } // end enteringError()
1543 
1544 std::vector<SupervisorInfo::SubappInfo> ots::ARTDAQSupervisor::getSubappInfo(void)
1545 {
1546  auto apps = getAndParseProcessInfo_();
1547  std::vector<SupervisorInfo::SubappInfo> output;
1548  for(auto& app : apps)
1549  {
1551 
1552  info.name = app.label;
1553  info.detail = "Rank " + std::to_string(app.rank) + ", subsystem " +
1554  std::to_string(app.subsystem);
1555  info.lastStatusTime = time(0);
1556  info.progress = 100;
1557  info.status = artdaqStateToOtsState(app.state);
1558  info.url = "http://" + app.host + ":" + std::to_string(app.port) + "/RPC2";
1559  info.class_name = "ARTDAQ " + labelToProcType_(app.label);
1560 
1561  output.push_back(info);
1562  }
1563  return output;
1564 }
1565 
1566 //==============================================================================
1567 std::string ots::ARTDAQSupervisor::capturePyErr(std::string label /* = "" */)
1568 {
1569  return captureStderrAndStdout_(
1570  label); //new way with Tee output of stdout and stderr to tee_buffer
1571 
1572  PyErr_Print(); // dump the Python exception <-- this writes into stringIO_err, not the terminal
1573  if(label.size())
1574  label += ' '; //for nice printing
1575 
1576  PyObject* err_text = PyObject_CallMethod(stringIO_err, "getvalue", NULL);
1577  std::string err = "";
1578  if(!err_text)
1579  err = "Capture of " + label + "PyErr failed.";
1580  else
1581  err = "Capture of " + label + "PyErr: " + std::string(PyUnicode_AsUTF8(err_text));
1582 
1583  //clear buffer for reuse
1584  {
1585  PyObject* r1 = PyObject_CallMethod(stringIO_err, "seek", "i", 0);
1586  Py_XDECREF(r1);
1587  PyObject* r2 = PyObject_CallMethod(stringIO_err, "truncate", NULL);
1588  Py_XDECREF(r2);
1589  }
1590  return err;
1591 } //end captureStderr()
1592 
1593 //==============================================================================
1594 std::string ots::ARTDAQSupervisor::captureStderrAndStdout_(std::string label /* = "" */)
1595 {
1596  if(!stringIO_out)
1597  return ""; // Not defined
1598  if(label.size())
1599  label += ' '; //for nice printing
1600 
1601  std::string outString = "";
1602  PyObject* out = PyObject_CallMethod(stringIO_out, "getvalue", NULL);
1603  if(out == NULL)
1604  return "";
1605 
1606  const char* text = PyUnicode_AsUTF8(out);
1607  // use text
1608  Py_DECREF(out);
1609 
1610  return text; //new way with Tee output of stdout and stderr to tee_buffer
1611 
1612  try
1613  {
1614  //------------- capture stdout and stderr
1615  PyObject* out_text = PyObject_CallMethod(stringIO_out, "getvalue", NULL);
1616  if(!out_text)
1617  PyErr_Print(); // dump the Python exception <-- this writes into stringIO_err, not the terminal
1618  else
1619  {
1620  const char* out_cstr = PyUnicode_AsUTF8(out_text);
1621  if(out_cstr && strlen(out_cstr))
1622  outString = "Captured " + label + "stdout:\n" +
1623  std::string(out_cstr ? out_cstr : "") + "\n";
1624  else
1625  outString = "Captured " + label + "stdout empty.\n";
1626  }
1627 
1628  std::string errString = "";
1629  PyObject* err_text = PyObject_CallMethod(stringIO_err, "getvalue", NULL);
1630  if(!err_text)
1631  __SUP_COUT__ << "Capture of " << label << "stderr failed.";
1632  else
1633  {
1634  const char* err_cstr = PyUnicode_AsUTF8(err_text);
1635  if(err_cstr && strlen(err_cstr))
1636  errString = "Captured " + label + "stderr:\n" +
1637  std::string(err_cstr ? err_cstr : "") + "\n";
1638  else
1639  errString = "Captured " + label + "stderr empty.\n";
1640  }
1641 
1642  //clear buffers for reuse
1643  {
1644  PyObject* r1 = PyObject_CallMethod(stringIO_out, "seek", "i", 0);
1645  Py_XDECREF(r1);
1646  PyObject* r2 = PyObject_CallMethod(stringIO_out, "truncate", NULL);
1647  Py_XDECREF(r2);
1648  }
1649  {
1650  PyObject* r1 = PyObject_CallMethod(stringIO_err, "seek", "i", 0);
1651  Py_XDECREF(r1);
1652  PyObject* r2 = PyObject_CallMethod(stringIO_err, "truncate", NULL);
1653  Py_XDECREF(r2);
1654  }
1655  //------------- end capture stdout and stderr
1656  return errString + outString;
1657  }
1658  catch(...) //make sure to release python thread lock
1659  {
1660  __COUT_ERR__ << "Exception caught while capturing python output!" << __E__;
1661  throw;
1662  }
1663 
1664 } //end captureStderrAndStdout_()
1665 
1666 //==============================================================================
1667 void ots::ARTDAQSupervisor::getDAQState_()
1668 {
1669  __SUP_COUTS__(50) << "Getting DAQInterface python lock" << __E__;
1670  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1671  __SUP_COUTS__(50) << "Have DAQInterface python lock" << __E__;
1672 
1673  if(daqinterface_ptr_ == nullptr)
1674  {
1675  daqinterface_state_ = "";
1676  __SUP_COUT_WARN__
1677  << "daqinterface_ptr_ is not initialized! Check logs for errors." << __E__;
1678  return;
1679  }
1680 
1681  int tries = 0;
1682  while(tries < 5)
1683  {
1684  PyObject* pName = PyUnicode_FromString("state");
1685  PyObject* pArg = PyUnicode_FromString("DAQInterface");
1686  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1687 
1688  if(res == NULL)
1689  {
1690  tries++;
1691  std::string err = capturePyErr("getDAQState_");
1692  __SS__
1693  << "Attempt n " << tries
1694  << ". Error calling 'state' function from getDAQState_() - here was the "
1695  "value: "
1696  << err << "\n\n"
1697  << StringMacros::stackTrace() << __E__;
1698  // __SUP_SS_THROW__;
1699  //do not throw, just mark state empty
1700  daqinterface_state_ = "";
1701  __COUT__ << ss.str();
1702  if(tries >= 5)
1703  __COUT_ERR__ << ss.str();
1704  usleep(100000);
1705  __SUP_COUTS__(2) << "no getDAQState_ state=" << daqinterface_state_ << __E__;
1706  continue;
1707  }
1708  daqinterface_state_ = std::string(PyUnicode_AsUTF8(res));
1709  __SUP_COUTS__(20) << "getDAQState_ state=" << daqinterface_state_ << __E__;
1710  break;
1711  }
1712 
1713 } // end getDAQState_()
1714 
1715 //==============================================================================
1716 std::string ots::ARTDAQSupervisor::getProcessInfo_(void)
1717 {
1718  __SUP_COUTS__(50) << "Getting DAQInterface state lock" << __E__;
1719  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1720  __SUP_COUTS__(50) << "Have DAQInterface state lock" << __E__;
1721 
1722  if(daqinterface_ptr_ == nullptr)
1723  {
1724  return "";
1725  }
1726 
1727  PyObject* pName = PyUnicode_FromString("artdaq_process_info");
1728  PyObject* pArg = PyUnicode_FromString("DAQInterface");
1729  PyObject* pArg2 = PyBool_FromLong(true);
1730  PyObject* res =
1731  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, pArg2, NULL);
1732 
1733  if(res == NULL)
1734  {
1735  std::string err = capturePyErr();
1736  __SS__ << "Error calling artdaq_process_info function: " << err << __E__;
1737  __SUP_SS_THROW__;
1738  return "";
1739  }
1740  //cache status as latest
1741  std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1742  daqinterface_status_ = std::string(PyUnicode_AsUTF8(res));
1743  return daqinterface_status_;
1744 } // end getProcessInfo_()
1745 
1746 std::string ots::ARTDAQSupervisor::artdaqStateToOtsState(std::string state)
1747 {
1748  if(state == "nonexistant")
1749  return RunControlStateMachine::INITIAL_STATE_NAME;
1750  if(state == "Ready")
1751  return "Configured";
1752  if(state == "Running")
1753  return RunControlStateMachine::RUNNING_STATE_NAME;
1754  if(state == "Paused")
1755  return RunControlStateMachine::PAUSED_STATE_NAME;
1756  if(state == "Stopped")
1757  return RunControlStateMachine::HALTED_STATE_NAME;
1758 
1759  TLOG(TLVL_WARNING) << "Unrecognized state name " << state;
1760  return RunControlStateMachine::FAILED_STATE_NAME;
1761 }
1762 
1763 std::string ots::ARTDAQSupervisor::labelToProcType_(std::string label)
1764 {
1765  if(label_to_proc_type_map_.count(label))
1766  {
1767  return label_to_proc_type_map_[label];
1768  }
1769  return "UNKNOWN";
1770 }
1771 
1772 //==============================================================================
1774 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo>
1775 ots::ARTDAQSupervisor::getAndParseProcessInfo_()
1776 {
1777  std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
1778  // full acquire from getProcessInfo_ creates mutex locking up!
1779  // auto info = getProcessInfo_();
1780  std::string info;
1781 
1782  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1783  std::try_to_lock);
1784  if(!lk.owns_lock()) //if lock not availabe, just report last status
1785  {
1786  __COUTS__(50) << "Do not have python lock." << __E__;
1787  std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1788  info = daqinterface_status_;
1789  }
1790  else //have lock! so retrieve Python Interface status
1791  {
1792  __COUTS__(50) << "Have python lock!" << __E__;
1793  info = getProcessInfo_();
1794  }
1795  __COUTVS__(20, info);
1796 
1797  auto procs = tokenize_(info);
1798 
1799  // 0: Whole string
1800  // 1: Process Label
1801  // 2: Process host
1802  // 3: Process port
1803  // 4: Process subsystem
1804  // 5: Process Rank
1805  // 6: Process state
1806  std::regex re("(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
1807 
1808  for(auto& proc : procs)
1809  {
1810  std::smatch match;
1811  if(std::regex_match(proc, match, re))
1812  {
1813  DAQInterfaceProcessInfo info;
1814 
1815  info.label = match[1];
1816  info.host = match[2];
1817  info.port = std::stoi(match[3]);
1818  info.subsystem = std::stoi(match[4]);
1819  info.rank = std::stoi(match[5]);
1820  info.state = match[6];
1821 
1822  output.push_back(info);
1823  }
1824  }
1825  return output;
1826 } // end getAndParseProcessInfo_()
1827 
1828 //==============================================================================
1830  std::unique_ptr<artdaq::CommanderInterface>>>
1831 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
1832 {
1833  std::list<
1834  std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
1835  output;
1836  auto infos = getAndParseProcessInfo_();
1837 
1838  for(auto& info : infos)
1839  {
1840  artdaq::Commandable cm;
1841  fhicl::ParameterSet ps;
1842 
1843  ps.put<std::string>("commanderPluginType", "xmlrpc");
1844  ps.put<int>("id", info.port);
1845  ps.put<std::string>("server_url", info.host);
1846 
1847  output.emplace_back(std::make_pair<DAQInterfaceProcessInfo,
1848  std::unique_ptr<artdaq::CommanderInterface>>(
1849  std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
1850  }
1851 
1852  return output;
1853 } // end makeCommandersFromProcessInfo()
1854 
1855 //==============================================================================
1856 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string const& input)
1857 {
1858  size_t pos = 0;
1859  std::list<std::string> output;
1860 
1861  while(pos != std::string::npos && pos < input.size())
1862  {
1863  auto newpos = input.find('\n', pos);
1864  if(newpos != std::string::npos)
1865  {
1866  output.emplace_back(input, pos, newpos - pos);
1867  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1868  pos = newpos + 1;
1869  }
1870  else
1871  {
1872  output.emplace_back(input, pos);
1873  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1874  pos = newpos;
1875  }
1876  }
1877  return output;
1878 } // end tokenize_()
1879 
1880 //==============================================================================
1881 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1882 {
1883  TLOG(TLVL_TRACE) << "Runner thread starting";
1884  runner_running_ = true;
1885  while(runner_running_)
1886  {
1887  if(daqinterface_ptr_ != NULL)
1888  {
1889  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1890  getDAQState_();
1891  std::string state_before = daqinterface_state_;
1892 
1893  __SUP_COUTS__(2) << "Runner state_before=" << state_before
1894  << " state now=" << daqinterface_state_
1895  << " ?= running, ready, or booted" << __E__;
1896 
1897  if(daqinterface_state_ == "running" || daqinterface_state_ == "ready" ||
1898  daqinterface_state_ == "booted")
1899  {
1900  try
1901  {
1902  TLOG(TLVL_TRACE) << "Calling DAQInterface::check_proc_heartbeats";
1903  PyObject* pName = PyUnicode_FromString("check_proc_heartbeats");
1904  PyObject* res =
1905  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1906  __COUT_MULTI_LBL__(1,
1907  captureStderrAndStdout_("check_proc_heartbeats"),
1908  "check_proc_heartbeats");
1909  TLOG(TLVL_TRACE)
1910  << "Done with DAQInterface::check_proc_heartbeats call";
1911 
1912  if(res == NULL)
1913  {
1914  runner_running_ = false;
1915  std::string err = capturePyErr("check_proc_heartbeats");
1916  __SS__ << "Error calling check_proc_heartbeats function: " << err
1917  << __E__;
1918  __SUP_SS_THROW__;
1919  break;
1920  }
1921  }
1922  catch(cet::exception& ex)
1923  {
1924  runner_running_ = false;
1925  std::string err = capturePyErr("check_proc_heartbeats");
1926  __SS__ << "An cet::exception occurred while calling "
1927  "check_proc_heartbeats function "
1928  << ex.explain_self() << ": " << err << __E__;
1929  __SUP_SS_THROW__;
1930  break;
1931  }
1932  catch(std::exception& ex)
1933  {
1934  runner_running_ = false;
1935  std::string err = capturePyErr("check_proc_heartbeats");
1936  __SS__ << "An std::exception occurred while calling "
1937  "check_proc_heartbeats function: "
1938  << ex.what() << "\n\n"
1939  << err << __E__;
1940  __SUP_SS_THROW__;
1941  break;
1942  }
1943  catch(...)
1944  {
1945  runner_running_ = false;
1946  std::string err = capturePyErr("check_proc_heartbeats");
1947  __SS__ << "An unknown Error occurred while calling "
1948  "check_proc_heartbeats function: "
1949  << err << __E__;
1950  __SUP_SS_THROW__;
1951  break;
1952  }
1953 
1954  lk.unlock();
1955  getDAQState_();
1956  if(daqinterface_state_ != state_before)
1957  {
1958  runner_running_ = false;
1959  lk.unlock();
1960  __SS__ << "DAQInterface state unexpectedly changed from "
1961  << state_before << " to " << daqinterface_state_
1962  << ". Check supervisor log file for more info!" << __E__;
1963  __SUP_SS_THROW__;
1964  break;
1965  }
1966  }
1967  }
1968  else
1969  {
1970  __SUP_COUT__ << "daqinterface_ptr_ is null" << __E__;
1971  break;
1972  }
1973  usleep(1000000);
1974  }
1975  runner_running_ = false;
1976  TLOG(TLVL_TRACE) << "Runner thread complete";
1977 } // end daqinterfaceRunner_()
1978 
1979 //==============================================================================
1980 void ots::ARTDAQSupervisor::stop_runner_()
1981 {
1982  runner_running_ = false;
1983  if(runner_thread_ && runner_thread_->joinable())
1984  {
1985  runner_thread_->join();
1986  runner_thread_.reset(nullptr);
1987  }
1988 } // end stop_runner_()
1989 
1990 //==============================================================================
1991 void ots::ARTDAQSupervisor::start_runner_()
1992 {
1993  stop_runner_();
1994  runner_thread_ =
1995  std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_, this);
1996 } // end start_runner_()
virtual void transitionHalting(toolbox::Event::Reference event) override
virtual void transitionInitializing(toolbox::Event::Reference event) override
static const std::string ARTDAQ_FCL_PATH
Tree-path rule is, if the last link in the path is a group link with a specified group ID,...
void loadTableGroup(const std::string &tableGroupName, const TableGroupKey &tableGroupKey, bool doActivate=false, std::map< std::string, TableVersion > *groupMembers=0, ProgressBar *progressBar=0, std::string *accumulateWarnings=0, std::string *groupComment=0, std::string *groupAuthor=0, std::string *groupCreateTime=0, bool doNotLoadMember=false, std::string *groupTypeString=0, std::map< std::string, std::string > *groupAliases=0, ConfigurationManager::LoadGroupType groupTypeToLoad=ConfigurationManager::LoadGroupType::ALL_TYPES, bool ignoreVersionTracking=false)
ConfigurationTree getNode(const std::string &nodeString, bool doNotThrowOnBrokenUIDLinks=false) const
"root/parent/parent/"
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
const std::string & getValueAsString(bool returnLinkTableValue=false) const
void getValue(T &value) const
ITRACEController * theTRACEController_
only define for an app that receives a command
bool isComplete()
get functions
Definition: ProgressBar.cc:88
void step()
thread safe
Definition: ProgressBar.cc:74
int read()
if stepsToComplete==0, then define any progress as 50%, thread safe
Definition: ProgressBar.cc:120
void complete()
declare complete, thread safe
Definition: ProgressBar.cc:95
void INIT_MF(const char *name)
static std::string stackTrace(void)
std::string name
Also key in map.