otsdaq  3.09.00
ARTDAQSupervisor.cc
1 
2 
3 #define TRACEMF_USE_VERBATIM 1 // for trace longer path filenames
4 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
5 
6 #include "artdaq-core/Utilities/configureMessageFacility.hh"
7 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
8 #include "artdaq/DAQdata/Globals.hh"
9 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
10 #include "cetlib_except/exception.h"
11 #include "fhiclcpp/make_ParameterSet.h"
12 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisorTRACEController.h"
13 
14 #include "artdaq-core/Utilities/ExceptionHandler.hh" /*for artdaq::ExceptionHandler*/
15 
16 #include <boost/exception/all.hpp>
17 #include <boost/filesystem.hpp>
18 
19 #include <signal.h>
20 #include <cerrno>
21 #include <cstring>
22 #include <regex>
23 
24 #define OUT_ON_ERR_SIZE 2000 //tail size of output to include on error
25 
26 using namespace ots;
27 
28 XDAQ_INSTANTIATOR_IMPL(ARTDAQSupervisor)
29 
30 #define FAKE_CONFIG_NAME "ots_config"
// RPC port used by DAQInterface for this partition:
//   ARTDAQ_BASE_PORT + partition_ * ARTDAQ_PORTS_PER_PARTITION
// The whole expansion is parenthesised so the macro behaves as a single operand
// wherever it is used (e.g. next to '*' or a comparison).
#define DAQINTERFACE_PORT                     \
	(std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + \
	 (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION"))))
34 
35 static ARTDAQSupervisor* instance = nullptr;
36 static std::unordered_map<int, struct sigaction> old_actions =
37  std::unordered_map<int, struct sigaction>();
38 static bool sighandler_init = false;
39 static void signal_handler(int signum)
40 {
41 // Messagefacility may already be gone at this point, TRACE ONLY!
42 #if TRACE_REVNUM < 1459
43  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
44 #else
45  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
46 #endif
47  << "A signal of type " << signum
48  << " was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
49  "then proceeding with default handlers!";
50 
51  if(instance)
52  instance->destroy();
53 
54  sigset_t set;
55  pthread_sigmask(SIG_UNBLOCK, NULL, &set);
56  pthread_sigmask(SIG_UNBLOCK, &set, NULL);
57 
58 #if TRACE_REVNUM < 1459
59  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
60 #else
61  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
62 #endif
63  << "Calling default signal handler";
64  if(signum != SIGUSR2)
65  {
66  sigaction(signum, &old_actions[signum], NULL);
67  kill(getpid(), signum); // Only send signal to self
68  }
69  else
70  {
71  // Send Interrupt signal if parsing SIGUSR2 (i.e. user-defined exception that
72  // should tear down ARTDAQ)
73  sigaction(SIGINT, &old_actions[SIGINT], NULL);
74  kill(getpid(), SIGINT); // Only send signal to self
75  }
76 }
77 
78 static void init_sighandler(ARTDAQSupervisor* inst)
79 {
80  static std::mutex sighandler_mutex;
81  std::unique_lock<std::mutex> lk(sighandler_mutex);
82 
83  if(!sighandler_init)
84  {
85  sighandler_init = true;
86  instance = inst;
87  std::vector<int> signals = {
88  SIGINT,
89  SIGILL,
90  SIGABRT,
91  SIGFPE,
92  SIGSEGV,
93  SIGPIPE,
94  SIGALRM,
95  SIGTERM,
96  SIGUSR2,
97  SIGHUP}; // SIGQUIT is used by art in normal operation
98  for(auto signal : signals)
99  {
100  struct sigaction old_action;
101  sigaction(signal, NULL, &old_action);
102 
103  // If the old handler wasn't SIG_IGN (it's a handler that just
104  // "ignore" the signal)
105  if(old_action.sa_handler != SIG_IGN)
106  {
107  struct sigaction action;
108  action.sa_handler = signal_handler;
109  sigemptyset(&action.sa_mask);
110  for(auto sigblk : signals)
111  {
112  sigaddset(&action.sa_mask, sigblk);
113  }
114  action.sa_flags = 0;
115 
116  // Replace the signal handler of SIGINT with the one described by
117  // new_action
118  sigaction(signal, &action, NULL);
119  old_actions[signal] = old_action;
120  }
121  }
122  }
123 }
124 
125 //==============================================================================
126 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
127  : CoreSupervisorBase(stub)
128  , daqinterface_ptr_(NULL)
129  , partition_(getSupervisorProperty("partition", 0))
130  , daqinterface_state_("notrunning")
131  , runner_thread_(nullptr)
132 {
133  __SUP_COUT__ << "Constructor." << __E__;
134 
135  INIT_MF("." /*directory used is USER_DATA/LOG/.*/);
136  init_sighandler(this);
137 
138  // Only use system Python
139  // unsetenv("PYTHONPATH");
140  // unsetenv("PYTHONHOME");
141 
142  // Write out settings file
143  auto settings_file = __ENV__("DAQINTERFACE_SETTINGS");
144  std::ofstream of(settings_file, std::ios::trunc);
145  const int openErrno = errno; // capture errno immediately after open attempt
146  if(!of.is_open() || of.fail())
147  {
148  __SS__ << "Failed to open DAQINTERFACE_SETTINGS file '" << settings_file
149  << "' for writing: " << strerror(openErrno) << __E__;
150  __SS_THROW__;
151  }
152  std::stringstream o;
153 
154  setenv("DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
155  auto logfileName = std::string(__ENV__("OTSDAQ_LOG_DIR")) +
156  "/DAQInteface/DAQInterface_partition" +
157  std::to_string(partition_) + ".log";
158  setenv("DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
159 
160  o << "log_directory: "
161  << getSupervisorProperty("log_directory", std::string(__ENV__("OTSDAQ_LOG_DIR")))
162  << std::endl;
163 
164  {
165  const std::string record_directory = getSupervisorProperty(
166  "record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH + "/run_records/");
167  mkdir(record_directory.c_str(), 0755);
168  o << "record_directory: " << record_directory << std::endl;
169  }
170 
171  o << "package_hashes_to_save: "
172  << getSupervisorProperty("package_hashes_to_save", "[artdaq]") << std::endl;
173 
174  o << "spack_root_for_bash_scripts: "
175  << getSupervisorProperty("spack_root_for_bash_scripts",
176  std::string(__ENV__("SPACK_ROOT")))
177  << std::endl;
178  o << "boardreader timeout: " << getSupervisorProperty("boardreader_timeout", 30)
179  << std::endl;
180  o << "eventbuilder timeout: " << getSupervisorProperty("eventbuilder_timeout", 30)
181  << std::endl;
182  o << "datalogger timeout: " << getSupervisorProperty("datalogger_timeout", 30)
183  << std::endl;
184  o << "dispatcher timeout: " << getSupervisorProperty("dispatcher_timeout", 30)
185  << std::endl;
186  // Only put max_fragment_size_bytes into DAQInterface settings file if advanced_memory_usage is disabled
187  if(!getSupervisorProperty("advanced_memory_usage", false))
188  {
189  o << "max_fragment_size_bytes: "
190  << getSupervisorProperty("max_fragment_size_bytes", 1048576) << std::endl;
191  }
192  o << "transfer_plugin_to_use: "
193  << getSupervisorProperty("transfer_plugin_to_use", "TCPSocket") << std::endl;
194  if(getSupervisorProperty("transfer_plugin_from_brs", "") != "")
195  {
196  o << "transfer_plugin_from_brs: "
197  << getSupervisorProperty("transfer_plugin_from_brs", "") << std::endl;
198  }
199  if(getSupervisorProperty("transfer_plugin_from_ebs", "") != "")
200  {
201  o << "transfer_plugin_from_ebs: "
202  << getSupervisorProperty("transfer_plugin_from_ebs", "") << std::endl;
203  }
204  if(getSupervisorProperty("transfer_plugin_from_dls", "") != "")
205  {
206  o << "transfer_plugin_from_dls: "
207  << getSupervisorProperty("transfer_plugin_from_dls", "") << std::endl;
208  }
209  o << "all_events_to_all_dispatchers: " << std::boolalpha
210  << getSupervisorProperty("all_events_to_all_dispatchers", true) << std::endl;
211  if(getSupervisorProperty("data_directory_override", "") != "")
212  {
213  o << "data_directory_override: "
214  << getSupervisorProperty("data_directory_override", "") << std::endl;
215  }
216  o << "max_configurations_to_list: "
217  << getSupervisorProperty("max_configurations_to_list", 10) << std::endl;
218  o << "disable_unique_rootfile_labels: "
219  << getSupervisorProperty("disable_unique_rootfile_labels", false) << std::endl;
220  o << "use_messageviewer: " << std::boolalpha
221  << getSupervisorProperty("use_messageviewer", false) << std::endl;
222  o << "use_messagefacility: " << std::boolalpha
223  << getSupervisorProperty("use_messagefacility", true) << std::endl;
224  o << "fake_messagefacility: " << std::boolalpha
225  << getSupervisorProperty("fake_messagefacility", false) << std::endl;
226  o << "kill_existing_processes: " << std::boolalpha
227  << getSupervisorProperty("kill_existing_processes", true) << std::endl;
228  o << "advanced_memory_usage: " << std::boolalpha
229  << getSupervisorProperty("advanced_memory_usage", false) << std::endl;
230  o << "strict_fragment_id_mode: " << std::boolalpha
231  << getSupervisorProperty("strict_fragment_id_mode", false) << std::endl;
232  o << "disable_private_network_bookkeeping: " << std::boolalpha
233  << getSupervisorProperty("disable_private_network_bookkeeping", false) << std::endl;
234  o << "allowed_processors: "
235  << getSupervisorProperty(
236  "allowed_processors",
237  "0-255") // Note this sets a taskset for ALL processes, on all nodes (ex. "1,2,5-7")
238  << std::endl;
239  if(getSupervisorProperty("partition_label_format", "") !=
240  "") //Add to ARTDAQSupervisor properties partition_label_format: "-P%s"
241  o << "partition_label_format: "
242  << getSupervisorProperty("partition_label_format", "") << std::endl;
243 
244  __COUT_MULTI__(0, o.str());
245 
246  of << o.str();
247  of.close();
248 
249  // destroy current TRACEController and instantiate ARTDAQSupervisorTRACEController
250  if(CorePropertySupervisorBase::theTRACEController_)
251  {
252  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
253  delete CorePropertySupervisorBase::
254  theTRACEController_; // destruct current TRACEController
255  CorePropertySupervisorBase::theTRACEController_ = nullptr;
256  }
257  CorePropertySupervisorBase::theTRACEController_ =
259  ((ARTDAQSupervisorTRACEController*)CorePropertySupervisorBase::theTRACEController_)
260  ->setSupervisorPtr(this);
261 
262  __SUP_COUT__ << "Constructed." << __E__;
263 } // end constructor()
264 
265 //==============================================================================
266 ARTDAQSupervisor::~ARTDAQSupervisor(void)
267 {
268  __SUP_COUT__ << "Destructor." << __E__;
269  destroy();
270 
271  __SUP_COUT__ << "Calling Py_Finalize()" << __E__;
272  Py_Finalize();
273 
274  // CorePropertySupervisorBase would destroy, but since it was created here, attempt to destroy
276  {
277  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
280  }
281 
282  __SUP_COUT__ << "Destructed." << __E__;
283 } // end destructor()
284 
285 //==============================================================================
286 void ARTDAQSupervisor::destroy(void)
287 {
288  __SUP_COUT__ << "Destroying..." << __E__;
289 
290  if(daqinterface_ptr_ != NULL)
291  {
292  __SUP_COUT__ << "Calling recover transition" << __E__;
293  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
294 
295  PyObjectGuard pName(PyUnicode_FromString("do_recover"));
296  PyObjectGuard res(
297  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
298 
299  __SUP_COUT__ << "Making sure that correct state has been reached" << __E__;
300  getDAQState_();
301  while(daqinterface_state_ != "stopped")
302  {
303  getDAQState_();
304  __SUP_COUT__ << "State is " << daqinterface_state_
305  << ", waiting 1s and retrying..." << __E__;
306  usleep(1000000);
307  }
308 
309  // Cleanup
310  Py_XDECREF(daqinterface_ptr_);
311  daqinterface_ptr_ = NULL;
312  }
313 
314  __SUP_COUT__ << "Flusing printouts" << __E__;
315 
316  //make sure to flush printouts
317  PyRun_SimpleString(R"(
318 import sys
319 sys.stdout = sys.__stdout__
320 sys.stderr = sys.__stderr__
321 )");
322  // stringIO_out_ and stringIO_err_ may refer to borrowed Python objects
323  // (e.g., via PyDict_GetItemString in the tee-buffer path). Do not DECREF
324  // them here to avoid corrupting their reference counts; treat them as
325  // non-owning pointers and clear them instead.
326  stringIO_out_ = nullptr;
327  stringIO_err_ = nullptr;
328 
329  __SUP_COUT__ << "Thread and garbage cleanup" << __E__;
330  // force python thread cleanup:
331  PyRun_SimpleString(
332  "import threading; [t.join() for t in threading.enumerate() if t is not "
333  "threading.main_thread() and not isinstance(t, threading._DummyThread)]");
334  PyRun_SimpleString("import gc; gc.collect()");
335  // __SUP_COUT__ << "Calling Py_Finalize()" << __E__;
336  // Py_Finalize();
337 
338  __SUP_COUT__ << "Destroyed." << __E__;
339 } // end destroy()
340 
341 //==============================================================================
342 void ARTDAQSupervisor::init(void)
343 {
344  stop_runner_();
345 
346  __SUP_COUT__ << "Initializing..." << __E__;
347  {
348  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
349 
350  // allSupervisorInfo_.init(getApplicationContext());
351  artdaq::configureMessageFacility("ARTDAQSupervisor");
352  __SUP_COUT__ << "artdaq MF configured." << __E__;
353 
354  // initialization
355  char* daqinterface_dir = getenv("ARTDAQ_DAQINTERFACE_DIR");
356  if(daqinterface_dir == NULL)
357  {
358  __SS__ << "ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
359  "means that DAQInterface has not been setup!"
360  << __E__;
361  __SUP_SS_THROW__;
362  }
363  else
364  {
365  __SUP_COUT__ << "Initializing Python" << __E__;
366  Py_Initialize();
367 
368  //setup Python output to tee output to stdout/err and to StringIO buffer "tee_buffer"
369  PyRun_SimpleString(
370  "import sys\n"
371  "from io import StringIO\n"
372  "\n"
373  "class TeeOut:\n"
374  " def __init__(self, real, buf):\n"
375  " self.real = real\n"
376  " self.buf = buf\n"
377  " def write(self, data):\n"
378  " self.real.write(data)\n"
379  " self.buf.write(data)\n"
380  " def flush(self):\n"
381  " self.real.flush()\n"
382  " self.buf.flush()\n"
383  "\n"
384  "tee_buffer = StringIO()\n"
385  "sys.stdout = TeeOut(sys.stdout, tee_buffer)\n"
386  "sys.stderr = TeeOut(sys.stderr, tee_buffer)\n");
387 
388  __SUP_COUT__ << "Adding DAQInterface directory to PYTHON_PATH" << __E__;
389  PyObject* sysPath = PySys_GetObject(
390  (char*)"path"); //do NOT DECREF borrowed objects through GetObject!
391  PyObjectGuard programName(PyUnicode_FromString(daqinterface_dir));
392  PyList_Append(sysPath, programName.get());
393 
394  __SUP_COUT__ << "Creating Module name" << __E__;
395  PyObjectGuard pName(PyUnicode_FromString("rc.control.daqinterface"));
396  /* Error checking of pName left out */
397 
398  __SUP_COUT__ << "Importing module" << __E__;
399  PyObjectGuard pModule(PyImport_Import(pName.get()));
400 
401  if(pModule.get() == NULL)
402  {
403  std::string err = capturePyErr("import rc.control.daqinterface");
404  __SS__ << "Failed to load rc.control.daqinterface. Python Exception: "
405  << err << __E__;
406  __SUP_SS_THROW__;
407  }
408  else
409  {
410  __SUP_COUT__ << "Loading python module dictionary" << __E__;
411  PyObject* pDict = PyModule_GetDict(
412  pModule.get()); //do NOT DECREF borrowed objects through GetDict!
413  if(pDict == NULL)
414  {
415  std::string err = capturePyErr("module dict");
416  __SS__ << "Unable to load module dictionary. Python Exception: "
417  << err << __E__;
418  __SUP_SS_THROW__;
419  }
420  else
421  {
422  __SUP_COUT__ << "Getting DAQInterface object pointer" << __E__;
423  PyObject* di_obj_raw = PyDict_GetItemString(
424  pDict, "DAQInterface"); // borrowed reference
425  if(di_obj_raw == NULL)
426  {
427  std::string err = capturePyErr("DAQInterface lookup");
428  __SS__ << "Unable to find 'DAQInterface' in module dictionary. "
429  "Python Exception: "
430  << err << __E__;
431  __SUP_SS_THROW__;
432  }
433  Py_INCREF(di_obj_raw); // convert borrowed reference to owned
434  PyObjectGuard di_obj_ptr(di_obj_raw);
435 
436  __SUP_COUT__ << "Filling out DAQInterface args struct" << __E__;
437  PyObjectGuard pArgs(PyTuple_New(0));
438 
439  PyObjectGuard kwargs(Py_BuildValue("{s:s, s:s, s:i, s:i, s:s, s:s}",
440  "logpath",
441  ".daqint.log",
442  "name",
443  "DAQInterface",
444  "partition_number",
445  partition_,
446  "rpc_port",
447  DAQINTERFACE_PORT,
448  "rpc_host",
449  "localhost",
450  "control_host",
451  "localhost"));
452 
453  __SUP_COUT__ << "Calling DAQInterface Object Constructor" << __E__;
454 
455  // Get sys and io
456  PyObjectGuard sys(PyImport_ImportModule("sys"));
457  PyObjectGuard io(PyImport_ImportModule("io"));
458 
459  if(0)
460  {
461  //------------- redirect stdout to string
462 
463  // Create StringIO objects for stdout and stderr
464  stringIO_out_ = PyObject_CallMethod(io.get(), "StringIO", NULL);
465  stringIO_err_ = PyObject_CallMethod(io.get(), "StringIO", NULL);
466 
467  // Save originals (not needed, since just keep the redirection until daqinterface_ptr_ is destructed)
468  // PyObject* sys_stdout = PyObject_GetAttrString(sys, "stdout");
469  // PyObject* sys_stderr = PyObject_GetAttrString(sys, "stderr");
470 
471  // Redirect
472  PyObject_SetAttrString(sys.get(), "stdout", stringIO_out_);
473  PyObject_SetAttrString(sys.get(), "stderr", stringIO_err_);
474  //------------- end redirect stdout to string
475  }
476  else //capture tee buffer instead so output to console continues
477  {
478  PyObject* mainmod =
479  PyImport_AddModule("__main__"); // borrowed ref
480  PyObject* globals = PyModule_GetDict(mainmod); // borrowed ref
481 
482  stringIO_out_ =
483  PyDict_GetItemString(globals, "tee_buffer"); // borrowed ref
484 
485  // Do not Py_DECREF borrowed references.
486  }
487 
488  daqinterface_ptr_ =
489  PyObject_Call(di_obj_ptr.get(), pArgs.get(), kwargs.get());
490  if(checkPythonError(daqinterface_ptr_))
491  {
492  std::string err = capturePyErr("DAQInterface constructor");
493  __SS__ << "DAQInterface constructor failed. Python Exception: "
494  << err << __E__;
495  __SUP_SS_THROW__;
496  }
497 
498  if(0) //example printout handling
499  {
500  // Force an error
501  PyObjectGuard bad(
502  PyObject_CallMethod(sys.get(), "does_not_exist", NULL));
503  if(!bad.get())
504  PyErr_Print(); // <-- this writes into stringIO_err_, not the terminal
505 
506  // Grab stderr contents
507  PyObjectGuard err_text(
508  PyObject_CallMethod(stringIO_err_, "getvalue", NULL));
509  if(err_text.get())
510  __COUT__ << "Captured stderr:\n"
511  << PyUnicode_AsUTF8(err_text.get()) << "\n";
512  else
513  __COUT__ << "Capture of stderr failed.";
514  } //end example printout handling
515  }
516  }
517  }
518 
519  getDAQState_();
520 
521  // { //attempt to cleanup old artdaq processes DOES NOT WORK because artdaq interface knows it hasn't started
522  // __SUP_COUT__ << "Attempting artdaq stale cleanup..." << __E__;
523  // std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
524  // getDAQState_();
525  // __SUP_COUT__ << "Status before cleanup: " << daqinterface_state_ << __E__;
526 
527  // PyObjectGuard pName(PyUnicode_FromString("do_recover"));
528  // PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL));
529  // __COUT_MULTI_LBL__(0,captureStderrAndStdout_("do_recover"),"do_recover");
530 
531  // if(res == NULL)
532  // {
533  // std::string err = capturePyErr("do_recover");
534  // __SS__ << "Error with clean up calling do_recover: " << err << __E__;
535  // __SUP_SS_THROW__;
536  // }
537  // getDAQState_();
538  // __SUP_COUT__ << "Status after cleanup: " << daqinterface_state_ << __E__;
539  // __SUP_COUT__ << "cleanup DONE." << __E__;
540  // }
541  }
542  start_runner_();
543  __SUP_COUT__ << "Initialized." << __E__;
544 } // end init()
545 
546 //==============================================================================
547 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference /*event*/)
548 {
549  __SUP_COUTT__ << "transitionConfiguring" << __E__;
550 
551  // activate the configuration tree (the first iteration)
552  if(RunControlStateMachine::getIterationIndex() == 0 &&
553  RunControlStateMachine::getSubIterationIndex() == 0)
554  {
555  thread_error_message_ = "";
556  thread_progress_bar_.resetProgressBar(0);
557  last_thread_progress_update_ = time(0); // initialize timeout timer
558 
559  CoreSupervisorBase::configureInit();
560 
561  // start configuring thread
562  std::thread(&ARTDAQSupervisor::configuringThread, this).detach();
563 
564  __SUP_COUT__ << "Configuring thread started." << __E__;
565 
566  RunControlStateMachine::
567  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
568  }
569  else // not first time
570  {
571  std::string errorMessage;
572  {
573  std::lock_guard<std::mutex> lock(
574  thread_mutex_); // lock out for remainder of scope
575  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
576  }
577  int progress = thread_progress_bar_.read();
578  __SUP_COUTVS__(2, errorMessage);
579  __SUP_COUTVS__(2, progress);
580  __SUP_COUTVS__(2, thread_progress_bar_.isComplete());
581 
582  // check for done and error messages
583  if(errorMessage == "" && // if no update in 600 seconds, give up
584  time(0) - last_thread_progress_update_ > 600)
585  {
586  __SUP_SS__ << "There has been no update from the configuration thread for "
587  << (time(0) - last_thread_progress_update_)
588  << " seconds, assuming something is wrong and giving up! "
589  << "Last progress received was " << progress << __E__;
590  errorMessage = ss.str();
591  }
592 
593  // Check if any subapp (artdaq component) has entered a Failed state.
594  // This can happen when a component fails during do_config/do_boot while the
595  // configuringThread is blocked waiting for DAQInterface, and would otherwise
596  // cause a 600-second timeout before the error is detected.
597  if(errorMessage == "")
598  {
599  auto subapps = getSubappInfo();
600  for(auto& subapp : subapps)
601  {
602  if(subapp.status == RunControlStateMachine::FAILED_STATE_NAME)
603  {
604  __SUP_SS__ << "Component '" << subapp.name
605  << "' entered Failed state during configuration! "
606  << "(url: " << subapp.url << ")" << __E__;
607  errorMessage = ss.str();
608  __SUP_COUT_ERR__ << "\n" << ss.str();
609  break;
610  }
611  }
612  }
613 
614  if(errorMessage != "")
615  {
616  __SUP_SS__ << "Error was caught in configuring thread: " << errorMessage
617  << __E__;
618  __SUP_COUT_ERR__ << "\n" << ss.str();
619 
620  theStateMachine_.setErrorMessage(ss.str());
621  throw toolbox::fsm::exception::Exception(
622  "Transition Error" /*name*/,
623  ss.str() /* message*/,
624  "CoreSupervisorBase::transitionConfiguring" /*module*/,
625  __LINE__ /*line*/,
626  __FUNCTION__ /*function*/
627  );
628  }
629 
630  if(!thread_progress_bar_.isComplete())
631  {
632  __SUP_COUTT__ << "Not done yet..." << __E__;
633  //attempt to get live view of python output
634  // __COUT_MULTI_LBL__(0, captureStderrAndStdout_("statuscheck"), "statuscheck");
635 
636  RunControlStateMachine::
637  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
638 
639  if(last_thread_progress_read_ != progress)
640  {
641  last_thread_progress_read_ = progress;
642  last_thread_progress_update_ = time(0);
643  }
644 
645  sleep(1 /*seconds*/);
646  }
647  else
648  {
649  __SUP_COUT_INFO__ << "Complete configuring transition!" << __E__;
650  __SUP_COUTV__(getProcessInfo_());
651  }
652  }
653 
654  return;
655 } // end transitionConfiguring()
656 
657 //==============================================================================
658 void ARTDAQSupervisor::configuringThread()
659 try
660 {
661  std::string uid = theConfigurationManager_
662  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
663  "/" + CorePropertySupervisorBase::getSupervisorUID() +
664  "/" + "LinkToSupervisorTable")
665  .getValueAsString();
666 
667  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
668 
669  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
670 
671  ConfigurationTree theSupervisorNode = getSupervisorTableNode();
672 
673  thread_progress_bar_.step();
674 
675  set_thread_message_("ConfigGen");
676 
677  auto info = ARTDAQTableBase::extractARTDAQInfo(
678  theSupervisorNode,
679  false /*getStatusFalseNodes*/,
680  true /*doWriteFHiCL*/,
681  getSupervisorProperty("max_fragment_size_bytes", 8888),
682  getSupervisorProperty("routing_timeout_ms", 1999),
683  getSupervisorProperty("routing_retry_count", 12),
684  &thread_progress_bar_);
685 
686  // Check lists
687  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
688  {
689  __GEN_SS__ << "There must be at least one enabled BoardReader!" << __E__;
690  __GEN_SS_THROW__;
691  }
692  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
693  {
694  __GEN_SS__ << "There must be at least one enabled EventBuilder!" << __E__;
695  __GEN_SS_THROW__;
696  }
697 
698  thread_progress_bar_.step();
699  set_thread_message_("Writing boot.txt");
700 
701  __GEN_COUT__ << "Writing boot.txt" << __E__;
702 
703  int debugLevel = theSupervisorNode.getNode("DAQInterfaceDebugLevel").getValue<int>();
704  std::string setupScript = theSupervisorNode.getNode("DAQSetupScript").getValue();
705 
706  // Generate boot file content using helper function
707  std::string bootContent =
708  ARTDAQTableBase::getBootFileContentFromInfo(info, setupScript, debugLevel);
709 
710  // Populate label_to_proc_type_map_ (still needed for later)
711  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
712  label_to_proc_type_map_[builder.label] = "EventBuilder";
713  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
714  label_to_proc_type_map_[logger.label] = "DataLogger";
715  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
716  label_to_proc_type_map_[dispatcher.label] = "Dispatcher";
717  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
718  label_to_proc_type_map_[rmanager.label] = "RoutingManager";
719 
720  // Write boot.txt file
721  std::ofstream o(ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt", std::ios::trunc);
722  o << bootContent;
723  o.close();
724 
725  // TODO: To save to runlog, store bootContent in metadata/configuration archive
726  // Example (add when implementing runlog integration):
727  // saveToRunlog("boot.txt", bootContent, run_number);
728 
729  thread_progress_bar_.step();
730  set_thread_message_("Writing Fhicl Files");
731 
732  __GEN_COUT__ << "Building configuration directory" << __E__;
733 
734  boost::system::error_code ignored;
735  boost::filesystem::remove_all(ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME,
736  ignored);
737  mkdir((ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME).c_str(), 0755);
738 
739  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
740  {
741  symlink(ARTDAQTableBase::getFlatFHICLFilename(
742  ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label)
743  .c_str(),
744  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
745  reader.label + ".fcl")
746  .c_str());
747  }
748  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
749  {
750  symlink(ARTDAQTableBase::getFlatFHICLFilename(
751  ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label)
752  .c_str(),
753  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
754  builder.label + ".fcl")
755  .c_str());
756  }
757  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
758  {
759  symlink(ARTDAQTableBase::getFlatFHICLFilename(
760  ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label)
761  .c_str(),
762  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
763  logger.label + ".fcl")
764  .c_str());
765  }
766  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
767  {
768  symlink(ARTDAQTableBase::getFlatFHICLFilename(
769  ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label)
770  .c_str(),
771  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
772  dispatcher.label + ".fcl")
773  .c_str());
774  }
775  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
776  {
777  symlink(ARTDAQTableBase::getFlatFHICLFilename(
778  ARTDAQTableBase::ARTDAQAppType::RoutingManager, rmanager.label)
779  .c_str(),
780  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
781  rmanager.label + ".fcl")
782  .c_str());
783  }
784 
785  thread_progress_bar_.step();
786 
787  // Block 1: State check — acquire and release daqinterface_pythonMutex_
788  // so the runner thread and halt transition can interleave between steps
789  {
790  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
791  getDAQState_();
792  if(daqinterface_state_ != "stopped" && daqinterface_state_ != "")
793  {
794  __GEN_SS__ << "Cannot configure DAQInterface because it is in the wrong state"
795  << " (" << daqinterface_state_ << " != stopped)!" << __E__;
796  __GEN_SS_THROW__
797  }
798 
799  if(daqinterface_ptr_ == nullptr)
800  {
801  __GEN_SS__ << "DAQInterface is not initialized. "
802  "Check earlier Python import/constructor errors (e.g. syntax) "
803  "in DAQInterface."
804  << __E__;
805  __GEN_SS_THROW__;
806  }
807  } // end Block 1 — release daqinterface_pythonMutex_
808 
809  // Block 2: setdaqcomps
810  set_thread_message_("Calling setdaqcomps");
811  __GEN_COUT__ << "Calling setdaqcomps" << __E__;
812  {
813  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
814 
815  __GEN_COUT__ << "Status before setdaqcomps: " << daqinterface_state_ << __E__;
816 
817  PyObjectGuard pName1(PyUnicode_FromString("setdaqcomps"));
818 
819  PyObjectGuard readerDict(PyDict_New());
820  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
821  {
822  // PyDict_SetItem INCREFs key/value, so use PyObjectGuard to manage references
823  label_to_proc_type_map_[reader.label] = "BoardReader";
824  PyObjectGuard readerName(PyUnicode_FromString(reader.label.c_str()));
825 
826  int list_size = reader.allowed_processors != "" ? 4 : 3;
827 
828  PyObjectGuard readerData(PyList_New(list_size));
829  PyObject* readerHost = PyUnicode_FromString(reader.hostname.c_str());
830  PyObject* readerPort = PyUnicode_FromString("-1");
831  PyObject* readerSubsystem =
832  PyUnicode_FromString(std::to_string(reader.subsystem).c_str());
833  PyList_SetItem(readerData.get(), 0, readerHost);
834  PyList_SetItem(readerData.get(), 1, readerPort);
835  PyList_SetItem(readerData.get(), 2, readerSubsystem);
836  if(reader.allowed_processors != "")
837  {
838  PyObject* readerAllowedProcessors =
839  PyUnicode_FromString(reader.allowed_processors.c_str());
840  PyList_SetItem(readerData.get(), 3, readerAllowedProcessors);
841  }
842  PyDict_SetItem(readerDict.get(), readerName.get(), readerData.get());
843  }
844  PyObjectGuard res1(PyObject_CallMethodObjArgs(
845  daqinterface_ptr_, pName1.get(), readerDict.get(), NULL));
846  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("setdaqcomps"), "setdaqcomps");
847 
848  if(checkPythonError(res1.get()))
849  {
850  std::string err_msg = capturePyErr("setdaqcomps");
851  __GEN_SS__ << "Error calling setdaqcomps: " << err_msg << __E__;
852  __GEN_SS_THROW__;
853  }
854 
855  getDAQState_();
856  __GEN_COUT__ << "Status after setdaqcomps: " << daqinterface_state_ << __E__;
857  } // end Block 2 — release daqinterface_pythonMutex_
858 
859  thread_progress_bar_.step();
860 
861  // Block 3: do_boot (with recover + retry)
862  set_thread_message_("Calling do_boot");
863  __GEN_COUT_INFO__ << "Calling do_boot" << __E__;
864  std::string doBootOutput = "";
865  {
866  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
867 
868  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
869 
870  // 1. Create Python Strings
871  PyObjectGuard pNameBoot(PyUnicode_FromString("do_boot"));
872  PyObjectGuard pBootArgs(PyUnicode_FromString(
873  (ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt").c_str()));
874 
875  // 2. First Attempt: Call do_boot
876  PyObjectGuard resBoot1(PyObject_CallMethodObjArgs(
877  daqinterface_ptr_, pNameBoot.get(), pBootArgs.get(), NULL));
878 
879  doBootOutput = captureStderrAndStdout_("do_boot");
880  __COUT_MULTI_LBL__(0, doBootOutput, "do_boot");
881 
882  if(checkPythonError(resBoot1.get()))
883  {
884  // --- FAILURE PATH ---
885 
886  std::string err1 = capturePyErr("do_boot");
887 
888  __GEN_COUT_INFO__ << "Error on first boot attempt: " << err1
889  << ". Recovering and retrying..." << __E__;
890 
891  // B. Attempt 'do_recover'
892  PyObjectGuard pNameRecover(PyUnicode_FromString("do_recover"));
893  PyObjectGuard resRecover(
894  PyObject_CallMethodObjArgs(daqinterface_ptr_, pNameRecover.get(), NULL));
895  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_recover"), "do_recover");
896 
897  if(checkPythonError(resRecover.get()))
898  {
899  // Recover failed - Critical Error
900  std::string errRec = capturePyErr("do_recover");
901 
902  std::stringstream oss;
903  oss << "Error calling recover transition!!!! " << errRec;
904  if(doBootOutput.size() > OUT_ON_ERR_SIZE)
905  oss << "... last " << OUT_ON_ERR_SIZE
906  << " chars: " << doBootOutput.substr(doBootOutput.size() - 1000);
907  else
908  oss << doBootOutput;
909 
910  // Clean up original args before throwing
911  __GEN_SS__ << oss.str() << __E__;
912  __GEN_SS_THROW__;
913  }
914 
915  // C. Retry 'do_boot'
916  thread_progress_bar_.step();
917  set_thread_message_("Calling do_boot (retry)");
918  __GEN_COUT_INFO__ << "Calling do_boot again" << __E__;
919 
920  // Reuse pNameBoot and pBootArgs (valid until end of scope)
921  PyObjectGuard resBoot2(PyObject_CallMethodObjArgs(
922  daqinterface_ptr_, pNameBoot.get(), pBootArgs.get(), NULL));
923 
924  doBootOutput = captureStderrAndStdout_("do_boot (retry)");
925  __COUT_MULTI_LBL__(0, doBootOutput, "do_boot (retry)");
926 
927  if(checkPythonError(resBoot2.get()))
928  {
929  // Second boot failed
930  std::string err2 = capturePyErr("do_boot retry");
931 
932  std::stringstream oss;
933  oss << "Error calling boot transition (2nd try): " << err2;
934  if(doBootOutput.size() > OUT_ON_ERR_SIZE)
935  oss << "... last " << OUT_ON_ERR_SIZE
936  << " chars: " << doBootOutput.substr(doBootOutput.size() - 1000);
937  else
938  oss << doBootOutput;
939 
940  __GEN_SS__ << oss.str() << __E__;
941  __GEN_SS_THROW__;
942  }
943  }
944 
945  getDAQState_();
946  if(daqinterface_state_ != "booted")
947  {
948  std::cout << "Do boot output on error: \n" << doBootOutput << __E__;
949  __GEN_SS__ << "DAQInterface boot transition failed! "
950  << "Status after boot attempt: " << daqinterface_state_ << __E__;
951 
952  if(doBootOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
953  ss << "... last " << OUT_ON_ERR_SIZE
954  << " characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
955  else
956  ss << doBootOutput;
957  __GEN_SS_THROW__;
958  }
959  __GEN_COUT__ << "Status after boot: " << daqinterface_state_ << __E__;
960  } // end Block 3 — release daqinterface_pythonMutex_
961 
962  thread_progress_bar_.step();
963 
964  // Block 4: do_config
965  set_thread_message_("Calling do_config");
966  __GEN_COUT_INFO__ << "Calling do_config" << __E__;
967  std::string doConfigOutput = "";
968  {
969  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
970 
971  __GEN_COUT__ << "Status before config: " << daqinterface_state_ << __E__;
972 
973  { //do_config call
974  // RAII wrapper for Python objects to ensure cleanup even on exception
975 
976  PyObjectGuard pName3(PyUnicode_FromString("do_config"));
977  // 2. Create the argument - list containing config name: ["my_config"]
978  PyObjectGuard pArg(Py_BuildValue("[s]", FAKE_CONFIG_NAME));
979 
980  // 3. Call the method
981  PyObjectGuard res3(PyObject_CallMethodObjArgs(
982  daqinterface_ptr_, pName3.get(), pArg.get(), NULL));
983 
984  // 4. Check for errors FIRST before capturing output (which might clear error state)
985  if(checkPythonError(res3.get()))
986  {
987  // Get the error message before doing anything else
988  std::string err = capturePyErr("do_config");
989 
990  // Now capture output for diagnostics
991  doConfigOutput = captureStderrAndStdout_("do_config");
992 
993  __GEN_SS__ << "Error calling config transition: " << err << __E__;
994  __GEN_SS_THROW__;
995  }
996 
997  // 5. Success path - capture output
998  doConfigOutput = captureStderrAndStdout_("do_config");
999  __COUT_MULTI_LBL__(0, doConfigOutput, "do_config");
1000 
1001  // 6. Success Handling (Safe conversion to string)
1002  // We use PyObject_Str to safely convert any return type (None, Int, String) to text
1003  PyObjectGuard strRes(PyObject_Str(res3.get()));
1004  const char* res_cstr = "";
1005  if(strRes.get())
1006  {
1007  res_cstr = PyUnicode_AsUTF8(strRes.get());
1008  }
1009 
1010  __SUP_COUTT__ << "do_config result=" << (res_cstr ? res_cstr : "N/A")
1011  << __E__;
1012  } //end do_config call
1013 
1014  getDAQState_();
1015  if(daqinterface_state_ != "ready")
1016  {
1017  __GEN_SS__ << "DAQInterface config transition failed!" << __E__
1018  << "Supervisor state: \"" << daqinterface_state_
1019  << "\" != \"ready\" " << __E__;
1020  auto doConfigOutput_recover_i =
1021  doConfigOutput.find("RECOVER transition underway");
1022  if(doConfigOutput_recover_i == std::string::npos)
1023  ss << doConfigOutput;
1024  else if(doConfigOutput_recover_i >
1025  OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
1026  ss << "... tail of " << OUT_ON_ERR_SIZE << " characters before recovery: "
1027  << doConfigOutput.substr(
1028  doConfigOutput_recover_i - OUT_ON_ERR_SIZE +
1029  std::string("RECOVER transition underway").size(),
1030  OUT_ON_ERR_SIZE);
1031  else
1032  ss << doConfigOutput.substr(
1033  0,
1034  doConfigOutput_recover_i +
1035  std::string("RECOVER transition underway").size());
1036  __GEN_SS_THROW__;
1037  }
1038  __GEN_COUT__ << "Status after config: " << daqinterface_state_ << __E__;
1039  } // end Block 4 — release daqinterface_pythonMutex_
1040 
1041  thread_progress_bar_.complete();
1042  set_thread_message_("Configured");
1043  __GEN_COUT_INFO__ << "Configured." << __E__;
1044 
1045 } // end configuringThread()
1046 catch(const std::runtime_error& e)
1047 {
1048  set_thread_message_("ERROR");
1049  __SS__ << "Error was caught while configuring: " << e.what() << __E__;
1050  __COUT_ERR__ << "\n" << ss.str();
1051  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1052  thread_error_message_ = ss.str();
1053 }
1054 catch(...)
1055 {
1056  set_thread_message_("ERROR");
1057  __SS__ << "Unknown error was caught while configuring. Please checked the logs."
1058  << __E__;
1059  __COUT_ERR__ << "\n" << ss.str();
1060 
1061  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1062 
1063  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1064  thread_error_message_ = ss.str();
1065 } // end configuringThread() error handling
1066 
1067 //==============================================================================
1068 void ARTDAQSupervisor::transitionHalting(toolbox::Event::Reference /*event*/)
1069 try
1070 {
1071  set_thread_message_("Halting");
1072  __SUP_COUT__ << "Halting..." << __E__;
1073 
1074  int tries = 0;
1075  while(tries++ < 5)
1076  {
1077  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1078  std::try_to_lock);
1079  if(!lk.owns_lock()) //if lock not availabe, just report last status
1080  {
1081  __COUTS__(50) << "Do not have python lock for halt. tries=" << tries << __E__;
1082  sleep(1);
1083  continue;
1084  }
1085  __COUTS__(50) << "Have python lock!" << __E__;
1086 
1087  // std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1088  getDAQState_();
1089  __SUP_COUT__ << "Status before halt: " << daqinterface_state_ << __E__;
1090 
1091  if(daqinterface_state_ == "running")
1092  {
1093  // First stop before halting
1094  PyObjectGuard pName(PyUnicode_FromString("do_stop_running"));
1095  PyObjectGuard res(
1096  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1097  __COUT_MULTI_LBL__(
1098  0, captureStderrAndStdout_("do_stop_running"), "do_stop_running");
1099 
1100  if(res.get() == NULL)
1101  {
1102  std::string err = capturePyErr();
1103  __SS__ << "Error calling DAQ Interface stop transition: " << err
1104  << __E__;
1105  __SUP_SS_THROW__;
1106  }
1107  }
1108 
1109  // If DAQInterface is already stopped (e.g. after enteringError ran do_recover),
1110  // there are no artdaq processes to send Shutdown to — skip do_command.
1111  if(daqinterface_state_ == "stopped" || daqinterface_state_ == "")
1112  {
1113  __SUP_COUT__ << "DAQInterface already stopped, skipping Shutdown command."
1114  << __E__;
1115  }
1116  else
1117  {
1118  PyObjectGuard pName(PyUnicode_FromString("do_command"));
1119  PyObjectGuard pArg(PyUnicode_FromString("Shutdown"));
1120  PyObjectGuard res(PyObject_CallMethodObjArgs(
1121  daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1122  __COUT_MULTI_LBL__(
1123  0, captureStderrAndStdout_("do_command Shutdown"), "do_command Shutdown");
1124 
1125  if(checkPythonError(res.get()))
1126  {
1127  std::string err = capturePyErr("do_command Shutdown");
1128  __SS__ << "Error calling DAQ Interface halt transition: " << err << __E__;
1129  __SUP_SS_THROW__;
1130  }
1131  }
1132 
1133  getDAQState_();
1134  __SUP_COUT__ << "Status after halt: " << daqinterface_state_ << __E__;
1135  break;
1136  } //end retry loop
1137 
1138  if(tries >= 5)
1139  {
1140  __SUP_SS__ << "Failed to acquire python lock for halting after " << tries
1141  << " tries, giving up! Is it possible the configure thread is stuck?"
1142  << __E__;
1143  __SUP_SS_THROW__;
1144  }
1145 
1146  __SUP_COUT__ << "Halted." << __E__;
1147  set_thread_message_("Halted");
1148 } // end transitionHalting()
1149 catch(const std::runtime_error& e)
1150 {
1151  const std::string transitionName = "Halting";
1152  // if halting from Failed state, then ignore errors
1153  if(theStateMachine_.getProvenanceStateName() ==
1154  RunControlStateMachine::FAILED_STATE_NAME ||
1155  theStateMachine_.getProvenanceStateName() ==
1156  RunControlStateMachine::HALTED_STATE_NAME)
1157  {
1158  __SUP_COUT_INFO__ << "Error was caught while halting (but ignoring because "
1159  "previous state was '"
1160  << RunControlStateMachine::FAILED_STATE_NAME
1161  << "'): " << e.what() << __E__;
1162  }
1163  else // if not previously in Failed state, then fail
1164  {
1165  __SUP_SS__ << "Error was caught while " << transitionName << ": " << e.what()
1166  << __E__;
1167  __SUP_COUT_ERR__ << "\n" << ss.str();
1168  theStateMachine_.setErrorMessage(ss.str());
1169  throw toolbox::fsm::exception::Exception(
1170  "Transition Error" /*name*/,
1171  ss.str() /* message*/,
1172  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1173  __LINE__ /*line*/,
1174  __FUNCTION__ /*function*/
1175  );
1176  }
1177 } // end transitionHalting() std::runtime_error exception handling
1178 catch(...)
1179 {
1180  const std::string transitionName = "Halting";
1181  // if halting from Failed state, then ignore errors
1182  if(theStateMachine_.getProvenanceStateName() ==
1183  RunControlStateMachine::FAILED_STATE_NAME ||
1184  theStateMachine_.getProvenanceStateName() ==
1185  RunControlStateMachine::HALTED_STATE_NAME)
1186  {
1187  __SUP_COUT_INFO__ << "Unknown error was caught while halting (but ignoring "
1188  "because previous state was '"
1189  << RunControlStateMachine::FAILED_STATE_NAME << "')." << __E__;
1190  }
1191  else // if not previously in Failed state, then fail
1192  {
1193  __SUP_SS__ << "Unknown error was caught while " << transitionName
1194  << ". Please checked the logs." << __E__;
1195  __SUP_COUT_ERR__ << "\n" << ss.str();
1196  theStateMachine_.setErrorMessage(ss.str());
1197 
1198  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1199 
1200  throw toolbox::fsm::exception::Exception(
1201  "Transition Error" /*name*/,
1202  ss.str() /* message*/,
1203  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1204  __LINE__ /*line*/,
1205  __FUNCTION__ /*function*/
1206  );
1207  }
1208 } // end transitionHalting() exception handling
1209 
1210 //==============================================================================
1211 void ARTDAQSupervisor::transitionInitializing(toolbox::Event::Reference /*event*/)
1212 try
1213 {
1214  set_thread_message_("Initializing");
1215  __SUP_COUT__ << "Initializing..." << __E__;
1216  init();
1217  __SUP_COUT__ << "Initialized." << __E__;
1218  set_thread_message_("Initialized");
1219 } // end transitionInitializing()
1220 catch(const std::runtime_error& e)
1221 {
1222  __SS__ << "Error was caught while Initializing: " << e.what() << __E__;
1223  __SS_THROW__;
1224 }
1225 catch(...)
1226 {
1227  __SS__ << "Unknown error was caught while Initializing. Please checked the logs."
1228  << __E__;
1229  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1230  __SS_THROW__;
1231 } // end transitionInitializing() error handling
1232 
1233 //==============================================================================
1234 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference /*event*/)
1235 try
1236 {
1237  set_thread_message_("Pausing");
1238  __SUP_COUT__ << "Pausing..." << __E__;
1239  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1240 
1241  getDAQState_();
1242  __SUP_COUT__ << "Status before pause: " << daqinterface_state_ << __E__;
1243 
1244  PyObjectGuard pName(PyUnicode_FromString("do_command"));
1245  PyObjectGuard pArg(PyUnicode_FromString("Pause"));
1246  PyObjectGuard res(
1247  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1248  __COUT_MULTI_LBL__(
1249  0, captureStderrAndStdout_("do_command Pause"), "do_command Pause");
1250 
1251  if(checkPythonError(res.get()))
1252  {
1253  std::string err = capturePyErr("do_command Pause");
1254  __SS__ << "Error calling DAQ Interface Pause transition: " << err << __E__;
1255  __SUP_SS_THROW__;
1256  }
1257 
1258  getDAQState_();
1259  __SUP_COUT__ << "Status after pause: " << daqinterface_state_ << __E__;
1260 
1261  __SUP_COUT__ << "Paused." << __E__;
1262  set_thread_message_("Paused");
1263 } // end transitionPausing()
1264 catch(const std::runtime_error& e)
1265 {
1266  __SS__ << "Error was caught while Pausing: " << e.what() << __E__;
1267  __SS_THROW__;
1268 }
1269 catch(...)
1270 {
1271  __SS__ << "Unknown error was caught while Pausing. Please checked the logs." << __E__;
1272  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1273  __SS_THROW__;
1274 } // end transitionPausing() error handling
1275 
1276 //==============================================================================
1277 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference /*event*/)
1278 try
1279 {
1280  set_thread_message_("Resuming");
1281  __SUP_COUT__ << "Resuming..." << __E__;
1282  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1283 
1284  getDAQState_();
1285  __SUP_COUT__ << "Status before resume: " << daqinterface_state_ << __E__;
1286  PyObjectGuard pName(PyUnicode_FromString("do_command"));
1287  PyObjectGuard pArg(PyUnicode_FromString("Resume"));
1288  PyObjectGuard res(
1289  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1290  __COUT_MULTI_LBL__(
1291  0, captureStderrAndStdout_("do_command Resume"), "do_command Resume");
1292 
1293  if(checkPythonError(res.get()))
1294  {
1295  std::string err = capturePyErr("do_command Resume");
1296  __SS__ << "Error calling DAQ Interface Resume transition: " << err << __E__;
1297  __SUP_SS_THROW__;
1298  }
1299 
1300  getDAQState_();
1301  __SUP_COUT__ << "Status after resume: " << daqinterface_state_ << __E__;
1302  __SUP_COUT__ << "Resumed." << __E__;
1303  set_thread_message_("Resumed");
1304 } // end transitionResuming()
1305 catch(const std::runtime_error& e)
1306 {
1307  __SS__ << "Error was caught while Resuming: " << e.what() << __E__;
1308  __SS_THROW__;
1309 }
1310 catch(...)
1311 {
1312  __SS__ << "Unknown error was caught while Resuming. Please checked the logs."
1313  << __E__;
1314  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1315  __SS_THROW__;
1316 } // end transitionResuming() error handling
1317 
1318 //==============================================================================
1319 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference /*event*/)
1320 try
1321 {
1322  __SUP_COUT__ << "transitionStarting" << __E__;
1323 
1324  // first time launch thread because artdaq Supervisor may take a while
1325  if(RunControlStateMachine::getIterationIndex() == 0 &&
1326  RunControlStateMachine::getSubIterationIndex() == 0)
1327  {
1328  thread_error_message_ = "";
1329  thread_progress_bar_.resetProgressBar(0);
1330  last_thread_progress_update_ = time(0); // initialize timeout timer
1331 
1332  // start configuring thread
1333  std::thread(&ARTDAQSupervisor::startingThread, this).detach();
1334 
1335  __SUP_COUT_INFO__ << "Starting thread started." << __E__;
1336 
1337  RunControlStateMachine::
1338  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1339  }
1340  else // not first time
1341  {
1342  std::string errorMessage;
1343  {
1344  std::lock_guard<std::mutex> lock(
1345  thread_mutex_); // lock out for remainder of scope
1346  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
1347  }
1348  int progress = thread_progress_bar_.read();
1349  __SUP_COUTV__(errorMessage);
1350  __SUP_COUTV__(progress);
1351  __SUP_COUTV__(thread_progress_bar_.isComplete());
1352 
1353  // check for done and error messages
1354  if(errorMessage == "" && // if no update in 600 seconds, give up
1355  time(0) - last_thread_progress_update_ > 600)
1356  {
1357  __SUP_SS__ << "There has been no update from the start thread for "
1358  << (time(0) - last_thread_progress_update_)
1359  << " seconds, assuming something is wrong and giving up! "
1360  << "Last progress received was " << progress << __E__;
1361  errorMessage = ss.str();
1362  }
1363 
1364  if(errorMessage != "")
1365  {
1366  __SUP_SS__ << "Error was caught in starting thread: " << errorMessage
1367  << __E__;
1368  __SUP_COUT_ERR__ << "\n" << ss.str();
1369 
1370  theStateMachine_.setErrorMessage(ss.str());
1371  throw toolbox::fsm::exception::Exception(
1372  "Transition Error" /*name*/,
1373  ss.str() /* message*/,
1374  "CoreSupervisorBase::transitionStarting" /*module*/,
1375  __LINE__ /*line*/,
1376  __FUNCTION__ /*function*/
1377  );
1378  }
1379 
1380  if(!thread_progress_bar_.isComplete())
1381  {
1382  __SUP_COUT__ << "Not done yet..." << __E__;
1383  //attempt to get live view of python output (not working and not needed with new Tee Buffer solution)
1384  // __COUT_MULTI_LBL__(0, captureStderrAndStdout_("statuscheck"), "statuscheck");
1385 
1386  RunControlStateMachine::
1387  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1388 
1389  if(last_thread_progress_read_ != progress)
1390  {
1391  last_thread_progress_read_ = progress;
1392  last_thread_progress_update_ = time(0);
1393  }
1394 
1395  sleep(1 /*seconds*/);
1396  }
1397  else
1398  {
1399  __SUP_COUT_INFO__ << "Starting transition completed!" << __E__;
1400  __SUP_COUTV__(getProcessInfo_());
1401  }
1402  }
1403 
1404  return;
1405 
1406 } // end transitionStarting()
1407 catch(const std::runtime_error& e)
1408 {
1409  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1410  __SS_THROW__;
1411 }
1412 catch(...)
1413 {
1414  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1415  << __E__;
1416  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1417  __SS_THROW__;
1418 } // end transitionStarting() error handling
1419 
1420 //==============================================================================
1421 void ARTDAQSupervisor::startingThread()
1422 try
1423 {
1424  std::string uid = theConfigurationManager_
1425  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
1426  "/" + CorePropertySupervisorBase::getSupervisorUID() +
1427  "/" + "LinkToSupervisorTable")
1428  .getValueAsString();
1429 
1430  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
1431  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
1432  __GEN_COUT__ << "Starting..." << __E__;
1433  set_thread_message_("Starting");
1434 
1435  thread_progress_bar_.step();
1436  stop_runner_();
1437  {
1438  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1439  getDAQState_();
1440  __GEN_COUT__ << "Status before start: " << daqinterface_state_ << __E__;
1441  auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
1442  .getParameters()
1443  .getValue("RunNumber");
1444 
1445  thread_progress_bar_.step();
1446 
1447  __GEN_COUT_INFO__ << "Calling do_start_running" << __E__;
1448  PyObjectGuard pName(PyUnicode_FromString("do_start_running"));
1449  int run_number = std::stoi(runNumber);
1450  PyObjectGuard pStateArgs(PyLong_FromLong(run_number));
1451  PyObjectGuard res(PyObject_CallMethodObjArgs(
1452  daqinterface_ptr_, pName.get(), pStateArgs.get(), NULL));
1453  std::string doStartOutput;
1454 
1455  thread_progress_bar_.step();
1456 
1457  if(checkPythonError(res.get()))
1458  {
1459  std::string err = capturePyErr("do_start_running");
1460  doStartOutput = captureStderrAndStdout_("do_start_running");
1461  __SS__ << "Error calling start transition: " << err << __E__;
1462  if(doStartOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
1463  ss << "... last " << OUT_ON_ERR_SIZE << " characters: "
1464  << doStartOutput.substr(doStartOutput.size() - OUT_ON_ERR_SIZE);
1465  else
1466  ss << doStartOutput;
1467  __GEN_SS_THROW__;
1468  }
1469 
1470  doStartOutput = captureStderrAndStdout_("do_start_running");
1471  __COUT_MULTI_LBL__(0, doStartOutput, "do_start_running");
1472  getDAQState_();
1473 
1474  thread_progress_bar_.step();
1475 
1476  __GEN_COUT__ << "Status after start: " << daqinterface_state_ << __E__;
1477  if(daqinterface_state_ != "running")
1478  {
1479  __SS__ << "DAQInterface start transition failed!" << __E__
1480  << "DAQInterface state: \"" << daqinterface_state_
1481  << "\" != \"running\" " << __E__;
1482  if(doStartOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
1483  ss << "... last " << OUT_ON_ERR_SIZE << " characters: "
1484  << doStartOutput.substr(doStartOutput.size() - OUT_ON_ERR_SIZE);
1485  else
1486  ss << doStartOutput;
1487  __GEN_SS_THROW__;
1488  }
1489 
1490  thread_progress_bar_.step();
1491  }
1492  start_runner_();
1493  set_thread_message_("Started");
1494  thread_progress_bar_.step();
1495 
1496  __GEN_COUT_INFO__ << "Started." << __E__;
1497  thread_progress_bar_.complete();
1498 
1499 } // end startingThread()
1500 catch(const std::runtime_error& e)
1501 {
1502  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1503  __COUT_ERR__ << "\n" << ss.str();
1504  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1505  thread_error_message_ = ss.str();
1506 }
1507 catch(...)
1508 {
1509  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1510  << __E__;
1511  __COUT_ERR__ << "\n" << ss.str();
1512 
1513  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1514 
1515  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1516  thread_error_message_ = ss.str();
1517 } // end startingThread() error handling
1518 
1519 //==============================================================================
1520 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference /*event*/)
1521 try
1522 {
1523  __SUP_COUT__ << "Stopping..." << __E__;
1524  set_thread_message_("Stopping");
1525  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1526  getDAQState_();
1527  __SUP_COUT__ << "Status before stop: " << daqinterface_state_ << __E__;
1528  PyObjectGuard pName(PyUnicode_FromString("do_stop_running"));
1529  PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1530  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_stop_running"), "do_stop_running");
1531 
1532  if(checkPythonError(res.get()))
1533  {
1534  std::string err = capturePyErr("do_stop_running");
1535  __SS__ << "Error calling DAQ Interface stop transition: " << err << __E__;
1536  __SUP_SS_THROW__;
1537  }
1538  getDAQState_();
1539  __SUP_COUT__ << "Status after stop: " << daqinterface_state_ << __E__;
1540  __SUP_COUT__ << "Stopped." << __E__;
1541  set_thread_message_("Stopped");
1542 } // end transitionStopping()
1543 catch(const std::runtime_error& e)
1544 {
1545  __SS__ << "Error was caught while Stopping: " << e.what() << __E__;
1546  __SS_THROW__;
1547 }
1548 catch(...)
1549 {
1550  __SS__ << "Unknown error was caught while Stopping. Please checked the logs."
1551  << __E__;
1552  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1553  __SS_THROW__;
1554 } // end transitionStopping() error handling
1555 
1556 //==============================================================================
1557 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference /*event*/)
1558 {
1559  __SUP_COUT__ << "Entering error recovery state" << __E__;
1560  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1561  getDAQState_();
1562  __SUP_COUT__ << "Status before error: " << daqinterface_state_ << __E__;
1563 
1564  PyObjectGuard pName(PyUnicode_FromString("do_recover"));
1565  PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1566  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_recover"), "do_recover");
1567 
1568  if(checkPythonError(res.get()))
1569  {
1570  std::string err = capturePyErr("do_recover");
1571  //do not throw exception when entering error, because failing DAQ interface could be the reason for error in first place
1572  __SUP_COUT_WARN__ << "Error calling DAQ Interface recover transition: " << err
1573  << __E__;
1574  return;
1575  }
1576 
1577  getDAQState_();
1578  __SUP_COUT__ << "Status after error: " << daqinterface_state_ << __E__;
1579  __SUP_COUT__ << "EnteringError DONE." << __E__;
1580 
1581 } // end enteringError()
1582 
1583 std::vector<SupervisorInfo::SubappInfo> ots::ARTDAQSupervisor::getSubappInfo(void)
1584 {
1585  auto apps = getAndParseProcessInfo_();
1586 
1587  std::map<int, SupervisorInfo::SubappInfo> subapp_infos;
1588  for(auto& app : apps)
1589  {
1591 
1592  info.name = app.label;
1593  info.detail = "Rank " + std::to_string(app.rank) + ", subsystem " +
1594  std::to_string(app.subsystem);
1595  info.lastStatusTime = time(0);
1596  info.progress = 100;
1597  info.status = artdaqStateToOtsState(app.state);
1598  info.url = "http://" + app.host + ":" + std::to_string(app.port) + "/RPC2";
1599  info.class_name = "ARTDAQ " + labelToProcType_(app.label);
1600 
1601  subapp_infos[app.rank] = info;
1602  }
1603 
1604  std::vector<SupervisorInfo::SubappInfo> output;
1605  for(auto& [rank, info] : subapp_infos)
1606  {
1607  output.push_back(info);
1608  }
1609  return output;
1610 } //end getSubappInfo()
1611 
1612 //==============================================================================
1613 // Helper function to check if a Python call failed
1614 // Returns true if there was an error (result is NULL or PyErr_Occurred)
1615 // NOTE: Does NOT clear the Python error state - caller should call capturePyErr() to fetch it
1616 bool ots::ARTDAQSupervisor::checkPythonError(PyObject* result)
1617 {
1618  if(result == NULL || PyErr_Occurred())
1619  {
1620  // Assume result is cleaned up by its PyObjectGuard if needed
1621 
1622  // Note: We keep the Python error state so caller can extract the message with capturePyErr()
1623  return true; // Error occurred
1624  }
1625  return false; // No error
1626 } //end checkPythonError()
1627 
1628 //==============================================================================
1629 std::string ots::ARTDAQSupervisor::capturePyErr(std::string label /* = "" */)
1630 {
1631  std::string err_msg = "Unknown Python Error";
1632  PyObject * pType, *pValue, *pTraceback;
1633  PyErr_Fetch(&pType, &pValue, &pTraceback);
1634  PyErr_NormalizeException(&pType, &pValue, &pTraceback);
1635 
1636  if(pType != NULL)
1637  {
1638  // Format the full traceback like Python does
1639  PyObjectGuard traceback_module(PyImport_ImportModule("traceback"));
1640  if(traceback_module.get() != NULL)
1641  {
1642  PyObjectGuard format_exception(
1643  PyObject_GetAttrString(traceback_module.get(), "format_exception"));
1644  if(format_exception.get() != NULL)
1645  {
1646  PyObjectGuard formatted(
1647  PyObject_CallFunctionObjArgs(format_exception.get(),
1648  pType,
1649  pValue ? pValue : Py_None,
1650  pTraceback ? pTraceback : Py_None,
1651  NULL));
1652  if(formatted.get() != NULL)
1653  {
1654  // formatted is a list of strings, join them
1655  PyObjectGuard empty_string(PyUnicode_FromString(""));
1656  PyObjectGuard joined(
1657  PyUnicode_Join(empty_string.get(), formatted.get()));
1658  if(joined.get() != NULL)
1659  {
1660  const char* traceback_cstr = PyUnicode_AsUTF8(joined.get());
1661  if(traceback_cstr)
1662  err_msg = traceback_cstr;
1663  }
1664  }
1665  }
1666  }
1667 
1668  // Fallback to simple message if traceback formatting failed
1669  if(err_msg == "Unknown Python Error" && pValue != NULL)
1670  {
1671  PyObjectGuard pStr(PyObject_Str(pValue));
1672  if(pStr.get() != NULL)
1673  {
1674  const char* error_cstr = PyUnicode_AsUTF8(pStr.get());
1675  if(error_cstr)
1676  err_msg = error_cstr;
1677  }
1678  }
1679  }
1680 
1681  Py_XDECREF(pType);
1682  Py_XDECREF(pValue);
1683  Py_XDECREF(pTraceback);
1684 
1685  // Add label prefix if provided
1686  if(!label.empty())
1687  err_msg = label + ":\n" + err_msg;
1688 
1689  return err_msg;
1690 } //end capturePyErr()
1691 
1692 //==============================================================================
1693 std::string ots::ARTDAQSupervisor::captureStderrAndStdout_(std::string label /* = "" */)
1694 {
1695  if(!stringIO_out_)
1696  return ""; // Not defined
1697  // If a Python error is already pending, do not consume it here
1698  if(PyErr_Occurred())
1699  return "";
1700  if(label.size())
1701  label += ' '; //for nice printing
1702 
1703  std::string outString = "";
1704  PyObjectGuard out(PyObject_CallMethod(stringIO_out_, "getvalue", NULL));
1705 
1706  if(checkPythonError(out.get()))
1707  {
1708  // Error getting output - clear the error and return empty string
1709  capturePyErr("captureStderrAndStdout getvalue");
1710  return "";
1711  }
1712 
1713  const char* text = PyUnicode_AsUTF8(out.get());
1714 
1715  return text ? text : "";
1716 } //end captureStderrAndStdout_()
1717 
1718 void ots::ARTDAQSupervisor::getDAQState_()
1719 {
1720  __SUP_COUTS__(50) << "Getting DAQInterface python lock" << __E__;
1721  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1722  __SUP_COUTS__(50) << "Have DAQInterface python lock" << __E__;
1723 
1724  if(daqinterface_ptr_ == NULL)
1725  {
1726  daqinterface_state_ = "";
1727  __SUP_COUT_WARN__ << "daqinterface_ptr_ is not initialized!" << __E__;
1728  return;
1729  }
1730 
1731  // Prepare Python Strings ONCE (Move outside loop to prevent 5x memory leak)
1732  PyObjectGuard pName(PyUnicode_FromString("state"));
1733  PyObjectGuard pArg(PyUnicode_FromString("DAQInterface"));
1734 
1735  // WARNING: Verify your Python 'state' method accepts an argument.
1736  // If 'def state(self):' is the signature, passing pArg will fail.
1737  // If so, call: PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1738 
1739  int tries = 0;
1740  while(tries < 5)
1741  {
1742  // Call the method
1743  PyObjectGuard res(
1744  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1745 
1746  if(checkPythonError(res.get()))
1747  {
1748  tries++;
1749 
1750  // Get the error message
1751  std::string err_msg = capturePyErr("state");
1752 
1753  std::ostringstream ss;
1754  ss << "Attempt n " << tries
1755  << ". Error calling 'state'. Python Exception: " << err_msg;
1756 
1757  if(tries >= 5)
1758  {
1759  __COUT_ERR__ << ss.str() << __E__; // Log error on final fail
1760  daqinterface_state_ = "ERROR"; // distinct from empty
1761  }
1762  else
1763  {
1764  __COUT__ << ss.str() << __E__; // Log warning
1765  usleep(100000); // 100ms
1766  }
1767  continue;
1768  }
1769 
1770  // --- SUCCESS CASE ---
1771 
1772  // Safely convert result to string (res might not be a string!)
1773  PyObjectGuard strRes(PyObject_Str(res.get())); // Force conversion to string
1774  if(strRes.get())
1775  {
1776  daqinterface_state_ = std::string(PyUnicode_AsUTF8(strRes.get()));
1777  }
1778  else
1779  {
1780  // Rare case: object couldn't be converted to string
1781  daqinterface_state_ = "UNKNOWN";
1782  }
1783 
1784  __SUP_COUTS__(20) << "getDAQState_ state=" << daqinterface_state_ << __E__;
1785  break;
1786  }
1787 
1788  // Cleanup the string objects we created
1789 } //end getDAQState_()
1790 
1791 //==============================================================================
1792 std::string ots::ARTDAQSupervisor::getProcessInfo_(void)
1793 {
1794  __SUP_COUTS__(50) << "Getting DAQInterface state lock" << __E__;
1795  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1796  __SUP_COUTS__(50) << "Have DAQInterface state lock" << __E__;
1797 
1798  if(daqinterface_ptr_ == nullptr)
1799  {
1800  return "";
1801  }
1802 
1803  PyObjectGuard pName(PyUnicode_FromString("artdaq_process_info"));
1804  PyObjectGuard pArg(PyUnicode_FromString("DAQInterface"));
1805  PyObjectGuard pArg2(PyBool_FromLong(true));
1806  PyObjectGuard res(PyObject_CallMethodObjArgs(
1807  daqinterface_ptr_, pName.get(), pArg.get(), pArg2.get(), NULL));
1808 
1809  if(checkPythonError(res.get()))
1810  {
1811  std::string err = capturePyErr("artdaq_process_info");
1812  __SS__ << "Error calling artdaq_process_info function: " << err << __E__;
1813  __SUP_SS_THROW__;
1814  return "";
1815  }
1816  //cache status as latest
1817  std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1818  daqinterface_status_ = std::string(PyUnicode_AsUTF8(res.get()));
1819  return daqinterface_status_;
1820 } // end getProcessInfo_()
1821 
1822 std::string ots::ARTDAQSupervisor::artdaqStateToOtsState(std::string state)
1823 {
1824  if(state == "nonexistent" || state == "nonexistant")
1825  return RunControlStateMachine::INITIAL_STATE_NAME;
1826  if(state == "Ready")
1827  return "Configured";
1828  if(state == "Running")
1829  return RunControlStateMachine::RUNNING_STATE_NAME;
1830  if(state == "Paused")
1831  return RunControlStateMachine::PAUSED_STATE_NAME;
1832  if(state == "Stopped")
1833  return RunControlStateMachine::HALTED_STATE_NAME;
1834 
1835  TLOG(TLVL_WARNING) << "Unrecognized state name " << state;
1836  return RunControlStateMachine::FAILED_STATE_NAME;
1837 }
1838 
1839 std::string ots::ARTDAQSupervisor::labelToProcType_(std::string label)
1840 {
1841  if(label_to_proc_type_map_.count(label))
1842  {
1843  return label_to_proc_type_map_[label];
1844  }
1845  return "UNKNOWN";
1846 }
1847 
1848 //==============================================================================
1850 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo>
1851 ots::ARTDAQSupervisor::getAndParseProcessInfo_()
1852 {
1853  std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
1854  // full acquire from getProcessInfo_ creates mutex locking up!
1855  // auto info = getProcessInfo_();
1856  std::string info;
1857 
1858  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1859  std::try_to_lock);
1860  if(!lk.owns_lock()) //if lock not availabe, just report last status
1861  {
1862  __COUTS__(50) << "Do not have python lock." << __E__;
1863  std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1864  info = daqinterface_status_;
1865  }
1866  else //have lock! so retrieve Python Interface status
1867  {
1868  __COUTS__(50) << "Have python lock!" << __E__;
1869  info = getProcessInfo_();
1870  }
1871  __COUTVS__(20, info);
1872 
1873  auto procs = tokenize_(info);
1874 
1875  // 0: Whole string
1876  // 1: Process Label
1877  // 2: Process host
1878  // 3: Process port
1879  // 4: Process subsystem
1880  // 5: Process Rank
1881  // 6: Process state
1882  std::regex re("(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
1883 
1884  for(auto& proc : procs)
1885  {
1886  std::smatch match;
1887  if(std::regex_match(proc, match, re))
1888  {
1889  DAQInterfaceProcessInfo info;
1890 
1891  info.label = match[1];
1892  info.host = match[2];
1893  info.port = std::stoi(match[3]);
1894  info.subsystem = std::stoi(match[4]);
1895  info.rank = std::stoi(match[5]);
1896  info.state = match[6];
1897 
1898  output.push_back(info);
1899  }
1900  }
1901  return output;
1902 } // end getAndParseProcessInfo_()
1903 
1904 //==============================================================================
1906  std::unique_ptr<artdaq::CommanderInterface>>>
1907 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
1908 {
1909  std::list<
1910  std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
1911  output;
1912  auto infos = getAndParseProcessInfo_();
1913 
1914  for(auto& info : infos)
1915  {
1916  artdaq::Commandable cm;
1917  fhicl::ParameterSet ps;
1918 
1919  ps.put<std::string>("commanderPluginType", "xmlrpc");
1920  ps.put<int>("id", info.port);
1921  ps.put<std::string>("server_url", info.host);
1922 
1923  output.emplace_back(std::make_pair<DAQInterfaceProcessInfo,
1924  std::unique_ptr<artdaq::CommanderInterface>>(
1925  std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
1926  }
1927 
1928  return output;
1929 } // end makeCommandersFromProcessInfo()
1930 
1931 //==============================================================================
1932 // getConfiguredArtdaqHosts
1933 // Returns the de-duplicated set of hostnames of all enabled artdaq processes,
1934 // taken from the active configuration via ARTDAQTableBase::extractARTDAQInfo
1935 // (the same call used by configuringThread()). Unlike makeCommandersFromProcessInfo()
1936 // -- which reads the live DAQInterface status and is empty when DAQInterface is
1937 // not running -- this reflects the configuration's intended deployment. The
1938 // configuration does NOT carry the runtime xmlrpc commander ports, so this is
1939 // used for host discovery only (e.g. to drive 'ots -tt <hosts>').
1940 std::set<std::string> ots::ARTDAQSupervisor::getConfiguredArtdaqHosts(void)
1941 {
1942  std::set<std::string> hosts;
1943  try
1944  {
1945  ConfigurationTree supervisorNode = getSupervisorTableNode();
1946  ARTDAQTableBase::ARTDAQInfo info = ARTDAQTableBase::extractARTDAQInfo(
1947  supervisorNode, false /*getStatusFalseNodes*/, false /*doWriteFHiCL*/);
1948  for(const auto& typeProcs : info.processes)
1949  for(const auto& proc : typeProcs.second)
1950  if(proc.status && !proc.hostname.empty())
1951  hosts.insert(proc.hostname);
1952  }
1953  catch(const std::exception& e)
1954  {
1955  __SUP_COUT_ERR__ << "Failed to extract configured artdaq hosts: " << e.what()
1956  << __E__;
1957  }
1958  catch(...)
1959  {
1960  __SUP_COUT_ERR__
1961  << "Failed to extract configured artdaq hosts (unknown exception)." << __E__;
1962  }
1963  __SUP_COUT__ << "Configured artdaq hosts: " << StringMacros::setToString(hosts)
1964  << __E__;
1965  return hosts;
1966 } // end getConfiguredArtdaqHosts()
1967 
1968 //==============================================================================
1969 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string const& input)
1970 {
1971  size_t pos = 0;
1972  std::list<std::string> output;
1973 
1974  while(pos != std::string::npos && pos < input.size())
1975  {
1976  auto newpos = input.find('\n', pos);
1977  if(newpos != std::string::npos)
1978  {
1979  output.emplace_back(input, pos, newpos - pos);
1980  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1981  pos = newpos + 1;
1982  }
1983  else
1984  {
1985  output.emplace_back(input, pos);
1986  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1987  pos = newpos;
1988  }
1989  }
1990  return output;
1991 } // end tokenize_()
1992 
1993 //==============================================================================
1994 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1995 try
1996 {
1997  TLOG(TLVL_TRACE) << "Runner thread starting";
1998  runner_running_ = true;
1999  while(runner_running_)
2000  {
2001  if(daqinterface_ptr_ != NULL)
2002  {
2003  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_);
2004  getDAQState_();
2005  std::string state_before = daqinterface_state_;
2006 
2007  __SUP_COUTS__(2) << "Runner state_before=" << state_before
2008  << " state now=" << daqinterface_state_
2009  << " ?= running, ready, or booted" << __E__;
2010 
2011  if(daqinterface_state_ == "running" || daqinterface_state_ == "ready" ||
2012  daqinterface_state_ == "booted")
2013  {
2014  try
2015  {
2016  TLOG(TLVL_TRACE) << "Calling DAQInterface::check_proc_heartbeats";
2017  PyObjectGuard pName(PyUnicode_FromString("check_proc_heartbeats"));
2018  PyObjectGuard res(
2019  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
2020  __COUT_MULTI_LBL__(1,
2021  captureStderrAndStdout_("check_proc_heartbeats"),
2022  "check_proc_heartbeats");
2023  TLOG(TLVL_TRACE)
2024  << "Done with DAQInterface::check_proc_heartbeats call";
2025 
2026  if(res.get() == NULL)
2027  {
2028  runner_running_ = false;
2029  std::string err = capturePyErr("check_proc_heartbeats");
2030  __SS__ << "Error calling check_proc_heartbeats function: " << err
2031  << __E__;
2032  __SUP_SS_THROW__;
2033  break;
2034  }
2035  }
2036  catch(cet::exception& ex)
2037  {
2038  runner_running_ = false;
2039  std::string err = capturePyErr("check_proc_heartbeats");
2040  __SS__ << "An cet::exception occurred while calling "
2041  "check_proc_heartbeats function "
2042  << ex.explain_self() << ": " << err << __E__;
2043  __SUP_SS_THROW__;
2044  break;
2045  }
2046  catch(std::exception& ex)
2047  {
2048  runner_running_ = false;
2049  std::string err = capturePyErr("check_proc_heartbeats");
2050  __SS__ << "An std::exception occurred while calling "
2051  "check_proc_heartbeats function: "
2052  << ex.what() << "\n\n"
2053  << err << __E__;
2054  __SUP_SS_THROW__;
2055  break;
2056  }
2057  catch(...)
2058  {
2059  runner_running_ = false;
2060  std::string err = capturePyErr("check_proc_heartbeats");
2061  __SS__ << "An unknown Error occurred while calling "
2062  "check_proc_heartbeats function: "
2063  << err << __E__;
2064  __SUP_SS_THROW__;
2065  break;
2066  }
2067 
2068  lk.unlock();
2069  getDAQState_();
2070  if(daqinterface_state_ != state_before)
2071  {
2072  runner_running_ = false;
2073  lk.unlock();
2074  __SS__ << "DAQInterface state unexpectedly changed from "
2075  << state_before << " to " << daqinterface_state_
2076  << ". Check supervisor log file for more info!" << __E__;
2077  __SUP_SS_THROW__;
2078  break;
2079  }
2080  }
2081  }
2082  else
2083  {
2084  __SUP_COUT__ << "daqinterface_ptr_ is null" << __E__;
2085  break;
2086  }
2087  usleep(1000000);
2088  }
2089  runner_running_ = false;
2090  TLOG(TLVL_TRACE) << "Runner thread complete";
2091 } // end daqinterfaceRunner_()
2092 catch(...)
2093 {
2094  __SS__ << "An error occurred in "
2095  "start_runner_/daqinterfaceRunner_ thread "
2096  << __E__;
2097  try
2098  {
2099  throw;
2100  }
2101  catch(const std::runtime_error& e)
2102  {
2103  ss << "Here is the error: " << e.what() << __E__;
2104  }
2105  catch(...)
2106  {
2107  ss << "Unexpected error!" << __E__;
2108  }
2109  __COUT_ERR__ << ss.str();
2110 
2111  {
2112  std::lock_guard<std::mutex> lock(
2113  thread_mutex_); // lock out for remainder of scope
2114  thread_error_message_ = ss.str();
2115  }
2116 
2117  theStateMachine_.setErrorMessage(ss.str());
2118 
2119  sendAsyncExceptionToGateway( //0 for both pause/stop indicates error
2120  ss.str(),
2121  0 /* isPauseException */,
2122  0 /* isStopException */);
2123 
2124 } // end daqinterfaceRunner_() catch
2125 
2126 //==============================================================================
2127 void ots::ARTDAQSupervisor::stop_runner_()
2128 {
2129  runner_running_ = false;
2130  if(runner_thread_ && runner_thread_->joinable())
2131  {
2132  runner_thread_->join();
2133  runner_thread_.reset(nullptr);
2134  }
2135 } // end stop_runner_()
2136 
2137 //==============================================================================
2138 void ots::ARTDAQSupervisor::start_runner_()
2139 {
2140  stop_runner_();
2141  runner_thread_ =
2142  std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_, this);
2143 } // end start_runner_()
virtual void transitionHalting(toolbox::Event::Reference event) override
virtual void transitionInitializing(toolbox::Event::Reference event) override
static const std::string ARTDAQ_FCL_PATH
Tree-path rule is, if the last link in the path is a group link with a specified group ID,...
static std::string getBootFileContentFromInfo(const ARTDAQInfo &info, const std::string &setupScript, int debugLevel)
ConfigurationTree getNode(const std::string &nodeString, bool doNotThrowOnBrokenUIDLinks=false) const
"root/parent/parent/"
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
const std::string & getValueAsString(bool returnLinkTableValue=false) const
void getValue(T &value) const
ITRACEController * theTRACEController_
only define for an app that receives a command
bool isComplete()
get functions
Definition: ProgressBar.cc:88
void step()
thread safe
Definition: ProgressBar.cc:74
int read()
if stepsToComplete==0, then define any progress as 50%, thread safe
Definition: ProgressBar.cc:120
void complete()
declare complete, thread safe
Definition: ProgressBar.cc:95
defines used also by OtsConfigurationWizardSupervisor
void INIT_MF(const char *name)
static std::string setToString(const std::set< T > &setToReturn, const std::string &delimeter=", ")
setToString ~
std::string name
Also key in map.