otsdaq  3.06.00
ARTDAQSupervisor.cc
1 
2 
3 #define TRACEMF_USE_VERBATIM 1 // for trace longer path filenames
4 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
5 
6 #include "artdaq-core/Utilities/configureMessageFacility.hh"
7 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
8 #include "artdaq/DAQdata/Globals.hh"
9 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
10 #include "cetlib_except/exception.h"
11 #include "fhiclcpp/make_ParameterSet.h"
12 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisorTRACEController.h"
13 
14 #include "artdaq-core/Utilities/ExceptionHandler.hh" /*for artdaq::ExceptionHandler*/
15 
16 #include <boost/exception/all.hpp>
17 #include <boost/filesystem.hpp>
18 
19 #include <signal.h>
20 #include <regex>
21 
22 #define OUT_ON_ERR_SIZE 2000 //tail size of output to include on error
23 
24 using namespace ots;
25 
26 XDAQ_INSTANTIATOR_IMPL(ARTDAQSupervisor)
27 
// Name of the configuration directory that configuringThread() creates under
// ARTDAQTableBase::ARTDAQ_FCL_PATH (filled with symlinks to the flat FHiCL files).
28 #define FAKE_CONFIG_NAME "ots_config"
// RPC port handed to the DAQInterface constructor:
//   ARTDAQ_BASE_PORT + partition_ * ARTDAQ_PORTS_PER_PARTITION
// Both values are read from the environment through __ENV__ and parsed with
// std::atoi on every expansion. The expansion refers to `partition_`, so the
// macro can only be used where that member is in scope.
// NOTE(review): the expansion is not wrapped in an outer pair of parentheses;
// that is harmless for the visible use as a function argument, but it would
// mis-associate inside a larger arithmetic expression.
29 #define DAQINTERFACE_PORT \
30  std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + \
31  (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION")))
32 
// File-local state shared between init_sighandler() and signal_handler().
// `instance`   : supervisor whose destroy() is called when a handled signal arrives.
// `old_actions`: dispositions that were in place before signal_handler was
//                installed, keyed by signal number; used to restore the previous
//                behaviour before the signal is re-raised.
// `sighandler_init`: set once the handlers have been installed, so that the
//                installation happens only for the first caller.
33 static ARTDAQSupervisor* instance = nullptr;
34 static std::unordered_map<int, struct sigaction> old_actions =
35  std::unordered_map<int, struct sigaction>();
36 static bool sighandler_init = false;
/// Handler installed by init_sighandler() for the signals listed there.
///
/// Sequence:
///  1. Logs the signal number through TRACE (two macro spellings are selected
///     by TRACE_REVNUM; both feed the same streamed message).
///  2. Calls destroy() on the registered supervisor instance, if any.
///  3. Reads the calling thread's current signal mask and unblocks every signal
///     contained in it.
///  4. Restores the disposition saved in old_actions and re-sends the signal to
///     this process. For SIGUSR2 the saved SIGINT disposition is restored and
///     SIGINT is sent instead.
///
/// @param signum number of the signal that was delivered.
///
/// NOTE(review): destroy() logs, locks a mutex and calls into the Python
/// interpreter; none of that is async-signal-safe. Confirm this best-effort
/// teardown is acceptable.
/// NOTE(review): old_actions[...] default-inserts an entry when the signal was
/// not recorded by init_sighandler() (e.g. it was SIG_IGN at install time);
/// sigaction() is then given a value-initialized struct. Presumably that maps
/// to the default disposition - verify on the target platform.
37 static void signal_handler(int signum)
38 {
39 // Messagefacility may already be gone at this point, TRACE ONLY!
40 #if TRACE_REVNUM < 1459
41  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
42 #else
43  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
44 #endif
45  << "A signal of type " << signum
46  << " was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
47  "then proceeding with default handlers!";
48 
49  if(instance)
50  instance->destroy();
51 
    // With a null `set` argument pthread_sigmask only reports the current mask
    // (the `how` argument is not applied); the second call then unblocks
    // exactly those signals so the re-raised signal below can be delivered.
52  sigset_t set;
53  pthread_sigmask(SIG_UNBLOCK, NULL, &set);
54  pthread_sigmask(SIG_UNBLOCK, &set, NULL);
55 
56 #if TRACE_REVNUM < 1459
57  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
58 #else
59  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
60 #endif
61  << "Calling default signal handler";
62  if(signum != SIGUSR2)
63  {
    // Put back whatever was installed before this handler, then re-deliver.
64  sigaction(signum, &old_actions[signum], NULL);
65  kill(getpid(), signum); // Only send signal to self
66  }
67  else
68  {
69  // Send Interrupt signal if parsing SIGUSR2 (i.e. user-defined exception that
70  // should tear down ARTDAQ)
71  sigaction(SIGINT, &old_actions[SIGINT], NULL);
72  kill(getpid(), SIGINT); // Only send signal to self
73  }
74 }
75 
/// Installs signal_handler() for a fixed list of signals, once per process.
///
/// The first call records `inst` as the instance to tear down and, for every
/// listed signal whose current disposition is not SIG_IGN, installs
/// signal_handler and remembers the previous disposition in old_actions.
/// Later calls do nothing (and do not replace the registered instance).
/// A function-local mutex serializes concurrent callers.
///
/// @param inst supervisor whose destroy() should run when a signal is caught.
///
/// NOTE(review): return values of sigaction() are not checked.
/// NOTE(review): `action` is not zero-initialized; only sa_handler, sa_mask and
/// sa_flags are assigned - confirm no other field matters on the target platform.
76 static void init_sighandler(ARTDAQSupervisor* inst)
77 {
78  static std::mutex sighandler_mutex;
79  std::unique_lock<std::mutex> lk(sighandler_mutex);
80 
81  if(!sighandler_init)
82  {
83  sighandler_init = true;
84  instance = inst;
85  std::vector<int> signals = {
86  SIGINT,
87  SIGILL,
88  SIGABRT,
89  SIGFPE,
90  SIGSEGV,
91  SIGPIPE,
92  SIGALRM,
93  SIGTERM,
94  SIGUSR2,
95  SIGHUP}; // SIGQUIT is used by art in normal operation
96  for(auto signal : signals)
97  {
    // Query the current disposition without changing it.
98  struct sigaction old_action;
99  sigaction(signal, NULL, &old_action);
100 
101  // If the old handler wasn't SIG_IGN (it's a handler that just
102  // "ignore" the signal)
103  if(old_action.sa_handler != SIG_IGN)
104  {
105  struct sigaction action;
106  action.sa_handler = signal_handler;
    // While the handler runs, block every signal in the handled list.
107  sigemptyset(&action.sa_mask);
108  for(auto sigblk : signals)
109  {
110  sigaddset(&action.sa_mask, sigblk);
111  }
112  action.sa_flags = 0;
113 
114  // Install signal_handler for this signal and remember the previous
115  // disposition so signal_handler can restore it before re-raising
116  sigaction(signal, &action, NULL);
117  old_actions[signal] = old_action;
118  }
119  }
120  }
121 }
122 
123 //==============================================================================
/// Constructs the supervisor and prepares the DAQInterface environment.
///
/// Visible steps:
///  - initializes members (no DAQInterface object yet, state "notrunning",
///    partition taken from the "partition" supervisor property, default 0);
///  - sets up message facility and installs the process signal handlers;
///  - truncates and rewrites the file named by the DAQINTERFACE_SETTINGS
///    environment variable with one "key: value" line per setting, each value
///    coming from a supervisor property with the default shown in the code;
///  - exports DAQINTERFACE_PARTITION_NUMBER and DAQINTERFACE_LOGFILE;
///  - deletes the existing TRACE controller and sets up the supervisor-specific one.
///
/// @param stub XDAQ application stub forwarded to CoreSupervisorBase.
///
/// NOTE(review): the settings stream is never checked for open/write failure.
124 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
125  : CoreSupervisorBase(stub)
126  , daqinterface_ptr_(NULL)
127  , partition_(getSupervisorProperty("partition", 0))
128  , daqinterface_state_("notrunning")
129  , runner_thread_(nullptr)
130 {
131  __SUP_COUT__ << "Constructor." << __E__;
132 
133  INIT_MF("." /*directory used is USER_DATA/LOG/.*/);
134  init_sighandler(this);
135 
136  // Only use system Python
137  // unsetenv("PYTHONPATH");
138  // unsetenv("PYTHONHOME");
139 
140  // Write out settings file
141  auto settings_file = __ENV__("DAQINTERFACE_SETTINGS");
142  std::ofstream o(settings_file, std::ios::trunc);
143 
    // NOTE(review): the log path contains "DAQInteface" (no 'r'); confirm this
    // matches the directory name that actually exists under OTSDAQ_LOG_DIR.
144  setenv("DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
145  auto logfileName = std::string(__ENV__("OTSDAQ_LOG_DIR")) +
146  "/DAQInteface/DAQInterface_partition" +
147  std::to_string(partition_) + ".log";
148  setenv("DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
149 
150  o << "log_directory: "
151  << getSupervisorProperty("log_directory", std::string(__ENV__("OTSDAQ_LOG_DIR")))
152  << std::endl;
153 
154  {
    // mkdir creates a single directory level and its result is ignored here
    // (an already-existing directory is therefore tolerated, as is any failure).
155  const std::string record_directory = getSupervisorProperty(
156  "record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH + "/run_records/");
157  mkdir(record_directory.c_str(), 0755);
158  o << "record_directory: " << record_directory << std::endl;
159  }
160 
161  o << "package_hashes_to_save: "
162  << getSupervisorProperty("package_hashes_to_save", "[artdaq]") << std::endl;
163 
164  o << "spack_root_for_bash_scripts: "
165  << getSupervisorProperty("spack_root_for_bash_scripts",
166  std::string(__ENV__("SPACK_ROOT")))
167  << std::endl;
    // The four timeout keys below are written with a space, unlike the other
    // (underscore-separated) keys.
168  o << "boardreader timeout: " << getSupervisorProperty("boardreader_timeout", 30)
169  << std::endl;
170  o << "eventbuilder timeout: " << getSupervisorProperty("eventbuilder_timeout", 30)
171  << std::endl;
172  o << "datalogger timeout: " << getSupervisorProperty("datalogger_timeout", 30)
173  << std::endl;
174  o << "dispatcher timeout: " << getSupervisorProperty("dispatcher_timeout", 30)
175  << std::endl;
176  // Only put max_fragment_size_bytes into DAQInterface settings file if advanced_memory_usage is disabled
177  if(!getSupervisorProperty("advanced_memory_usage", false))
178  {
179  o << "max_fragment_size_bytes: "
180  << getSupervisorProperty("max_fragment_size_bytes", 1048576) << std::endl;
181  }
182  o << "transfer_plugin_to_use: "
183  << getSupervisorProperty("transfer_plugin_to_use", "TCPSocket") << std::endl;
    // Optional keys: written only when the corresponding property is non-empty.
184  if(getSupervisorProperty("transfer_plugin_from_brs", "") != "")
185  {
186  o << "transfer_plugin_from_brs: "
187  << getSupervisorProperty("transfer_plugin_from_brs", "") << std::endl;
188  }
189  if(getSupervisorProperty("transfer_plugin_from_ebs", "") != "")
190  {
191  o << "transfer_plugin_from_ebs: "
192  << getSupervisorProperty("transfer_plugin_from_ebs", "") << std::endl;
193  }
194  if(getSupervisorProperty("transfer_plugin_from_dls", "") != "")
195  {
196  o << "transfer_plugin_from_dls: "
197  << getSupervisorProperty("transfer_plugin_from_dls", "") << std::endl;
198  }
    // std::boolalpha is sticky on the stream: from here on every bool is
    // written as "true"/"false", including those lines that do not repeat it.
199  o << "all_events_to_all_dispatchers: " << std::boolalpha
200  << getSupervisorProperty("all_events_to_all_dispatchers", true) << std::endl;
201  if(getSupervisorProperty("data_directory_override", "") != "")
202  {
203  o << "data_directory_override: "
204  << getSupervisorProperty("data_directory_override", "") << std::endl;
205  }
206  o << "max_configurations_to_list: "
207  << getSupervisorProperty("max_configurations_to_list", 10) << std::endl;
208  o << "disable_unique_rootfile_labels: "
209  << getSupervisorProperty("disable_unique_rootfile_labels", false) << std::endl;
210  o << "use_messageviewer: " << std::boolalpha
211  << getSupervisorProperty("use_messageviewer", false) << std::endl;
212  o << "use_messagefacility: " << std::boolalpha
213  << getSupervisorProperty("use_messagefacility", true) << std::endl;
214  o << "fake_messagefacility: " << std::boolalpha
215  << getSupervisorProperty("fake_messagefacility", false) << std::endl;
216  o << "kill_existing_processes: " << std::boolalpha
217  << getSupervisorProperty("kill_existing_processes", true) << std::endl;
218  o << "advanced_memory_usage: " << std::boolalpha
219  << getSupervisorProperty("advanced_memory_usage", false) << std::endl;
220  o << "strict_fragment_id_mode: " << std::boolalpha
221  << getSupervisorProperty("strict_fragment_id_mode", false) << std::endl;
222  o << "disable_private_network_bookkeeping: " << std::boolalpha
223  << getSupervisorProperty("disable_private_network_bookkeeping", false) << std::endl;
224  o << "allowed_processors: "
225  << getSupervisorProperty(
226  "allowed_processors",
227  "0-255") // Note this sets a taskset for ALL processes, on all nodes (ex. "1,2,5-7")
228  << std::endl;
229  if(getSupervisorProperty("partition_label_format", "") !=
230  "") //Add to ARTDAQSupervisor properties partition_label_format: "-P%s"
231  o << "partition_label_format: "
232  << getSupervisorProperty("partition_label_format", "") << std::endl;
233 
234  o.close();
235 
236  // destroy current TRACEController and instantiate ARTDAQSupervisorTRACEController
237  if(CorePropertySupervisorBase::theTRACEController_)
238  {
239  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
240  delete CorePropertySupervisorBase::
241  theTRACEController_; // destruct current TRACEController
242  CorePropertySupervisorBase::theTRACEController_ = nullptr;
243  }
    // NOTE(review): this listing jumps from line 244 to 246, so the right-hand
    // side of the assignment below is not visible here. Presumably it creates
    // the ARTDAQSupervisorTRACEController whose setSupervisorPtr(this) is then
    // called - confirm against the real source file.
244  CorePropertySupervisorBase::theTRACEController_ =
246  ((ARTDAQSupervisorTRACEController*)CorePropertySupervisorBase::theTRACEController_)
247  ->setSupervisorPtr(this);
248 
249  __SUP_COUT__ << "Constructed." << __E__;
250 } // end constructor()
251 
252 //==============================================================================
/// Tears the supervisor down: runs destroy() (DAQInterface recovery and Python
/// output/thread cleanup), finalizes the embedded Python interpreter, and then
/// handles the TRACE controller that the constructor set up.
///
/// NOTE(review): this listing is missing lines 262, 265 and 266, so the
/// condition guarding the block below and the statements that actually release
/// the TRACE controller are not visible here. Presumably they mirror the
/// null-check / delete / reset sequence in the constructor - confirm against
/// the real source file.
253 ARTDAQSupervisor::~ARTDAQSupervisor(void)
254 {
255  __SUP_COUT__ << "Destructor." << __E__;
256  destroy();
257 
    // Finalization happens only here; destroy() deliberately leaves the
    // interpreter alive (its own Py_Finalize call is commented out).
258  __SUP_COUT__ << "Calling Py_Finalize()" << __E__;
259  Py_Finalize();
260 
261  // CorePropertySupervisorBase would destroy, but since it was created here, attempt to destroy
263  {
264  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
267  }
268 
269  __SUP_COUT__ << "Destructed." << __E__;
270 } // end destructor()
271 
272 //==============================================================================
/// Shuts down the DAQInterface object and cleans up Python-side state.
///
/// If a DAQInterface object exists: takes the Python mutex, calls its
/// do_recover method, polls getDAQState_() once per second until the state
/// string equals "stopped", then drops the owned reference.
/// Afterwards (whether or not an object existed): restores sys.stdout and
/// sys.stderr, clears the captured-output pointers, joins all non-main,
/// non-dummy Python threads and runs the garbage collector.
/// The interpreter itself is not finalized here.
///
/// Called from the destructor and from signal_handler().
///
/// NOTE(review): the wait for "stopped" has no timeout; if do_recover fails
/// (its result `res` is not inspected) this loop may never exit.
/// NOTE(review): the PyRun_SimpleString calls below run outside the scope of
/// the daqinterface_pythonMutex_ lock and without checking that the interpreter
/// was initialized by init(); confirm destroy() cannot be reached before init().
273 void ARTDAQSupervisor::destroy(void)
274 {
275  __SUP_COUT__ << "Destroying..." << __E__;
276 
277  if(daqinterface_ptr_ != NULL)
278  {
279  __SUP_COUT__ << "Calling recover transition" << __E__;
280  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
281 
282  PyObjectGuard pName(PyUnicode_FromString("do_recover"));
283  PyObjectGuard res(
284  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
285 
286  __SUP_COUT__ << "Making sure that correct state has been reached" << __E__;
287  getDAQState_();
288  while(daqinterface_state_ != "stopped")
289  {
290  getDAQState_();
291  __SUP_COUT__ << "State is " << daqinterface_state_
292  << ", waiting 1s and retrying..." << __E__;
293  usleep(1000000);
294  }
295 
296  // Cleanup
297  Py_XDECREF(daqinterface_ptr_);
298  daqinterface_ptr_ = NULL;
299  }
300 
301  __SUP_COUT__ << "Flusing printouts" << __E__;
302 
303  //make sure to flush printouts
304  PyRun_SimpleString(R"(
305 import sys
306 sys.stdout = sys.__stdout__
307 sys.stderr = sys.__stderr__
308 )");
309  // stringIO_out_ and stringIO_err_ may refer to borrowed Python objects
310  // (e.g., via PyDict_GetItemString in the tee-buffer path). Do not DECREF
311  // them here to avoid corrupting their reference counts; treat them as
312  // non-owning pointers and clear them instead.
313  stringIO_out_ = nullptr;
314  stringIO_err_ = nullptr;
315 
316  __SUP_COUT__ << "Thread and garbage cleanup" << __E__;
317  // force python thread cleanup:
318  PyRun_SimpleString(
319  "import threading; [t.join() for t in threading.enumerate() if t is not "
320  "threading.main_thread() and not isinstance(t, threading._DummyThread)]");
321  PyRun_SimpleString("import gc; gc.collect()");
322  // __SUP_COUT__ << "Calling Py_Finalize()" << __E__;
323  // Py_Finalize();
324 
325  __SUP_COUT__ << "Destroyed." << __E__;
326 } // end destroy()
327 
328 //==============================================================================
/// Starts the embedded Python interpreter and creates the DAQInterface object.
///
/// Sequence (all Python work is done while holding daqinterface_pythonMutex_):
///  1. stops the runner thread;
///  2. configures the artdaq message facility;
///  3. requires the ARTDAQ_DAQINTERFACE_DIR environment variable, otherwise throws;
///  4. initializes Python and replaces sys.stdout / sys.stderr with a tee that
///     writes both to the real stream and to a shared StringIO named tee_buffer;
///  5. appends the DAQInterface directory to sys.path and imports
///     rc.control.daqinterface;
///  6. looks up "DAQInterface" in the module dictionary and calls it with
///     keyword arguments (log path, name, partition number, RPC port/host,
///     control host), storing the result in daqinterface_ptr_;
///  7. refreshes the cached DAQ state and restarts the runner thread.
///
/// Throws (via __SUP_SS_THROW__) when the environment variable is missing, the
/// import fails, the module dictionary or the DAQInterface entry cannot be
/// obtained, or the constructor call reports a Python error.
///
/// NOTE(review): results of PyList_Append, PyTuple_New, Py_BuildValue and the
/// two PyImport_ImportModule calls are not checked.
/// NOTE(review): in the live (tee) branch only stringIO_out_ is assigned;
/// stringIO_err_ is not set anywhere in this function's reachable code.
329 void ARTDAQSupervisor::init(void)
330 {
331  stop_runner_();
332 
333  __SUP_COUT__ << "Initializing..." << __E__;
334  {
335  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
336 
337  // allSupervisorInfo_.init(getApplicationContext());
338  artdaq::configureMessageFacility("ARTDAQSupervisor");
339  __SUP_COUT__ << "artdaq MF configured." << __E__;
340 
341  // initialization
342  char* daqinterface_dir = getenv("ARTDAQ_DAQINTERFACE_DIR");
343  if(daqinterface_dir == NULL)
344  {
345  __SS__ << "ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
346  "means that DAQInterface has not been setup!"
347  << __E__;
348  __SUP_SS_THROW__;
349  }
350  else
351  {
352  __SUP_COUT__ << "Initializing Python" << __E__;
353  Py_Initialize();
354 
355  //setup Python output to tee output to stdout/err and to StringIO buffer "tee_buffer"
356  PyRun_SimpleString(
357  "import sys\n"
358  "from io import StringIO\n"
359  "\n"
360  "class TeeOut:\n"
361  " def __init__(self, real, buf):\n"
362  " self.real = real\n"
363  " self.buf = buf\n"
364  " def write(self, data):\n"
365  " self.real.write(data)\n"
366  " self.buf.write(data)\n"
367  " def flush(self):\n"
368  " self.real.flush()\n"
369  " self.buf.flush()\n"
370  "\n"
371  "tee_buffer = StringIO()\n"
372  "sys.stdout = TeeOut(sys.stdout, tee_buffer)\n"
373  "sys.stderr = TeeOut(sys.stderr, tee_buffer)\n");
374 
375  __SUP_COUT__ << "Adding DAQInterface directory to PYTHON_PATH" << __E__;
376  PyObject* sysPath = PySys_GetObject(
377  (char*)"path"); //do NOT DECREF borrowed objects through GetObject!
378  PyObjectGuard programName(PyUnicode_FromString(daqinterface_dir));
379  PyList_Append(sysPath, programName.get());
380 
381  __SUP_COUT__ << "Creating Module name" << __E__;
382  PyObjectGuard pName(PyUnicode_FromString("rc.control.daqinterface"));
383  /* Error checking of pName left out */
384 
385  __SUP_COUT__ << "Importing module" << __E__;
386  PyObjectGuard pModule(PyImport_Import(pName.get()));
387 
388  if(pModule.get() == NULL)
389  {
390  std::string err = capturePyErr("import rc.control.daqinterface");
391  __SS__ << "Failed to load rc.control.daqinterface. Python Exception: "
392  << err << __E__;
393  __SUP_SS_THROW__;
394  }
395  else
396  {
397  __SUP_COUT__ << "Loading python module dictionary" << __E__;
398  PyObject* pDict = PyModule_GetDict(
399  pModule.get()); //do NOT DECREF borrowed objects through GetDict!
400  if(pDict == NULL)
401  {
402  std::string err = capturePyErr("module dict");
403  __SS__ << "Unable to load module dictionary. Python Exception: "
404  << err << __E__;
405  __SUP_SS_THROW__;
406  }
407  else
408  {
409  __SUP_COUT__ << "Getting DAQInterface object pointer" << __E__;
410  PyObject* di_obj_raw = PyDict_GetItemString(
411  pDict, "DAQInterface"); // borrowed reference
412  if(di_obj_raw == NULL)
413  {
414  std::string err = capturePyErr("DAQInterface lookup");
415  __SS__ << "Unable to find 'DAQInterface' in module dictionary. "
416  "Python Exception: "
417  << err << __E__;
418  __SUP_SS_THROW__;
419  }
420  Py_INCREF(di_obj_raw); // convert borrowed reference to owned
421  PyObjectGuard di_obj_ptr(di_obj_raw);
422 
423  __SUP_COUT__ << "Filling out DAQInterface args struct" << __E__;
    // The constructor is called with no positional arguments; everything
    // is passed by keyword through the dictionary built below.
424  PyObjectGuard pArgs(PyTuple_New(0));
425 
426  PyObjectGuard kwargs(Py_BuildValue("{s:s, s:s, s:i, s:i, s:s, s:s}",
427  "logpath",
428  ".daqint.log",
429  "name",
430  "DAQInterface",
431  "partition_number",
432  partition_,
433  "rpc_port",
434  DAQINTERFACE_PORT,
435  "rpc_host",
436  "localhost",
437  "control_host",
438  "localhost"));
439 
440  __SUP_COUT__ << "Calling DAQInterface Object Constructor" << __E__;
441 
442  // Get sys and io
443  PyObjectGuard sys(PyImport_ImportModule("sys"));
444  PyObjectGuard io(PyImport_ImportModule("io"));
445 
    // Dead branch (condition is the constant 0): alternative capture
    // strategy that would redirect stdout/stderr into two StringIO objects.
446  if(0)
447  {
448  //------------- redirect stdout to string
449 
450  // Create StringIO objects for stdout and stderr
451  stringIO_out_ = PyObject_CallMethod(io.get(), "StringIO", NULL);
452  stringIO_err_ = PyObject_CallMethod(io.get(), "StringIO", NULL);
453 
454  // Save originals (not needed, since just keep the redirection until daqinterface_ptr_ is destructed)
455  // PyObject* sys_stdout = PyObject_GetAttrString(sys, "stdout");
456  // PyObject* sys_stderr = PyObject_GetAttrString(sys, "stderr");
457 
458  // Redirect
459  PyObject_SetAttrString(sys.get(), "stdout", stringIO_out_);
460  PyObject_SetAttrString(sys.get(), "stderr", stringIO_err_);
461  //------------- end redirect stdout to string
462  }
463  else //capture tee buffer instead so output to console continues
464  {
465  PyObject* mainmod =
466  PyImport_AddModule("__main__"); // borrowed ref
467  PyObject* globals = PyModule_GetDict(mainmod); // borrowed ref
468 
    // tee_buffer was created by the PyRun_SimpleString script above.
469  stringIO_out_ =
470  PyDict_GetItemString(globals, "tee_buffer"); // borrowed ref
471 
472  // Do not Py_DECREF borrowed references.
473  }
474 
    // The resulting reference is owned by this object and released in destroy().
475  daqinterface_ptr_ =
476  PyObject_Call(di_obj_ptr.get(), pArgs.get(), kwargs.get());
477  if(checkPythonError(daqinterface_ptr_))
478  {
479  std::string err = capturePyErr("DAQInterface constructor");
480  __SS__ << "DAQInterface constructor failed. Python Exception: "
481  << err << __E__;
482  __SUP_SS_THROW__;
483  }
484 
485  if(0) //example printout handling
486  {
487  // Force an error
488  PyObjectGuard bad(
489  PyObject_CallMethod(sys.get(), "does_not_exist", NULL));
490  if(!bad.get())
491  PyErr_Print(); // <-- this writes into stringIO_err_, not the terminal
492 
493  // Grab stderr contents
494  PyObjectGuard err_text(
495  PyObject_CallMethod(stringIO_err_, "getvalue", NULL));
496  if(err_text.get())
497  __COUT__ << "Captured stderr:\n"
498  << PyUnicode_AsUTF8(err_text.get()) << "\n";
499  else
500  __COUT__ << "Capture of stderr failed.";
501  } //end example printout handling
502  }
503  }
504  }
505 
506  getDAQState_();
507 
508  // { //attempt to cleanup old artdaq processes DOES NOT WORK because artdaq interface knows it hasn't started
509  // __SUP_COUT__ << "Attempting artdaq stale cleanup..." << __E__;
510  // std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
511  // getDAQState_();
512  // __SUP_COUT__ << "Status before cleanup: " << daqinterface_state_ << __E__;
513 
514  // PyObjectGuard pName(PyUnicode_FromString("do_recover"));
515  // PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL));
516  // __COUT_MULTI_LBL__(0,captureStderrAndStdout_("do_recover"),"do_recover");
517 
518  // if(res == NULL)
519  // {
520  // std::string err = capturePyErr("do_recover");
521  // __SS__ << "Error with clean up calling do_recover: " << err << __E__;
522  // __SUP_SS_THROW__;
523  // }
524  // getDAQState_();
525  // __SUP_COUT__ << "Status after cleanup: " << daqinterface_state_ << __E__;
526  // __SUP_COUT__ << "cleanup DONE." << __E__;
527  // }
528  }
529  start_runner_();
530  __SUP_COUT__ << "Initialized." << __E__;
531 } // end init()
532 
533 //==============================================================================
/// State-machine "configuring" transition, executed as an iterated step.
///
/// First call (iteration index 0 and sub-iteration index 0): resets the
/// shared error message, progress bar and progress timestamp, runs
/// CoreSupervisorBase::configureInit(), launches configuringThread() on a
/// detached std::thread and requests another iteration.
///
/// Every later call: snapshots the thread's error message under thread_mutex_
/// and reads the progress bar, then
///  - reports a timeout error if no error is set and the progress timestamp is
///    more than 600 seconds old;
///  - on any error, stores the message in the state machine and throws
///    toolbox::fsm::exception::Exception;
///  - if the progress bar is not complete, requests another iteration,
///    refreshes the progress timestamp when the progress value changed, and
///    sleeps one second;
///  - otherwise logs completion and the process info.
///
/// NOTE(review): thread_error_message_ is cleared on the first call without
/// holding thread_mutex_, while the later read does hold it.
/// NOTE(review): the exception's module string names
/// "CoreSupervisorBase::transitionConfiguring" although it is thrown from this
/// class; left untouched because it is runtime text.
534 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference /*event*/)
535 {
536  __SUP_COUT__ << "transitionConfiguring" << __E__;
537 
538  // activate the configuration tree (the first iteration)
539  if(RunControlStateMachine::getIterationIndex() == 0 &&
540  RunControlStateMachine::getSubIterationIndex() == 0)
541  {
542  thread_error_message_ = "";
543  thread_progress_bar_.resetProgressBar(0);
544  last_thread_progress_update_ = time(0); // initialize timeout timer
545 
546  CoreSupervisorBase::configureInit();
547 
548  // start configuring thread
549  std::thread(&ARTDAQSupervisor::configuringThread, this).detach();
550 
551  __SUP_COUT__ << "Configuring thread started." << __E__;
552 
553  RunControlStateMachine::
554  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
555  }
556  else // not first time
557  {
558  std::string errorMessage;
559  {
560  std::lock_guard<std::mutex> lock(
561  thread_mutex_); // lock out for remainder of scope
562  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
563  }
564  int progress = thread_progress_bar_.read();
565  __SUP_COUTVS__(2, errorMessage);
566  __SUP_COUTVS__(2, progress);
567  __SUP_COUTVS__(2, thread_progress_bar_.isComplete());
568 
569  // check for done and error messages
570  if(errorMessage == "" && // if no update in 600 seconds, give up
571  time(0) - last_thread_progress_update_ > 600)
572  {
573  __SUP_SS__ << "There has been no update from the configuration thread for "
574  << (time(0) - last_thread_progress_update_)
575  << " seconds, assuming something is wrong and giving up! "
576  << "Last progress received was " << progress << __E__;
577  errorMessage = ss.str();
578  }
579 
580  if(errorMessage != "")
581  {
582  __SUP_SS__ << "Error was caught in configuring thread: " << errorMessage
583  << __E__;
584  __SUP_COUT_ERR__ << "\n" << ss.str();
585 
586  theStateMachine_.setErrorMessage(ss.str());
587  throw toolbox::fsm::exception::Exception(
588  "Transition Error" /*name*/,
589  ss.str() /* message*/,
590  "CoreSupervisorBase::transitionConfiguring" /*module*/,
591  __LINE__ /*line*/,
592  __FUNCTION__ /*function*/
593  );
594  }
595 
596  if(!thread_progress_bar_.isComplete())
597  {
598  __SUP_COUT__ << "Not done yet..." << __E__;
599  //attempt to get live view of python output
600  // __COUT_MULTI_LBL__(0, captureStderrAndStdout_("statuscheck"), "statuscheck");
601 
602  RunControlStateMachine::
603  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
604 
    // Only a change in the progress value counts as a sign of life; this
    // is what keeps the 600-second timeout above from firing.
605  if(last_thread_progress_read_ != progress)
606  {
607  last_thread_progress_read_ = progress;
608  last_thread_progress_update_ = time(0);
609  }
610 
611  sleep(1 /*seconds*/);
612  }
613  else
614  {
615  __SUP_COUT_INFO__ << "Complete configuring transition!" << __E__;
616  __SUP_COUTV__(getProcessInfo_());
617  }
618  }
619 
620  return;
621 } // end transitionConfiguring()
622 
623 //==============================================================================
624 void ARTDAQSupervisor::configuringThread()
625 try
626 {
627  std::string uid = theConfigurationManager_
628  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
629  "/" + CorePropertySupervisorBase::getSupervisorUID() +
630  "/" + "LinkToSupervisorTable")
631  .getValueAsString();
632 
633  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
634 
635  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
636 
637  ConfigurationTree theSupervisorNode = getSupervisorTableNode();
638 
639  thread_progress_bar_.step();
640 
641  set_thread_message_("ConfigGen");
642 
643  auto info = ARTDAQTableBase::extractARTDAQInfo(
644  theSupervisorNode,
645  false /*getStatusFalseNodes*/,
646  true /*doWriteFHiCL*/,
647  getSupervisorProperty("max_fragment_size_bytes", 8888),
648  getSupervisorProperty("routing_timeout_ms", 1999),
649  getSupervisorProperty("routing_retry_count", 12),
650  &thread_progress_bar_);
651 
652  // Check lists
653  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
654  {
655  __GEN_SS__ << "There must be at least one enabled BoardReader!" << __E__;
656  __GEN_SS_THROW__;
657  }
658  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
659  {
660  __GEN_SS__ << "There must be at least one enabled EventBuilder!" << __E__;
661  __GEN_SS_THROW__;
662  }
663 
664  thread_progress_bar_.step();
665  set_thread_message_("Writing boot.txt");
666 
667  __GEN_COUT__ << "Writing boot.txt" << __E__;
668 
669  int debugLevel = theSupervisorNode.getNode("DAQInterfaceDebugLevel").getValue<int>();
670  std::string setupScript = theSupervisorNode.getNode("DAQSetupScript").getValue();
671 
672  // Generate boot file content using helper function
673  std::string bootContent =
674  ARTDAQTableBase::getBootFileContentFromInfo(info, setupScript, debugLevel);
675 
676  // Populate label_to_proc_type_map_ (still needed for later)
677  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
678  label_to_proc_type_map_[builder.label] = "EventBuilder";
679  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
680  label_to_proc_type_map_[logger.label] = "DataLogger";
681  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
682  label_to_proc_type_map_[dispatcher.label] = "Dispatcher";
683  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
684  label_to_proc_type_map_[rmanager.label] = "RoutingManager";
685 
686  // Write boot.txt file
687  std::ofstream o(ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt", std::ios::trunc);
688  o << bootContent;
689  o.close();
690 
691  // TODO: To save to runlog, store bootContent in metadata/configuration archive
692  // Example (add when implementing runlog integration):
693  // saveToRunlog("boot.txt", bootContent, run_number);
694 
695  thread_progress_bar_.step();
696  set_thread_message_("Writing Fhicl Files");
697 
698  __GEN_COUT__ << "Building configuration directory" << __E__;
699 
700  boost::system::error_code ignored;
701  boost::filesystem::remove_all(ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME,
702  ignored);
703  mkdir((ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME).c_str(), 0755);
704 
705  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
706  {
707  symlink(ARTDAQTableBase::getFlatFHICLFilename(
708  ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label)
709  .c_str(),
710  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
711  reader.label + ".fcl")
712  .c_str());
713  }
714  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
715  {
716  symlink(ARTDAQTableBase::getFlatFHICLFilename(
717  ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label)
718  .c_str(),
719  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
720  builder.label + ".fcl")
721  .c_str());
722  }
723  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
724  {
725  symlink(ARTDAQTableBase::getFlatFHICLFilename(
726  ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label)
727  .c_str(),
728  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
729  logger.label + ".fcl")
730  .c_str());
731  }
732  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
733  {
734  symlink(ARTDAQTableBase::getFlatFHICLFilename(
735  ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label)
736  .c_str(),
737  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
738  dispatcher.label + ".fcl")
739  .c_str());
740  }
741  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
742  {
743  symlink(ARTDAQTableBase::getFlatFHICLFilename(
744  ARTDAQTableBase::ARTDAQAppType::RoutingManager, rmanager.label)
745  .c_str(),
746  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
747  rmanager.label + ".fcl")
748  .c_str());
749  }
750 
751  thread_progress_bar_.step();
752 
753  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
754  getDAQState_();
755  if(daqinterface_state_ != "stopped" && daqinterface_state_ != "")
756  {
757  __GEN_SS__ << "Cannot configure DAQInterface because it is in the wrong state"
758  << " (" << daqinterface_state_ << " != stopped)!" << __E__;
759  __GEN_SS_THROW__
760  }
761 
762  set_thread_message_("Calling setdaqcomps");
763  __GEN_COUT__ << "Calling setdaqcomps" << __E__;
764  __GEN_COUT__ << "Status before setdaqcomps: " << daqinterface_state_ << __E__;
765  if(daqinterface_ptr_ == nullptr)
766  {
767  __GEN_SS__ << "DAQInterface is not initialized. "
768  "Check earlier Python import/constructor errors (e.g. syntax) "
769  "in DAQInterface."
770  << __E__;
771  __GEN_SS_THROW__;
772  }
773  PyObjectGuard pName1(PyUnicode_FromString("setdaqcomps"));
774 
775  PyObjectGuard readerDict(PyDict_New());
776  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
777  {
778  // PyDict_SetItem INCREFs key/value, so use PyObjectGuard to manage references
779  label_to_proc_type_map_[reader.label] = "BoardReader";
780  PyObjectGuard readerName(PyUnicode_FromString(reader.label.c_str()));
781 
782  int list_size = reader.allowed_processors != "" ? 4 : 3;
783 
784  PyObjectGuard readerData(PyList_New(list_size));
785  PyObject* readerHost = PyUnicode_FromString(reader.hostname.c_str());
786  PyObject* readerPort = PyUnicode_FromString("-1");
787  PyObject* readerSubsystem =
788  PyUnicode_FromString(std::to_string(reader.subsystem).c_str());
789  PyList_SetItem(readerData.get(), 0, readerHost);
790  PyList_SetItem(readerData.get(), 1, readerPort);
791  PyList_SetItem(readerData.get(), 2, readerSubsystem);
792  if(reader.allowed_processors != "")
793  {
794  PyObject* readerAllowedProcessors =
795  PyUnicode_FromString(reader.allowed_processors.c_str());
796  PyList_SetItem(readerData.get(), 3, readerAllowedProcessors);
797  }
798  PyDict_SetItem(readerDict.get(), readerName.get(), readerData.get());
799  }
800  PyObjectGuard res1(PyObject_CallMethodObjArgs(
801  daqinterface_ptr_, pName1.get(), readerDict.get(), NULL));
802  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("setdaqcomps"), "setdaqcomps");
803 
804  if(checkPythonError(res1.get()))
805  {
806  std::string err_msg = capturePyErr("setdaqcomps");
807  __GEN_SS__ << "Error calling setdaqcomps: " << err_msg << __E__;
808  __GEN_SS_THROW__;
809  }
810 
811  getDAQState_();
812  __GEN_COUT__ << "Status after setdaqcomps: " << daqinterface_state_ << __E__;
813 
814  thread_progress_bar_.step();
815  set_thread_message_("Calling do_boot");
816  __GEN_COUT_INFO__ << "Calling do_boot" << __E__;
817  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
818 
819  // 1. Create Python Strings (Must DECREF later)
820  PyObjectGuard pNameBoot(PyUnicode_FromString("do_boot"));
821  PyObjectGuard pBootArgs(
822  PyUnicode_FromString((ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt").c_str()));
823 
824  // 2. First Attempt: Call do_boot
825  PyObjectGuard resBoot1(PyObject_CallMethodObjArgs(
826  daqinterface_ptr_, pNameBoot.get(), pBootArgs.get(), NULL));
827 
828  std::string doBootOutput = captureStderrAndStdout_("do_boot");
829  __COUT_MULTI_LBL__(0, doBootOutput, "do_boot");
830 
831  if(checkPythonError(resBoot1.get()))
832  {
833  // --- FAILURE PATH ---
834 
835  std::string err1 = capturePyErr("do_boot");
836 
837  __GEN_COUT_INFO__ << "Error on first boot attempt: " << err1
838  << ". Recovering and retrying..." << __E__;
839 
840  // B. Attempt 'do_recover'
841  PyObjectGuard pNameRecover(PyUnicode_FromString("do_recover"));
842  PyObjectGuard resRecover(
843  PyObject_CallMethodObjArgs(daqinterface_ptr_, pNameRecover.get(), NULL));
844  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_recover"), "do_recover");
845 
846  if(checkPythonError(resRecover.get()))
847  {
848  // Recover failed - Critical Error
849  std::string errRec = capturePyErr("do_recover");
850 
851  std::stringstream oss;
852  oss << "Error calling recover transition!!!! " << errRec;
853  if(doBootOutput.size() > OUT_ON_ERR_SIZE)
854  oss << "... last " << OUT_ON_ERR_SIZE
855  << " chars: " << doBootOutput.substr(doBootOutput.size() - 1000);
856  else
857  oss << doBootOutput;
858 
859  // Clean up original args before throwing
860  __GEN_SS__ << oss.str() << __E__;
861  __GEN_SS_THROW__;
862  }
863 
864  // C. Retry 'do_boot'
865  thread_progress_bar_.step();
866  set_thread_message_("Calling do_boot (retry)");
867  __GEN_COUT_INFO__ << "Calling do_boot again" << __E__;
868 
869  // Reuse pNameBoot and pBootArgs (valid until we DECREF them at the very end)
870  PyObjectGuard resBoot2(PyObject_CallMethodObjArgs(
871  daqinterface_ptr_, pNameBoot.get(), pBootArgs.get(), NULL));
872 
873  doBootOutput = captureStderrAndStdout_("do_boot (retry)");
874  __COUT_MULTI_LBL__(0, doBootOutput, "do_boot (retry)");
875 
876  if(checkPythonError(resBoot2.get()))
877  {
878  // Second boot failed
879  std::string err2 = capturePyErr("do_boot retry");
880 
881  std::stringstream oss;
882  oss << "Error calling boot transition (2nd try): " << err2;
883  if(doBootOutput.size() > OUT_ON_ERR_SIZE)
884  oss << "... last " << OUT_ON_ERR_SIZE
885  << " chars: " << doBootOutput.substr(doBootOutput.size() - 1000);
886  else
887  oss << doBootOutput;
888 
889  __GEN_SS__ << oss.str() << __E__;
890  __GEN_SS_THROW__;
891  }
892  }
893 
894  getDAQState_();
895  if(daqinterface_state_ != "booted")
896  {
897  std::cout << "Do boot output on error: \n" << doBootOutput << __E__;
898  __GEN_SS__ << "DAQInterface boot transition failed! "
899  << "Status after boot attempt: " << daqinterface_state_ << __E__;
900 
901  if(doBootOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
902  ss << "... last " << OUT_ON_ERR_SIZE
903  << " characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
904  else
905  ss << doBootOutput;
906  __GEN_SS_THROW__;
907  }
908  __GEN_COUT__ << "Status after boot: " << daqinterface_state_ << __E__;
909 
910  thread_progress_bar_.step();
911  set_thread_message_("Calling do_config");
912  __GEN_COUT_INFO__ << "Calling do_config" << __E__;
913  __GEN_COUT__ << "Status before config: " << daqinterface_state_ << __E__;
914  std::string doConfigOutput = "";
915  { //do_config call
916  // RAII wrapper for Python objects to ensure cleanup even on exception
917 
918  PyObjectGuard pName3(PyUnicode_FromString("do_config"));
919  // 2. Create the argument - list containing config name: ["my_config"]
920  PyObjectGuard pArg(Py_BuildValue("[s]", FAKE_CONFIG_NAME));
921 
922  // 3. Call the method
923  PyObjectGuard res3(PyObject_CallMethodObjArgs(
924  daqinterface_ptr_, pName3.get(), pArg.get(), NULL));
925 
926  // 4. Check for errors FIRST before capturing output (which might clear error state)
927  if(checkPythonError(res3.get()))
928  {
929  // Get the error message before doing anything else
930  std::string err = capturePyErr("do_config");
931 
932  // Now capture output for diagnostics
933  doConfigOutput = captureStderrAndStdout_("do_config");
934 
935  __GEN_SS__ << "Error calling config transition: " << err << __E__;
936  __GEN_SS_THROW__;
937  }
938 
939  // 5. Success path - capture output
940  doConfigOutput = captureStderrAndStdout_("do_config");
941  __COUT_MULTI_LBL__(0, doConfigOutput, "do_config");
942 
943  // 6. Success Handling (Safe conversion to string)
944  // We use PyObject_Str to safely convert any return type (None, Int, String) to text
945  PyObjectGuard strRes(PyObject_Str(res3.get()));
946  const char* res_cstr = "";
947  if(strRes.get())
948  {
949  res_cstr = PyUnicode_AsUTF8(strRes.get());
950  }
951 
952  __SUP_COUTT__ << "do_config result=" << (res_cstr ? res_cstr : "N/A") << __E__;
953  } //end do_config call
954 
955  getDAQState_();
956  if(daqinterface_state_ != "ready")
957  {
958  __GEN_SS__ << "DAQInterface config transition failed!" << __E__
959  << "Supervisor state: \"" << daqinterface_state_ << "\" != \"ready\" "
960  << __E__;
961  auto doConfigOutput_recover_i =
962  doConfigOutput.find("RECOVER transition underway");
963  if(doConfigOutput_recover_i == std::string::npos)
964  ss << doConfigOutput;
965  else if(doConfigOutput_recover_i >
966  OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
967  ss << "... tail of " << OUT_ON_ERR_SIZE << " characters before recovery: "
968  << doConfigOutput.substr(
969  doConfigOutput_recover_i - OUT_ON_ERR_SIZE +
970  std::string("RECOVER transition underway").size(),
971  OUT_ON_ERR_SIZE);
972  else
973  ss << doConfigOutput.substr(
974  0,
975  doConfigOutput_recover_i +
976  std::string("RECOVER transition underway").size());
977  __GEN_SS_THROW__;
978  }
979  __GEN_COUT__ << "Status after config: " << daqinterface_state_ << __E__;
980  thread_progress_bar_.complete();
981  set_thread_message_("Configured");
982  __GEN_COUT_INFO__ << "Configured." << __E__;
983 
984 } // end configuringThread()
985 catch(const std::runtime_error& e)
986 {
987  set_thread_message_("ERROR");
988  __SS__ << "Error was caught while configuring: " << e.what() << __E__;
989  __COUT_ERR__ << "\n" << ss.str();
990  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
991  thread_error_message_ = ss.str();
992 }
993 catch(...)
994 {
995  set_thread_message_("ERROR");
996  __SS__ << "Unknown error was caught while configuring. Please checked the logs."
997  << __E__;
998  __COUT_ERR__ << "\n" << ss.str();
999 
1000  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1001 
1002  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1003  thread_error_message_ = ss.str();
1004 } // end configuringThread() error handling
1005 
1006 //==============================================================================
1007 void ARTDAQSupervisor::transitionHalting(toolbox::Event::Reference /*event*/)
1008 try
1009 {
1010  set_thread_message_("Halting");
1011  __SUP_COUT__ << "Halting..." << __E__;
1012 
1013  int tries = 0;
1014  while(tries++ < 5)
1015  {
1016  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1017  std::try_to_lock);
1018  if(!lk.owns_lock()) //if lock not availabe, just report last status
1019  {
1020  __COUTS__(50) << "Do not have python lock for halt. tries=" << tries << __E__;
1021  sleep(1);
1022  continue;
1023  }
1024  __COUTS__(50) << "Have python lock!" << __E__;
1025 
1026  // std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1027  getDAQState_();
1028  __SUP_COUT__ << "Status before halt: " << daqinterface_state_ << __E__;
1029 
1030  if(daqinterface_state_ == "running")
1031  {
1032  // First stop before halting
1033  PyObjectGuard pName(PyUnicode_FromString("do_stop_running"));
1034  PyObjectGuard res(
1035  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1036  __COUT_MULTI_LBL__(
1037  0, captureStderrAndStdout_("do_stop_running"), "do_stop_running");
1038 
1039  if(res.get() == NULL)
1040  {
1041  std::string err = capturePyErr();
1042  __SS__ << "Error calling DAQ Interface stop transition: " << err
1043  << __E__;
1044  __SUP_SS_THROW__;
1045  }
1046  }
1047 
1048  PyObjectGuard pName(PyUnicode_FromString("do_command"));
1049  PyObjectGuard pArg(PyUnicode_FromString("Shutdown"));
1050  PyObjectGuard res(
1051  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1052  __COUT_MULTI_LBL__(
1053  0, captureStderrAndStdout_("do_command Shutdown"), "do_command Shutdown");
1054 
1055  if(checkPythonError(res.get()))
1056  {
1057  std::string err = capturePyErr("do_command Shutdown");
1058  __SS__ << "Error calling DAQ Interface halt transition: " << err << __E__;
1059  __SUP_SS_THROW__;
1060  }
1061 
1062  getDAQState_();
1063  __SUP_COUT__ << "Status after halt: " << daqinterface_state_ << __E__;
1064  break;
1065  } //end retry loop
1066 
1067  __SUP_COUT__ << "Halted." << __E__;
1068  set_thread_message_("Halted");
1069 } // end transitionHalting()
1070 catch(const std::runtime_error& e)
1071 {
1072  const std::string transitionName = "Halting";
1073  // if halting from Failed state, then ignore errors
1074  if(theStateMachine_.getProvenanceStateName() ==
1075  RunControlStateMachine::FAILED_STATE_NAME ||
1076  theStateMachine_.getProvenanceStateName() ==
1077  RunControlStateMachine::HALTED_STATE_NAME)
1078  {
1079  __SUP_COUT_INFO__ << "Error was caught while halting (but ignoring because "
1080  "previous state was '"
1081  << RunControlStateMachine::FAILED_STATE_NAME
1082  << "'): " << e.what() << __E__;
1083  }
1084  else // if not previously in Failed state, then fail
1085  {
1086  __SUP_SS__ << "Error was caught while " << transitionName << ": " << e.what()
1087  << __E__;
1088  __SUP_COUT_ERR__ << "\n" << ss.str();
1089  theStateMachine_.setErrorMessage(ss.str());
1090  throw toolbox::fsm::exception::Exception(
1091  "Transition Error" /*name*/,
1092  ss.str() /* message*/,
1093  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1094  __LINE__ /*line*/,
1095  __FUNCTION__ /*function*/
1096  );
1097  }
1098 } // end transitionHalting() std::runtime_error exception handling
1099 catch(...)
1100 {
1101  const std::string transitionName = "Halting";
1102  // if halting from Failed state, then ignore errors
1103  if(theStateMachine_.getProvenanceStateName() ==
1104  RunControlStateMachine::FAILED_STATE_NAME ||
1105  theStateMachine_.getProvenanceStateName() ==
1106  RunControlStateMachine::HALTED_STATE_NAME)
1107  {
1108  __SUP_COUT_INFO__ << "Unknown error was caught while halting (but ignoring "
1109  "because previous state was '"
1110  << RunControlStateMachine::FAILED_STATE_NAME << "')." << __E__;
1111  }
1112  else // if not previously in Failed state, then fail
1113  {
1114  __SUP_SS__ << "Unknown error was caught while " << transitionName
1115  << ". Please checked the logs." << __E__;
1116  __SUP_COUT_ERR__ << "\n" << ss.str();
1117  theStateMachine_.setErrorMessage(ss.str());
1118 
1119  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1120 
1121  throw toolbox::fsm::exception::Exception(
1122  "Transition Error" /*name*/,
1123  ss.str() /* message*/,
1124  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1125  __LINE__ /*line*/,
1126  __FUNCTION__ /*function*/
1127  );
1128  }
1129 } // end transitionHalting() exception handling
1130 
1131 //==============================================================================
1132 void ARTDAQSupervisor::transitionInitializing(toolbox::Event::Reference /*event*/)
1133 try
1134 {
1135  set_thread_message_("Initializing");
1136  __SUP_COUT__ << "Initializing..." << __E__;
1137  init();
1138  __SUP_COUT__ << "Initialized." << __E__;
1139  set_thread_message_("Initialized");
1140 } // end transitionInitializing()
1141 catch(const std::runtime_error& e)
1142 {
1143  __SS__ << "Error was caught while Initializing: " << e.what() << __E__;
1144  __SS_THROW__;
1145 }
1146 catch(...)
1147 {
1148  __SS__ << "Unknown error was caught while Initializing. Please checked the logs."
1149  << __E__;
1150  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1151  __SS_THROW__;
1152 } // end transitionInitializing() error handling
1153 
1154 //==============================================================================
1155 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference /*event*/)
1156 try
1157 {
1158  set_thread_message_("Pausing");
1159  __SUP_COUT__ << "Pausing..." << __E__;
1160  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1161 
1162  getDAQState_();
1163  __SUP_COUT__ << "Status before pause: " << daqinterface_state_ << __E__;
1164 
1165  PyObjectGuard pName(PyUnicode_FromString("do_command"));
1166  PyObjectGuard pArg(PyUnicode_FromString("Pause"));
1167  PyObjectGuard res(
1168  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1169  __COUT_MULTI_LBL__(
1170  0, captureStderrAndStdout_("do_command Pause"), "do_command Pause");
1171 
1172  if(checkPythonError(res.get()))
1173  {
1174  std::string err = capturePyErr("do_command Pause");
1175  __SS__ << "Error calling DAQ Interface Pause transition: " << err << __E__;
1176  __SUP_SS_THROW__;
1177  }
1178 
1179  getDAQState_();
1180  __SUP_COUT__ << "Status after pause: " << daqinterface_state_ << __E__;
1181 
1182  __SUP_COUT__ << "Paused." << __E__;
1183  set_thread_message_("Paused");
1184 } // end transitionPausing()
1185 catch(const std::runtime_error& e)
1186 {
1187  __SS__ << "Error was caught while Pausing: " << e.what() << __E__;
1188  __SS_THROW__;
1189 }
1190 catch(...)
1191 {
1192  __SS__ << "Unknown error was caught while Pausing. Please checked the logs." << __E__;
1193  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1194  __SS_THROW__;
1195 } // end transitionPausing() error handling
1196 
1197 //==============================================================================
1198 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference /*event*/)
1199 try
1200 {
1201  set_thread_message_("Resuming");
1202  __SUP_COUT__ << "Resuming..." << __E__;
1203  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1204 
1205  getDAQState_();
1206  __SUP_COUT__ << "Status before resume: " << daqinterface_state_ << __E__;
1207  PyObjectGuard pName(PyUnicode_FromString("do_command"));
1208  PyObjectGuard pArg(PyUnicode_FromString("Resume"));
1209  PyObjectGuard res(
1210  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1211  __COUT_MULTI_LBL__(
1212  0, captureStderrAndStdout_("do_command Resume"), "do_command Resume");
1213 
1214  if(checkPythonError(res.get()))
1215  {
1216  std::string err = capturePyErr("do_command Resume");
1217  __SS__ << "Error calling DAQ Interface Resume transition: " << err << __E__;
1218  __SUP_SS_THROW__;
1219  }
1220 
1221  getDAQState_();
1222  __SUP_COUT__ << "Status after resume: " << daqinterface_state_ << __E__;
1223  __SUP_COUT__ << "Resumed." << __E__;
1224  set_thread_message_("Resumed");
1225 } // end transitionResuming()
1226 catch(const std::runtime_error& e)
1227 {
1228  __SS__ << "Error was caught while Resuming: " << e.what() << __E__;
1229  __SS_THROW__;
1230 }
1231 catch(...)
1232 {
1233  __SS__ << "Unknown error was caught while Resuming. Please checked the logs."
1234  << __E__;
1235  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1236  __SS_THROW__;
1237 } // end transitionResuming() error handling
1238 
1239 //==============================================================================
1240 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference /*event*/)
1241 try
1242 {
1243  __SUP_COUT__ << "transitionStarting" << __E__;
1244 
1245  // first time launch thread because artdaq Supervisor may take a while
1246  if(RunControlStateMachine::getIterationIndex() == 0 &&
1247  RunControlStateMachine::getSubIterationIndex() == 0)
1248  {
1249  thread_error_message_ = "";
1250  thread_progress_bar_.resetProgressBar(0);
1251  last_thread_progress_update_ = time(0); // initialize timeout timer
1252 
1253  // start configuring thread
1254  std::thread(&ARTDAQSupervisor::startingThread, this).detach();
1255 
1256  __SUP_COUT_INFO__ << "Starting thread started." << __E__;
1257 
1258  RunControlStateMachine::
1259  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1260  }
1261  else // not first time
1262  {
1263  std::string errorMessage;
1264  {
1265  std::lock_guard<std::mutex> lock(
1266  thread_mutex_); // lock out for remainder of scope
1267  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
1268  }
1269  int progress = thread_progress_bar_.read();
1270  __SUP_COUTV__(errorMessage);
1271  __SUP_COUTV__(progress);
1272  __SUP_COUTV__(thread_progress_bar_.isComplete());
1273 
1274  // check for done and error messages
1275  if(errorMessage == "" && // if no update in 600 seconds, give up
1276  time(0) - last_thread_progress_update_ > 600)
1277  {
1278  __SUP_SS__ << "There has been no update from the start thread for "
1279  << (time(0) - last_thread_progress_update_)
1280  << " seconds, assuming something is wrong and giving up! "
1281  << "Last progress received was " << progress << __E__;
1282  errorMessage = ss.str();
1283  }
1284 
1285  if(errorMessage != "")
1286  {
1287  __SUP_SS__ << "Error was caught in starting thread: " << errorMessage
1288  << __E__;
1289  __SUP_COUT_ERR__ << "\n" << ss.str();
1290 
1291  theStateMachine_.setErrorMessage(ss.str());
1292  throw toolbox::fsm::exception::Exception(
1293  "Transition Error" /*name*/,
1294  ss.str() /* message*/,
1295  "CoreSupervisorBase::transitionStarting" /*module*/,
1296  __LINE__ /*line*/,
1297  __FUNCTION__ /*function*/
1298  );
1299  }
1300 
1301  if(!thread_progress_bar_.isComplete())
1302  {
1303  __SUP_COUT__ << "Not done yet..." << __E__;
1304  //attempt to get live view of python output (not working and not needed with new Tee Buffer solution)
1305  // __COUT_MULTI_LBL__(0, captureStderrAndStdout_("statuscheck"), "statuscheck");
1306 
1307  RunControlStateMachine::
1308  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1309 
1310  if(last_thread_progress_read_ != progress)
1311  {
1312  last_thread_progress_read_ = progress;
1313  last_thread_progress_update_ = time(0);
1314  }
1315 
1316  sleep(1 /*seconds*/);
1317  }
1318  else
1319  {
1320  __SUP_COUT_INFO__ << "Starting transition completed!" << __E__;
1321  __SUP_COUTV__(getProcessInfo_());
1322  }
1323  }
1324 
1325  return;
1326 
1327 } // end transitionStarting()
1328 catch(const std::runtime_error& e)
1329 {
1330  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1331  __SS_THROW__;
1332 }
1333 catch(...)
1334 {
1335  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1336  << __E__;
1337  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1338  __SS_THROW__;
1339 } // end transitionStarting() error handling
1340 
1341 //==============================================================================
1342 void ARTDAQSupervisor::startingThread()
1343 try
1344 {
1345  std::string uid = theConfigurationManager_
1346  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
1347  "/" + CorePropertySupervisorBase::getSupervisorUID() +
1348  "/" + "LinkToSupervisorTable")
1349  .getValueAsString();
1350 
1351  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
1352  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
1353  __GEN_COUT__ << "Starting..." << __E__;
1354  set_thread_message_("Starting");
1355 
1356  thread_progress_bar_.step();
1357  stop_runner_();
1358  {
1359  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1360  getDAQState_();
1361  __GEN_COUT__ << "Status before start: " << daqinterface_state_ << __E__;
1362  auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
1363  .getParameters()
1364  .getValue("RunNumber");
1365 
1366  thread_progress_bar_.step();
1367 
1368  __GEN_COUT_INFO__ << "Calling do_start_running" << __E__;
1369  PyObjectGuard pName(PyUnicode_FromString("do_start_running"));
1370  int run_number = std::stoi(runNumber);
1371  PyObjectGuard pStateArgs(PyLong_FromLong(run_number));
1372  PyObjectGuard res(PyObject_CallMethodObjArgs(
1373  daqinterface_ptr_, pName.get(), pStateArgs.get(), NULL));
1374  __COUT_MULTI_LBL__(
1375  0, captureStderrAndStdout_("do_start_running"), "do_start_running");
1376 
1377  thread_progress_bar_.step();
1378 
1379  if(res.get() == NULL)
1380  {
1381  std::string err = capturePyErr();
1382  __SS__ << "Error calling start transition: " << err << __E__;
1383  __GEN_SS_THROW__;
1384  }
1385  getDAQState_();
1386 
1387  thread_progress_bar_.step();
1388 
1389  __GEN_COUT__ << "Status after start: " << daqinterface_state_ << __E__;
1390  if(daqinterface_state_ != "running")
1391  {
1392  __SS__ << "DAQInterface start transition failed!" << __E__;
1393  __GEN_SS_THROW__;
1394  }
1395 
1396  thread_progress_bar_.step();
1397  }
1398  start_runner_();
1399  set_thread_message_("Started");
1400  thread_progress_bar_.step();
1401 
1402  __GEN_COUT_INFO__ << "Started." << __E__;
1403  thread_progress_bar_.complete();
1404 
1405 } // end startingThread()
1406 catch(const std::runtime_error& e)
1407 {
1408  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1409  __COUT_ERR__ << "\n" << ss.str();
1410  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1411  thread_error_message_ = ss.str();
1412 }
1413 catch(...)
1414 {
1415  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1416  << __E__;
1417  __COUT_ERR__ << "\n" << ss.str();
1418 
1419  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1420 
1421  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1422  thread_error_message_ = ss.str();
1423 } // end startingThread() error handling
1424 
1425 //==============================================================================
1426 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference /*event*/)
1427 try
1428 {
1429  __SUP_COUT__ << "Stopping..." << __E__;
1430  set_thread_message_("Stopping");
1431  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1432  getDAQState_();
1433  __SUP_COUT__ << "Status before stop: " << daqinterface_state_ << __E__;
1434  PyObjectGuard pName(PyUnicode_FromString("do_stop_running"));
1435  PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1436  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_stop_running"), "do_stop_running");
1437 
1438  if(checkPythonError(res.get()))
1439  {
1440  std::string err = capturePyErr("do_stop_running");
1441  __SS__ << "Error calling DAQ Interface stop transition: " << err << __E__;
1442  __SUP_SS_THROW__;
1443  }
1444  getDAQState_();
1445  __SUP_COUT__ << "Status after stop: " << daqinterface_state_ << __E__;
1446  __SUP_COUT__ << "Stopped." << __E__;
1447  set_thread_message_("Stopped");
1448 } // end transitionStopping()
1449 catch(const std::runtime_error& e)
1450 {
1451  __SS__ << "Error was caught while Stopping: " << e.what() << __E__;
1452  __SS_THROW__;
1453 }
1454 catch(...)
1455 {
1456  __SS__ << "Unknown error was caught while Stopping. Please checked the logs."
1457  << __E__;
1458  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1459  __SS_THROW__;
1460 } // end transitionStopping() error handling
1461 
1462 //==============================================================================
1463 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference /*event*/)
1464 {
1465  __SUP_COUT__ << "Entering error recovery state" << __E__;
1466  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1467  getDAQState_();
1468  __SUP_COUT__ << "Status before error: " << daqinterface_state_ << __E__;
1469 
1470  PyObjectGuard pName(PyUnicode_FromString("do_recover"));
1471  PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1472  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_recover"), "do_recover");
1473 
1474  if(checkPythonError(res.get()))
1475  {
1476  std::string err = capturePyErr("do_recover");
1477  //do not throw exception when entering error, because failing DAQ interface could be the reason for error in first place
1478  __SUP_COUT_WARN__ << "Error calling DAQ Interface recover transition: " << err
1479  << __E__;
1480  return;
1481  }
1482 
1483  getDAQState_();
1484  __SUP_COUT__ << "Status after error: " << daqinterface_state_ << __E__;
1485  __SUP_COUT__ << "EnteringError DONE." << __E__;
1486 
1487 } // end enteringError()
1488 
1489 std::vector<SupervisorInfo::SubappInfo> ots::ARTDAQSupervisor::getSubappInfo(void)
1490 {
1491  auto apps = getAndParseProcessInfo_();
1492 
1493  std::map<int, SupervisorInfo::SubappInfo> subapp_infos;
1494  for(auto& app : apps)
1495  {
1497 
1498  info.name = app.label;
1499  info.detail = "Rank " + std::to_string(app.rank) + ", subsystem " +
1500  std::to_string(app.subsystem);
1501  info.lastStatusTime = time(0);
1502  info.progress = 100;
1503  info.status = artdaqStateToOtsState(app.state);
1504  info.url = "http://" + app.host + ":" + std::to_string(app.port) + "/RPC2";
1505  info.class_name = "ARTDAQ " + labelToProcType_(app.label);
1506 
1507  subapp_infos[app.rank] = info;
1508  }
1509 
1510  std::vector<SupervisorInfo::SubappInfo> output;
1511  for(auto& [rank, info] : subapp_infos)
1512  {
1513  output.push_back(info);
1514  }
1515  return output;
1516 } //end getSubappInfo()
1517 
1518 //==============================================================================
1519 // Helper function to check if a Python call failed
1520 // Returns true if there was an error (result is NULL or PyErr_Occurred)
1521 // NOTE: Does NOT clear the Python error state - caller should call capturePyErr() to fetch it
1522 bool ots::ARTDAQSupervisor::checkPythonError(PyObject* result)
1523 {
1524  if(result == NULL || PyErr_Occurred())
1525  {
1526  // Assume result is cleaned up by its PyObjectGuard if needed
1527 
1528  // Note: We keep the Python error state so caller can extract the message with capturePyErr()
1529  return true; // Error occurred
1530  }
1531  return false; // No error
1532 } //end checkPythonError()
1533 
1534 //==============================================================================
1535 std::string ots::ARTDAQSupervisor::capturePyErr(std::string label /* = "" */)
1536 {
1537  std::string err_msg = "Unknown Python Error";
1538  PyObject * pType, *pValue, *pTraceback;
1539  PyErr_Fetch(&pType, &pValue, &pTraceback);
1540  PyErr_NormalizeException(&pType, &pValue, &pTraceback);
1541 
1542  if(pType != NULL)
1543  {
1544  // Format the full traceback like Python does
1545  PyObjectGuard traceback_module(PyImport_ImportModule("traceback"));
1546  if(traceback_module.get() != NULL)
1547  {
1548  PyObjectGuard format_exception(
1549  PyObject_GetAttrString(traceback_module.get(), "format_exception"));
1550  if(format_exception.get() != NULL)
1551  {
1552  PyObjectGuard formatted(
1553  PyObject_CallFunctionObjArgs(format_exception.get(),
1554  pType,
1555  pValue ? pValue : Py_None,
1556  pTraceback ? pTraceback : Py_None,
1557  NULL));
1558  if(formatted.get() != NULL)
1559  {
1560  // formatted is a list of strings, join them
1561  PyObjectGuard empty_string(PyUnicode_FromString(""));
1562  PyObjectGuard joined(
1563  PyUnicode_Join(empty_string.get(), formatted.get()));
1564  if(joined.get() != NULL)
1565  {
1566  const char* traceback_cstr = PyUnicode_AsUTF8(joined.get());
1567  if(traceback_cstr)
1568  err_msg = traceback_cstr;
1569  }
1570  }
1571  }
1572  }
1573 
1574  // Fallback to simple message if traceback formatting failed
1575  if(err_msg == "Unknown Python Error" && pValue != NULL)
1576  {
1577  PyObjectGuard pStr(PyObject_Str(pValue));
1578  if(pStr.get() != NULL)
1579  {
1580  const char* error_cstr = PyUnicode_AsUTF8(pStr.get());
1581  if(error_cstr)
1582  err_msg = error_cstr;
1583  }
1584  }
1585  }
1586 
1587  Py_XDECREF(pType);
1588  Py_XDECREF(pValue);
1589  Py_XDECREF(pTraceback);
1590 
1591  // Add label prefix if provided
1592  if(!label.empty())
1593  err_msg = label + ":\n" + err_msg;
1594 
1595  return err_msg;
1596 } //end capturePyErr()
1597 
1598 //==============================================================================
1599 std::string ots::ARTDAQSupervisor::captureStderrAndStdout_(std::string label /* = "" */)
1600 {
1601  if(!stringIO_out_)
1602  return ""; // Not defined
1603  // If a Python error is already pending, do not consume it here
1604  if(PyErr_Occurred())
1605  return "";
1606  if(label.size())
1607  label += ' '; //for nice printing
1608 
1609  std::string outString = "";
1610  PyObjectGuard out(PyObject_CallMethod(stringIO_out_, "getvalue", NULL));
1611 
1612  if(checkPythonError(out.get()))
1613  {
1614  // Error getting output - clear the error and return empty string
1615  capturePyErr("captureStderrAndStdout getvalue");
1616  return "";
1617  }
1618 
1619  const char* text = PyUnicode_AsUTF8(out.get());
1620 
1621  return text ? text : "";
1622 } //end captureStderrAndStdout_()
1623 
1624 void ots::ARTDAQSupervisor::getDAQState_()
1625 {
1626  __SUP_COUTS__(50) << "Getting DAQInterface python lock" << __E__;
1627  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1628  __SUP_COUTS__(50) << "Have DAQInterface python lock" << __E__;
1629 
1630  if(daqinterface_ptr_ == NULL)
1631  {
1632  daqinterface_state_ = "";
1633  __SUP_COUT_WARN__ << "daqinterface_ptr_ is not initialized!" << __E__;
1634  return;
1635  }
1636 
1637  // Prepare Python Strings ONCE (Move outside loop to prevent 5x memory leak)
1638  PyObjectGuard pName(PyUnicode_FromString("state"));
1639  PyObjectGuard pArg(PyUnicode_FromString("DAQInterface"));
1640 
1641  // WARNING: Verify your Python 'state' method accepts an argument.
1642  // If 'def state(self):' is the signature, passing pArg will fail.
1643  // If so, call: PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1644 
1645  int tries = 0;
1646  while(tries < 5)
1647  {
1648  // Call the method
1649  PyObjectGuard res(
1650  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1651 
1652  if(checkPythonError(res.get()))
1653  {
1654  tries++;
1655 
1656  // Get the error message
1657  std::string err_msg = capturePyErr("state");
1658 
1659  std::ostringstream ss;
1660  ss << "Attempt n " << tries
1661  << ". Error calling 'state'. Python Exception: " << err_msg;
1662 
1663  if(tries >= 5)
1664  {
1665  __COUT_ERR__ << ss.str() << __E__; // Log error on final fail
1666  daqinterface_state_ = "ERROR"; // distinct from empty
1667  }
1668  else
1669  {
1670  __COUT__ << ss.str() << __E__; // Log warning
1671  usleep(100000); // 100ms
1672  }
1673  continue;
1674  }
1675 
1676  // --- SUCCESS CASE ---
1677 
1678  // Safely convert result to string (res might not be a string!)
1679  PyObjectGuard strRes(PyObject_Str(res.get())); // Force conversion to string
1680  if(strRes.get())
1681  {
1682  daqinterface_state_ = std::string(PyUnicode_AsUTF8(strRes.get()));
1683  }
1684  else
1685  {
1686  // Rare case: object couldn't be converted to string
1687  daqinterface_state_ = "UNKNOWN";
1688  }
1689 
1690  __SUP_COUTS__(20) << "getDAQState_ state=" << daqinterface_state_ << __E__;
1691  break;
1692  }
1693 
1694  // Cleanup the string objects we created
1695 } //end getDAQState_()
1696 
1697 //==============================================================================
1698 std::string ots::ARTDAQSupervisor::getProcessInfo_(void)
1699 {
1700  __SUP_COUTS__(50) << "Getting DAQInterface state lock" << __E__;
1701  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1702  __SUP_COUTS__(50) << "Have DAQInterface state lock" << __E__;
1703 
1704  if(daqinterface_ptr_ == nullptr)
1705  {
1706  return "";
1707  }
1708 
1709  PyObjectGuard pName(PyUnicode_FromString("artdaq_process_info"));
1710  PyObjectGuard pArg(PyUnicode_FromString("DAQInterface"));
1711  PyObjectGuard pArg2(PyBool_FromLong(true));
1712  PyObjectGuard res(PyObject_CallMethodObjArgs(
1713  daqinterface_ptr_, pName.get(), pArg.get(), pArg2.get(), NULL));
1714 
1715  if(checkPythonError(res.get()))
1716  {
1717  std::string err = capturePyErr("artdaq_process_info");
1718  __SS__ << "Error calling artdaq_process_info function: " << err << __E__;
1719  __SUP_SS_THROW__;
1720  return "";
1721  }
1722  //cache status as latest
1723  std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1724  daqinterface_status_ = std::string(PyUnicode_AsUTF8(res.get()));
1725  return daqinterface_status_;
1726 } // end getProcessInfo_()
1727 
1728 std::string ots::ARTDAQSupervisor::artdaqStateToOtsState(std::string state)
1729 {
1730  if(state == "nonexistent" || state == "nonexistant")
1731  return RunControlStateMachine::INITIAL_STATE_NAME;
1732  if(state == "Ready")
1733  return "Configured";
1734  if(state == "Running")
1735  return RunControlStateMachine::RUNNING_STATE_NAME;
1736  if(state == "Paused")
1737  return RunControlStateMachine::PAUSED_STATE_NAME;
1738  if(state == "Stopped")
1739  return RunControlStateMachine::HALTED_STATE_NAME;
1740 
1741  TLOG(TLVL_WARNING) << "Unrecognized state name " << state;
1742  return RunControlStateMachine::FAILED_STATE_NAME;
1743 }
1744 
1745 std::string ots::ARTDAQSupervisor::labelToProcType_(std::string label)
1746 {
1747  if(label_to_proc_type_map_.count(label))
1748  {
1749  return label_to_proc_type_map_[label];
1750  }
1751  return "UNKNOWN";
1752 }
1753 
1754 //==============================================================================
1756 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo>
1757 ots::ARTDAQSupervisor::getAndParseProcessInfo_()
1758 {
1759  std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
1760  // full acquire from getProcessInfo_ creates mutex locking up!
1761  // auto info = getProcessInfo_();
1762  std::string info;
1763 
1764  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1765  std::try_to_lock);
1766  if(!lk.owns_lock()) //if lock not availabe, just report last status
1767  {
1768  __COUTS__(50) << "Do not have python lock." << __E__;
1769  std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1770  info = daqinterface_status_;
1771  }
1772  else //have lock! so retrieve Python Interface status
1773  {
1774  __COUTS__(50) << "Have python lock!" << __E__;
1775  info = getProcessInfo_();
1776  }
1777  __COUTVS__(20, info);
1778 
1779  auto procs = tokenize_(info);
1780 
1781  // 0: Whole string
1782  // 1: Process Label
1783  // 2: Process host
1784  // 3: Process port
1785  // 4: Process subsystem
1786  // 5: Process Rank
1787  // 6: Process state
1788  std::regex re("(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
1789 
1790  for(auto& proc : procs)
1791  {
1792  std::smatch match;
1793  if(std::regex_match(proc, match, re))
1794  {
1795  DAQInterfaceProcessInfo info;
1796 
1797  info.label = match[1];
1798  info.host = match[2];
1799  info.port = std::stoi(match[3]);
1800  info.subsystem = std::stoi(match[4]);
1801  info.rank = std::stoi(match[5]);
1802  info.state = match[6];
1803 
1804  output.push_back(info);
1805  }
1806  }
1807  return output;
1808 } // end getAndParseProcessInfo_()
1809 
1810 //==============================================================================
1812  std::unique_ptr<artdaq::CommanderInterface>>>
1813 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
1814 {
1815  std::list<
1816  std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
1817  output;
1818  auto infos = getAndParseProcessInfo_();
1819 
1820  for(auto& info : infos)
1821  {
1822  artdaq::Commandable cm;
1823  fhicl::ParameterSet ps;
1824 
1825  ps.put<std::string>("commanderPluginType", "xmlrpc");
1826  ps.put<int>("id", info.port);
1827  ps.put<std::string>("server_url", info.host);
1828 
1829  output.emplace_back(std::make_pair<DAQInterfaceProcessInfo,
1830  std::unique_ptr<artdaq::CommanderInterface>>(
1831  std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
1832  }
1833 
1834  return output;
1835 } // end makeCommandersFromProcessInfo()
1836 
1837 //==============================================================================
1838 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string const& input)
1839 {
1840  size_t pos = 0;
1841  std::list<std::string> output;
1842 
1843  while(pos != std::string::npos && pos < input.size())
1844  {
1845  auto newpos = input.find('\n', pos);
1846  if(newpos != std::string::npos)
1847  {
1848  output.emplace_back(input, pos, newpos - pos);
1849  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1850  pos = newpos + 1;
1851  }
1852  else
1853  {
1854  output.emplace_back(input, pos);
1855  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1856  pos = newpos;
1857  }
1858  }
1859  return output;
1860 } // end tokenize_()
1861 
1862 //==============================================================================
1863 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1864 try
1865 {
1866  TLOG(TLVL_TRACE) << "Runner thread starting";
1867  runner_running_ = true;
1868  while(runner_running_)
1869  {
1870  if(daqinterface_ptr_ != NULL)
1871  {
1872  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1873  getDAQState_();
1874  std::string state_before = daqinterface_state_;
1875 
1876  __SUP_COUTS__(2) << "Runner state_before=" << state_before
1877  << " state now=" << daqinterface_state_
1878  << " ?= running, ready, or booted" << __E__;
1879 
1880  if(daqinterface_state_ == "running" || daqinterface_state_ == "ready" ||
1881  daqinterface_state_ == "booted")
1882  {
1883  try
1884  {
1885  TLOG(TLVL_TRACE) << "Calling DAQInterface::check_proc_heartbeats";
1886  PyObjectGuard pName(PyUnicode_FromString("check_proc_heartbeats"));
1887  PyObjectGuard res(
1888  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1889  __COUT_MULTI_LBL__(1,
1890  captureStderrAndStdout_("check_proc_heartbeats"),
1891  "check_proc_heartbeats");
1892  TLOG(TLVL_TRACE)
1893  << "Done with DAQInterface::check_proc_heartbeats call";
1894 
1895  if(res.get() == NULL)
1896  {
1897  runner_running_ = false;
1898  std::string err = capturePyErr("check_proc_heartbeats");
1899  __SS__ << "Error calling check_proc_heartbeats function: " << err
1900  << __E__;
1901  __SUP_SS_THROW__;
1902  break;
1903  }
1904  }
1905  catch(cet::exception& ex)
1906  {
1907  runner_running_ = false;
1908  std::string err = capturePyErr("check_proc_heartbeats");
1909  __SS__ << "An cet::exception occurred while calling "
1910  "check_proc_heartbeats function "
1911  << ex.explain_self() << ": " << err << __E__;
1912  __SUP_SS_THROW__;
1913  break;
1914  }
1915  catch(std::exception& ex)
1916  {
1917  runner_running_ = false;
1918  std::string err = capturePyErr("check_proc_heartbeats");
1919  __SS__ << "An std::exception occurred while calling "
1920  "check_proc_heartbeats function: "
1921  << ex.what() << "\n\n"
1922  << err << __E__;
1923  __SUP_SS_THROW__;
1924  break;
1925  }
1926  catch(...)
1927  {
1928  runner_running_ = false;
1929  std::string err = capturePyErr("check_proc_heartbeats");
1930  __SS__ << "An unknown Error occurred while calling "
1931  "check_proc_heartbeats function: "
1932  << err << __E__;
1933  __SUP_SS_THROW__;
1934  break;
1935  }
1936 
1937  lk.unlock();
1938  getDAQState_();
1939  if(daqinterface_state_ != state_before)
1940  {
1941  runner_running_ = false;
1942  lk.unlock();
1943  __SS__ << "DAQInterface state unexpectedly changed from "
1944  << state_before << " to " << daqinterface_state_
1945  << ". Check supervisor log file for more info!" << __E__;
1946  __SUP_SS_THROW__;
1947  break;
1948  }
1949  }
1950  }
1951  else
1952  {
1953  __SUP_COUT__ << "daqinterface_ptr_ is null" << __E__;
1954  break;
1955  }
1956  usleep(1000000);
1957  }
1958  runner_running_ = false;
1959  TLOG(TLVL_TRACE) << "Runner thread complete";
1960 } // end daqinterfaceRunner_()
1961 catch(...)
1962 {
1963  __SS__ << "An error occurred in "
1964  "start_runner_/daqinterfaceRunner_ thread "
1965  << __E__;
1966  try
1967  {
1968  throw;
1969  }
1970  catch(const std::runtime_error& e)
1971  {
1972  ss << "Here is the error: " << e.what() << __E__;
1973  }
1974  catch(...)
1975  {
1976  ss << "Unexpected error!" << __E__;
1977  }
1978  __COUT_ERR__ << ss.str();
1979 
1980  {
1981  std::lock_guard<std::mutex> lock(
1982  thread_mutex_); // lock out for remainder of scope
1983  thread_error_message_ = ss.str();
1984  }
1985 
1986  theStateMachine_.setErrorMessage(ss.str());
1987 
1988  sendAsyncExceptionToGateway( //0 for both pause/stop indicates error
1989  ss.str(),
1990  0 /* isPauseException */,
1991  0 /* isStopException */);
1992 
1993 } // end daqinterfaceRunner_() catch
1994 
1995 //==============================================================================
1996 void ots::ARTDAQSupervisor::stop_runner_()
1997 {
1998  runner_running_ = false;
1999  if(runner_thread_ && runner_thread_->joinable())
2000  {
2001  runner_thread_->join();
2002  runner_thread_.reset(nullptr);
2003  }
2004 } // end stop_runner_()
2005 
2006 //==============================================================================
2007 void ots::ARTDAQSupervisor::start_runner_()
2008 {
2009  stop_runner_();
2010  runner_thread_ =
2011  std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_, this);
2012 } // end start_runner_()
virtual void transitionHalting(toolbox::Event::Reference event) override
virtual void transitionInitializing(toolbox::Event::Reference event) override
static const std::string ARTDAQ_FCL_PATH
Tree-path rule is, if the last link in the path is a group link with a specified group ID,...
static std::string getBootFileContentFromInfo(const ARTDAQInfo &info, const std::string &setupScript, int debugLevel)
ConfigurationTree getNode(const std::string &nodeString, bool doNotThrowOnBrokenUIDLinks=false) const
"root/parent/parent/"
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
const std::string & getValueAsString(bool returnLinkTableValue=false) const
void getValue(T &value) const
ITRACEController * theTRACEController_
only define for an app that receives a command
bool isComplete()
get functions
Definition: ProgressBar.cc:88
void step()
thread safe
Definition: ProgressBar.cc:74
int read()
if stepsToComplete==0, then define any progress as 50%, thread safe
Definition: ProgressBar.cc:120
void complete()
declare complete, thread safe
Definition: ProgressBar.cc:95
defines used also by OtsConfigurationWizardSupervisor
void INIT_MF(const char *name)
std::string name
Also key in map.