otsdaq  3.07.00
ARTDAQSupervisor.cc
1 
2 
3 #define TRACEMF_USE_VERBATIM 1 // for trace longer path filenames
4 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
5 
6 #include "artdaq-core/Utilities/configureMessageFacility.hh"
7 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
8 #include "artdaq/DAQdata/Globals.hh"
9 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
10 #include "cetlib_except/exception.h"
11 #include "fhiclcpp/make_ParameterSet.h"
12 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisorTRACEController.h"
13 
14 #include "artdaq-core/Utilities/ExceptionHandler.hh" /*for artdaq::ExceptionHandler*/
15 
16 #include <boost/exception/all.hpp>
17 #include <boost/filesystem.hpp>
18 
19 #include <signal.h>
20 #include <cerrno>
21 #include <cstring>
22 #include <regex>
23 
24 #define OUT_ON_ERR_SIZE 2000 //tail size of output to include on error
25 
26 using namespace ots;
27 
28 XDAQ_INSTANTIATOR_IMPL(ARTDAQSupervisor)
29 
30 #define FAKE_CONFIG_NAME "ots_config"
31 #define DAQINTERFACE_PORT \
32  std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + \
33  (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION")))
34 
35 static ARTDAQSupervisor* instance = nullptr;
36 static std::unordered_map<int, struct sigaction> old_actions =
37  std::unordered_map<int, struct sigaction>();
38 static bool sighandler_init = false;
39 static void signal_handler(int signum)
40 {
41 // Messagefacility may already be gone at this point, TRACE ONLY!
42 #if TRACE_REVNUM < 1459
43  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
44 #else
45  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
46 #endif
47  << "A signal of type " << signum
48  << " was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
49  "then proceeding with default handlers!";
50 
51  if(instance)
52  instance->destroy();
53 
54  sigset_t set;
55  pthread_sigmask(SIG_UNBLOCK, NULL, &set);
56  pthread_sigmask(SIG_UNBLOCK, &set, NULL);
57 
58 #if TRACE_REVNUM < 1459
59  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
60 #else
61  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
62 #endif
63  << "Calling default signal handler";
64  if(signum != SIGUSR2)
65  {
66  sigaction(signum, &old_actions[signum], NULL);
67  kill(getpid(), signum); // Only send signal to self
68  }
69  else
70  {
71  // Send Interrupt signal if parsing SIGUSR2 (i.e. user-defined exception that
72  // should tear down ARTDAQ)
73  sigaction(SIGINT, &old_actions[SIGINT], NULL);
74  kill(getpid(), SIGINT); // Only send signal to self
75  }
76 }
77 
78 static void init_sighandler(ARTDAQSupervisor* inst)
79 {
80  static std::mutex sighandler_mutex;
81  std::unique_lock<std::mutex> lk(sighandler_mutex);
82 
83  if(!sighandler_init)
84  {
85  sighandler_init = true;
86  instance = inst;
87  std::vector<int> signals = {
88  SIGINT,
89  SIGILL,
90  SIGABRT,
91  SIGFPE,
92  SIGSEGV,
93  SIGPIPE,
94  SIGALRM,
95  SIGTERM,
96  SIGUSR2,
97  SIGHUP}; // SIGQUIT is used by art in normal operation
98  for(auto signal : signals)
99  {
100  struct sigaction old_action;
101  sigaction(signal, NULL, &old_action);
102 
103  // If the old handler wasn't SIG_IGN (it's a handler that just
104  // "ignore" the signal)
105  if(old_action.sa_handler != SIG_IGN)
106  {
107  struct sigaction action;
108  action.sa_handler = signal_handler;
109  sigemptyset(&action.sa_mask);
110  for(auto sigblk : signals)
111  {
112  sigaddset(&action.sa_mask, sigblk);
113  }
114  action.sa_flags = 0;
115 
116  // Replace the signal handler of SIGINT with the one described by
117  // new_action
118  sigaction(signal, &action, NULL);
119  old_actions[signal] = old_action;
120  }
121  }
122  }
123 }
124 
125 //==============================================================================
126 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
127  : CoreSupervisorBase(stub)
128  , daqinterface_ptr_(NULL)
129  , partition_(getSupervisorProperty("partition", 0))
130  , daqinterface_state_("notrunning")
131  , runner_thread_(nullptr)
132 {
133  __SUP_COUT__ << "Constructor." << __E__;
134 
135  INIT_MF("." /*directory used is USER_DATA/LOG/.*/);
136  init_sighandler(this);
137 
138  // Only use system Python
139  // unsetenv("PYTHONPATH");
140  // unsetenv("PYTHONHOME");
141 
142  // Write out settings file
143  auto settings_file = __ENV__("DAQINTERFACE_SETTINGS");
144  std::ofstream of(settings_file, std::ios::trunc);
145  const int openErrno = errno; // capture errno immediately after open attempt
146  if(!of.is_open() || of.fail())
147  {
148  __SS__ << "Failed to open DAQINTERFACE_SETTINGS file '" << settings_file
149  << "' for writing: " << strerror(openErrno) << __E__;
150  __SS_THROW__;
151  }
152  std::stringstream o;
153 
154  setenv("DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
155  auto logfileName = std::string(__ENV__("OTSDAQ_LOG_DIR")) +
156  "/DAQInteface/DAQInterface_partition" +
157  std::to_string(partition_) + ".log";
158  setenv("DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
159 
160  o << "log_directory: "
161  << getSupervisorProperty("log_directory", std::string(__ENV__("OTSDAQ_LOG_DIR")))
162  << std::endl;
163 
164  {
165  const std::string record_directory = getSupervisorProperty(
166  "record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH + "/run_records/");
167  mkdir(record_directory.c_str(), 0755);
168  o << "record_directory: " << record_directory << std::endl;
169  }
170 
171  o << "package_hashes_to_save: "
172  << getSupervisorProperty("package_hashes_to_save", "[artdaq]") << std::endl;
173 
174  o << "spack_root_for_bash_scripts: "
175  << getSupervisorProperty("spack_root_for_bash_scripts",
176  std::string(__ENV__("SPACK_ROOT")))
177  << std::endl;
178  o << "boardreader timeout: " << getSupervisorProperty("boardreader_timeout", 30)
179  << std::endl;
180  o << "eventbuilder timeout: " << getSupervisorProperty("eventbuilder_timeout", 30)
181  << std::endl;
182  o << "datalogger timeout: " << getSupervisorProperty("datalogger_timeout", 30)
183  << std::endl;
184  o << "dispatcher timeout: " << getSupervisorProperty("dispatcher_timeout", 30)
185  << std::endl;
186  // Only put max_fragment_size_bytes into DAQInterface settings file if advanced_memory_usage is disabled
187  if(!getSupervisorProperty("advanced_memory_usage", false))
188  {
189  o << "max_fragment_size_bytes: "
190  << getSupervisorProperty("max_fragment_size_bytes", 1048576) << std::endl;
191  }
192  o << "transfer_plugin_to_use: "
193  << getSupervisorProperty("transfer_plugin_to_use", "TCPSocket") << std::endl;
194  if(getSupervisorProperty("transfer_plugin_from_brs", "") != "")
195  {
196  o << "transfer_plugin_from_brs: "
197  << getSupervisorProperty("transfer_plugin_from_brs", "") << std::endl;
198  }
199  if(getSupervisorProperty("transfer_plugin_from_ebs", "") != "")
200  {
201  o << "transfer_plugin_from_ebs: "
202  << getSupervisorProperty("transfer_plugin_from_ebs", "") << std::endl;
203  }
204  if(getSupervisorProperty("transfer_plugin_from_dls", "") != "")
205  {
206  o << "transfer_plugin_from_dls: "
207  << getSupervisorProperty("transfer_plugin_from_dls", "") << std::endl;
208  }
209  o << "all_events_to_all_dispatchers: " << std::boolalpha
210  << getSupervisorProperty("all_events_to_all_dispatchers", true) << std::endl;
211  if(getSupervisorProperty("data_directory_override", "") != "")
212  {
213  o << "data_directory_override: "
214  << getSupervisorProperty("data_directory_override", "") << std::endl;
215  }
216  o << "max_configurations_to_list: "
217  << getSupervisorProperty("max_configurations_to_list", 10) << std::endl;
218  o << "disable_unique_rootfile_labels: "
219  << getSupervisorProperty("disable_unique_rootfile_labels", false) << std::endl;
220  o << "use_messageviewer: " << std::boolalpha
221  << getSupervisorProperty("use_messageviewer", false) << std::endl;
222  o << "use_messagefacility: " << std::boolalpha
223  << getSupervisorProperty("use_messagefacility", true) << std::endl;
224  o << "fake_messagefacility: " << std::boolalpha
225  << getSupervisorProperty("fake_messagefacility", false) << std::endl;
226  o << "kill_existing_processes: " << std::boolalpha
227  << getSupervisorProperty("kill_existing_processes", true) << std::endl;
228  o << "advanced_memory_usage: " << std::boolalpha
229  << getSupervisorProperty("advanced_memory_usage", false) << std::endl;
230  o << "strict_fragment_id_mode: " << std::boolalpha
231  << getSupervisorProperty("strict_fragment_id_mode", false) << std::endl;
232  o << "disable_private_network_bookkeeping: " << std::boolalpha
233  << getSupervisorProperty("disable_private_network_bookkeeping", false) << std::endl;
234  o << "allowed_processors: "
235  << getSupervisorProperty(
236  "allowed_processors",
237  "0-255") // Note this sets a taskset for ALL processes, on all nodes (ex. "1,2,5-7")
238  << std::endl;
239  if(getSupervisorProperty("partition_label_format", "") !=
240  "") //Add to ARTDAQSupervisor properties partition_label_format: "-P%s"
241  o << "partition_label_format: "
242  << getSupervisorProperty("partition_label_format", "") << std::endl;
243 
244  __COUT_MULTI__(0, o.str());
245 
246  of << o.str();
247  of.close();
248 
249  // destroy current TRACEController and instantiate ARTDAQSupervisorTRACEController
250  if(CorePropertySupervisorBase::theTRACEController_)
251  {
252  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
253  delete CorePropertySupervisorBase::
254  theTRACEController_; // destruct current TRACEController
255  CorePropertySupervisorBase::theTRACEController_ = nullptr;
256  }
257  CorePropertySupervisorBase::theTRACEController_ =
259  ((ARTDAQSupervisorTRACEController*)CorePropertySupervisorBase::theTRACEController_)
260  ->setSupervisorPtr(this);
261 
262  __SUP_COUT__ << "Constructed." << __E__;
263 } // end constructor()
264 
265 //==============================================================================
266 ARTDAQSupervisor::~ARTDAQSupervisor(void)
267 {
268  __SUP_COUT__ << "Destructor." << __E__;
269  destroy();
270 
271  __SUP_COUT__ << "Calling Py_Finalize()" << __E__;
272  Py_Finalize();
273 
274  // CorePropertySupervisorBase would destroy, but since it was created here, attempt to destroy
276  {
277  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
280  }
281 
282  __SUP_COUT__ << "Destructed." << __E__;
283 } // end destructor()
284 
285 //==============================================================================
286 void ARTDAQSupervisor::destroy(void)
287 {
288  __SUP_COUT__ << "Destroying..." << __E__;
289 
290  if(daqinterface_ptr_ != NULL)
291  {
292  __SUP_COUT__ << "Calling recover transition" << __E__;
293  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
294 
295  PyObjectGuard pName(PyUnicode_FromString("do_recover"));
296  PyObjectGuard res(
297  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
298 
299  __SUP_COUT__ << "Making sure that correct state has been reached" << __E__;
300  getDAQState_();
301  while(daqinterface_state_ != "stopped")
302  {
303  getDAQState_();
304  __SUP_COUT__ << "State is " << daqinterface_state_
305  << ", waiting 1s and retrying..." << __E__;
306  usleep(1000000);
307  }
308 
309  // Cleanup
310  Py_XDECREF(daqinterface_ptr_);
311  daqinterface_ptr_ = NULL;
312  }
313 
314  __SUP_COUT__ << "Flusing printouts" << __E__;
315 
316  //make sure to flush printouts
317  PyRun_SimpleString(R"(
318 import sys
319 sys.stdout = sys.__stdout__
320 sys.stderr = sys.__stderr__
321 )");
322  // stringIO_out_ and stringIO_err_ may refer to borrowed Python objects
323  // (e.g., via PyDict_GetItemString in the tee-buffer path). Do not DECREF
324  // them here to avoid corrupting their reference counts; treat them as
325  // non-owning pointers and clear them instead.
326  stringIO_out_ = nullptr;
327  stringIO_err_ = nullptr;
328 
329  __SUP_COUT__ << "Thread and garbage cleanup" << __E__;
330  // force python thread cleanup:
331  PyRun_SimpleString(
332  "import threading; [t.join() for t in threading.enumerate() if t is not "
333  "threading.main_thread() and not isinstance(t, threading._DummyThread)]");
334  PyRun_SimpleString("import gc; gc.collect()");
335  // __SUP_COUT__ << "Calling Py_Finalize()" << __E__;
336  // Py_Finalize();
337 
338  __SUP_COUT__ << "Destroyed." << __E__;
339 } // end destroy()
340 
341 //==============================================================================
342 void ARTDAQSupervisor::init(void)
343 {
344  stop_runner_();
345 
346  __SUP_COUT__ << "Initializing..." << __E__;
347  {
348  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
349 
350  // allSupervisorInfo_.init(getApplicationContext());
351  artdaq::configureMessageFacility("ARTDAQSupervisor");
352  __SUP_COUT__ << "artdaq MF configured." << __E__;
353 
354  // initialization
355  char* daqinterface_dir = getenv("ARTDAQ_DAQINTERFACE_DIR");
356  if(daqinterface_dir == NULL)
357  {
358  __SS__ << "ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
359  "means that DAQInterface has not been setup!"
360  << __E__;
361  __SUP_SS_THROW__;
362  }
363  else
364  {
365  __SUP_COUT__ << "Initializing Python" << __E__;
366  Py_Initialize();
367 
368  //setup Python output to tee output to stdout/err and to StringIO buffer "tee_buffer"
369  PyRun_SimpleString(
370  "import sys\n"
371  "from io import StringIO\n"
372  "\n"
373  "class TeeOut:\n"
374  " def __init__(self, real, buf):\n"
375  " self.real = real\n"
376  " self.buf = buf\n"
377  " def write(self, data):\n"
378  " self.real.write(data)\n"
379  " self.buf.write(data)\n"
380  " def flush(self):\n"
381  " self.real.flush()\n"
382  " self.buf.flush()\n"
383  "\n"
384  "tee_buffer = StringIO()\n"
385  "sys.stdout = TeeOut(sys.stdout, tee_buffer)\n"
386  "sys.stderr = TeeOut(sys.stderr, tee_buffer)\n");
387 
388  __SUP_COUT__ << "Adding DAQInterface directory to PYTHON_PATH" << __E__;
389  PyObject* sysPath = PySys_GetObject(
390  (char*)"path"); //do NOT DECREF borrowed objects through GetObject!
391  PyObjectGuard programName(PyUnicode_FromString(daqinterface_dir));
392  PyList_Append(sysPath, programName.get());
393 
394  __SUP_COUT__ << "Creating Module name" << __E__;
395  PyObjectGuard pName(PyUnicode_FromString("rc.control.daqinterface"));
396  /* Error checking of pName left out */
397 
398  __SUP_COUT__ << "Importing module" << __E__;
399  PyObjectGuard pModule(PyImport_Import(pName.get()));
400 
401  if(pModule.get() == NULL)
402  {
403  std::string err = capturePyErr("import rc.control.daqinterface");
404  __SS__ << "Failed to load rc.control.daqinterface. Python Exception: "
405  << err << __E__;
406  __SUP_SS_THROW__;
407  }
408  else
409  {
410  __SUP_COUT__ << "Loading python module dictionary" << __E__;
411  PyObject* pDict = PyModule_GetDict(
412  pModule.get()); //do NOT DECREF borrowed objects through GetDict!
413  if(pDict == NULL)
414  {
415  std::string err = capturePyErr("module dict");
416  __SS__ << "Unable to load module dictionary. Python Exception: "
417  << err << __E__;
418  __SUP_SS_THROW__;
419  }
420  else
421  {
422  __SUP_COUT__ << "Getting DAQInterface object pointer" << __E__;
423  PyObject* di_obj_raw = PyDict_GetItemString(
424  pDict, "DAQInterface"); // borrowed reference
425  if(di_obj_raw == NULL)
426  {
427  std::string err = capturePyErr("DAQInterface lookup");
428  __SS__ << "Unable to find 'DAQInterface' in module dictionary. "
429  "Python Exception: "
430  << err << __E__;
431  __SUP_SS_THROW__;
432  }
433  Py_INCREF(di_obj_raw); // convert borrowed reference to owned
434  PyObjectGuard di_obj_ptr(di_obj_raw);
435 
436  __SUP_COUT__ << "Filling out DAQInterface args struct" << __E__;
437  PyObjectGuard pArgs(PyTuple_New(0));
438 
439  PyObjectGuard kwargs(Py_BuildValue("{s:s, s:s, s:i, s:i, s:s, s:s}",
440  "logpath",
441  ".daqint.log",
442  "name",
443  "DAQInterface",
444  "partition_number",
445  partition_,
446  "rpc_port",
447  DAQINTERFACE_PORT,
448  "rpc_host",
449  "localhost",
450  "control_host",
451  "localhost"));
452 
453  __SUP_COUT__ << "Calling DAQInterface Object Constructor" << __E__;
454 
455  // Get sys and io
456  PyObjectGuard sys(PyImport_ImportModule("sys"));
457  PyObjectGuard io(PyImport_ImportModule("io"));
458 
459  if(0)
460  {
461  //------------- redirect stdout to string
462 
463  // Create StringIO objects for stdout and stderr
464  stringIO_out_ = PyObject_CallMethod(io.get(), "StringIO", NULL);
465  stringIO_err_ = PyObject_CallMethod(io.get(), "StringIO", NULL);
466 
467  // Save originals (not needed, since just keep the redirection until daqinterface_ptr_ is destructed)
468  // PyObject* sys_stdout = PyObject_GetAttrString(sys, "stdout");
469  // PyObject* sys_stderr = PyObject_GetAttrString(sys, "stderr");
470 
471  // Redirect
472  PyObject_SetAttrString(sys.get(), "stdout", stringIO_out_);
473  PyObject_SetAttrString(sys.get(), "stderr", stringIO_err_);
474  //------------- end redirect stdout to string
475  }
476  else //capture tee buffer instead so output to console continues
477  {
478  PyObject* mainmod =
479  PyImport_AddModule("__main__"); // borrowed ref
480  PyObject* globals = PyModule_GetDict(mainmod); // borrowed ref
481 
482  stringIO_out_ =
483  PyDict_GetItemString(globals, "tee_buffer"); // borrowed ref
484 
485  // Do not Py_DECREF borrowed references.
486  }
487 
488  daqinterface_ptr_ =
489  PyObject_Call(di_obj_ptr.get(), pArgs.get(), kwargs.get());
490  if(checkPythonError(daqinterface_ptr_))
491  {
492  std::string err = capturePyErr("DAQInterface constructor");
493  __SS__ << "DAQInterface constructor failed. Python Exception: "
494  << err << __E__;
495  __SUP_SS_THROW__;
496  }
497 
498  if(0) //example printout handling
499  {
500  // Force an error
501  PyObjectGuard bad(
502  PyObject_CallMethod(sys.get(), "does_not_exist", NULL));
503  if(!bad.get())
504  PyErr_Print(); // <-- this writes into stringIO_err_, not the terminal
505 
506  // Grab stderr contents
507  PyObjectGuard err_text(
508  PyObject_CallMethod(stringIO_err_, "getvalue", NULL));
509  if(err_text.get())
510  __COUT__ << "Captured stderr:\n"
511  << PyUnicode_AsUTF8(err_text.get()) << "\n";
512  else
513  __COUT__ << "Capture of stderr failed.";
514  } //end example printout handling
515  }
516  }
517  }
518 
519  getDAQState_();
520 
521  // { //attempt to cleanup old artdaq processes DOES NOT WORK because artdaq interface knows it hasn't started
522  // __SUP_COUT__ << "Attempting artdaq stale cleanup..." << __E__;
523  // std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
524  // getDAQState_();
525  // __SUP_COUT__ << "Status before cleanup: " << daqinterface_state_ << __E__;
526 
527  // PyObjectGuard pName(PyUnicode_FromString("do_recover"));
528  // PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL));
529  // __COUT_MULTI_LBL__(0,captureStderrAndStdout_("do_recover"),"do_recover");
530 
531  // if(res == NULL)
532  // {
533  // std::string err = capturePyErr("do_recover");
534  // __SS__ << "Error with clean up calling do_recover: " << err << __E__;
535  // __SUP_SS_THROW__;
536  // }
537  // getDAQState_();
538  // __SUP_COUT__ << "Status after cleanup: " << daqinterface_state_ << __E__;
539  // __SUP_COUT__ << "cleanup DONE." << __E__;
540  // }
541  }
542  start_runner_();
543  __SUP_COUT__ << "Initialized." << __E__;
544 } // end init()
545 
546 //==============================================================================
547 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference /*event*/)
548 {
549  __SUP_COUTT__ << "transitionConfiguring" << __E__;
550 
551  // activate the configuration tree (the first iteration)
552  if(RunControlStateMachine::getIterationIndex() == 0 &&
553  RunControlStateMachine::getSubIterationIndex() == 0)
554  {
555  thread_error_message_ = "";
556  thread_progress_bar_.resetProgressBar(0);
557  last_thread_progress_update_ = time(0); // initialize timeout timer
558 
559  CoreSupervisorBase::configureInit();
560 
561  // start configuring thread
562  std::thread(&ARTDAQSupervisor::configuringThread, this).detach();
563 
564  __SUP_COUT__ << "Configuring thread started." << __E__;
565 
566  RunControlStateMachine::
567  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
568  }
569  else // not first time
570  {
571  std::string errorMessage;
572  {
573  std::lock_guard<std::mutex> lock(
574  thread_mutex_); // lock out for remainder of scope
575  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
576  }
577  int progress = thread_progress_bar_.read();
578  __SUP_COUTVS__(2, errorMessage);
579  __SUP_COUTVS__(2, progress);
580  __SUP_COUTVS__(2, thread_progress_bar_.isComplete());
581 
582  // check for done and error messages
583  if(errorMessage == "" && // if no update in 600 seconds, give up
584  time(0) - last_thread_progress_update_ > 600)
585  {
586  __SUP_SS__ << "There has been no update from the configuration thread for "
587  << (time(0) - last_thread_progress_update_)
588  << " seconds, assuming something is wrong and giving up! "
589  << "Last progress received was " << progress << __E__;
590  errorMessage = ss.str();
591  }
592 
593  if(errorMessage != "")
594  {
595  __SUP_SS__ << "Error was caught in configuring thread: " << errorMessage
596  << __E__;
597  __SUP_COUT_ERR__ << "\n" << ss.str();
598 
599  theStateMachine_.setErrorMessage(ss.str());
600  throw toolbox::fsm::exception::Exception(
601  "Transition Error" /*name*/,
602  ss.str() /* message*/,
603  "CoreSupervisorBase::transitionConfiguring" /*module*/,
604  __LINE__ /*line*/,
605  __FUNCTION__ /*function*/
606  );
607  }
608 
609  if(!thread_progress_bar_.isComplete())
610  {
611  __SUP_COUTT__ << "Not done yet..." << __E__;
612  //attempt to get live view of python output
613  // __COUT_MULTI_LBL__(0, captureStderrAndStdout_("statuscheck"), "statuscheck");
614 
615  RunControlStateMachine::
616  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
617 
618  if(last_thread_progress_read_ != progress)
619  {
620  last_thread_progress_read_ = progress;
621  last_thread_progress_update_ = time(0);
622  }
623 
624  sleep(1 /*seconds*/);
625  }
626  else
627  {
628  __SUP_COUT_INFO__ << "Complete configuring transition!" << __E__;
629  __SUP_COUTV__(getProcessInfo_());
630  }
631  }
632 
633  return;
634 } // end transitionConfiguring()
635 
636 //==============================================================================
637 void ARTDAQSupervisor::configuringThread()
638 try
639 {
640  std::string uid = theConfigurationManager_
641  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
642  "/" + CorePropertySupervisorBase::getSupervisorUID() +
643  "/" + "LinkToSupervisorTable")
644  .getValueAsString();
645 
646  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
647 
648  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
649 
650  ConfigurationTree theSupervisorNode = getSupervisorTableNode();
651 
652  thread_progress_bar_.step();
653 
654  set_thread_message_("ConfigGen");
655 
656  auto info = ARTDAQTableBase::extractARTDAQInfo(
657  theSupervisorNode,
658  false /*getStatusFalseNodes*/,
659  true /*doWriteFHiCL*/,
660  getSupervisorProperty("max_fragment_size_bytes", 8888),
661  getSupervisorProperty("routing_timeout_ms", 1999),
662  getSupervisorProperty("routing_retry_count", 12),
663  &thread_progress_bar_);
664 
665  // Check lists
666  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
667  {
668  __GEN_SS__ << "There must be at least one enabled BoardReader!" << __E__;
669  __GEN_SS_THROW__;
670  }
671  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
672  {
673  __GEN_SS__ << "There must be at least one enabled EventBuilder!" << __E__;
674  __GEN_SS_THROW__;
675  }
676 
677  thread_progress_bar_.step();
678  set_thread_message_("Writing boot.txt");
679 
680  __GEN_COUT__ << "Writing boot.txt" << __E__;
681 
682  int debugLevel = theSupervisorNode.getNode("DAQInterfaceDebugLevel").getValue<int>();
683  std::string setupScript = theSupervisorNode.getNode("DAQSetupScript").getValue();
684 
685  // Generate boot file content using helper function
686  std::string bootContent =
687  ARTDAQTableBase::getBootFileContentFromInfo(info, setupScript, debugLevel);
688 
689  // Populate label_to_proc_type_map_ (still needed for later)
690  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
691  label_to_proc_type_map_[builder.label] = "EventBuilder";
692  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
693  label_to_proc_type_map_[logger.label] = "DataLogger";
694  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
695  label_to_proc_type_map_[dispatcher.label] = "Dispatcher";
696  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
697  label_to_proc_type_map_[rmanager.label] = "RoutingManager";
698 
699  // Write boot.txt file
700  std::ofstream o(ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt", std::ios::trunc);
701  o << bootContent;
702  o.close();
703 
704  // TODO: To save to runlog, store bootContent in metadata/configuration archive
705  // Example (add when implementing runlog integration):
706  // saveToRunlog("boot.txt", bootContent, run_number);
707 
708  thread_progress_bar_.step();
709  set_thread_message_("Writing Fhicl Files");
710 
711  __GEN_COUT__ << "Building configuration directory" << __E__;
712 
713  boost::system::error_code ignored;
714  boost::filesystem::remove_all(ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME,
715  ignored);
716  mkdir((ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME).c_str(), 0755);
717 
718  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
719  {
720  symlink(ARTDAQTableBase::getFlatFHICLFilename(
721  ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label)
722  .c_str(),
723  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
724  reader.label + ".fcl")
725  .c_str());
726  }
727  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
728  {
729  symlink(ARTDAQTableBase::getFlatFHICLFilename(
730  ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label)
731  .c_str(),
732  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
733  builder.label + ".fcl")
734  .c_str());
735  }
736  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
737  {
738  symlink(ARTDAQTableBase::getFlatFHICLFilename(
739  ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label)
740  .c_str(),
741  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
742  logger.label + ".fcl")
743  .c_str());
744  }
745  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
746  {
747  symlink(ARTDAQTableBase::getFlatFHICLFilename(
748  ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label)
749  .c_str(),
750  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
751  dispatcher.label + ".fcl")
752  .c_str());
753  }
754  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
755  {
756  symlink(ARTDAQTableBase::getFlatFHICLFilename(
757  ARTDAQTableBase::ARTDAQAppType::RoutingManager, rmanager.label)
758  .c_str(),
759  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
760  rmanager.label + ".fcl")
761  .c_str());
762  }
763 
764  thread_progress_bar_.step();
765 
766  // Block 1: State check — acquire and release daqinterface_pythonMutex_
767  // so the runner thread and halt transition can interleave between steps
768  {
769  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
770  getDAQState_();
771  if(daqinterface_state_ != "stopped" && daqinterface_state_ != "")
772  {
773  __GEN_SS__ << "Cannot configure DAQInterface because it is in the wrong state"
774  << " (" << daqinterface_state_ << " != stopped)!" << __E__;
775  __GEN_SS_THROW__
776  }
777 
778  if(daqinterface_ptr_ == nullptr)
779  {
780  __GEN_SS__ << "DAQInterface is not initialized. "
781  "Check earlier Python import/constructor errors (e.g. syntax) "
782  "in DAQInterface."
783  << __E__;
784  __GEN_SS_THROW__;
785  }
786  } // end Block 1 — release daqinterface_pythonMutex_
787 
788  // Block 2: setdaqcomps
789  set_thread_message_("Calling setdaqcomps");
790  __GEN_COUT__ << "Calling setdaqcomps" << __E__;
791  {
792  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
793 
794  __GEN_COUT__ << "Status before setdaqcomps: " << daqinterface_state_ << __E__;
795 
796  PyObjectGuard pName1(PyUnicode_FromString("setdaqcomps"));
797 
798  PyObjectGuard readerDict(PyDict_New());
799  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
800  {
801  // PyDict_SetItem INCREFs key/value, so use PyObjectGuard to manage references
802  label_to_proc_type_map_[reader.label] = "BoardReader";
803  PyObjectGuard readerName(PyUnicode_FromString(reader.label.c_str()));
804 
805  int list_size = reader.allowed_processors != "" ? 4 : 3;
806 
807  PyObjectGuard readerData(PyList_New(list_size));
808  PyObject* readerHost = PyUnicode_FromString(reader.hostname.c_str());
809  PyObject* readerPort = PyUnicode_FromString("-1");
810  PyObject* readerSubsystem =
811  PyUnicode_FromString(std::to_string(reader.subsystem).c_str());
812  PyList_SetItem(readerData.get(), 0, readerHost);
813  PyList_SetItem(readerData.get(), 1, readerPort);
814  PyList_SetItem(readerData.get(), 2, readerSubsystem);
815  if(reader.allowed_processors != "")
816  {
817  PyObject* readerAllowedProcessors =
818  PyUnicode_FromString(reader.allowed_processors.c_str());
819  PyList_SetItem(readerData.get(), 3, readerAllowedProcessors);
820  }
821  PyDict_SetItem(readerDict.get(), readerName.get(), readerData.get());
822  }
823  PyObjectGuard res1(PyObject_CallMethodObjArgs(
824  daqinterface_ptr_, pName1.get(), readerDict.get(), NULL));
825  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("setdaqcomps"), "setdaqcomps");
826 
827  if(checkPythonError(res1.get()))
828  {
829  std::string err_msg = capturePyErr("setdaqcomps");
830  __GEN_SS__ << "Error calling setdaqcomps: " << err_msg << __E__;
831  __GEN_SS_THROW__;
832  }
833 
834  getDAQState_();
835  __GEN_COUT__ << "Status after setdaqcomps: " << daqinterface_state_ << __E__;
836  } // end Block 2 — release daqinterface_pythonMutex_
837 
838  thread_progress_bar_.step();
839 
840  // Block 3: do_boot (with recover + retry)
841  set_thread_message_("Calling do_boot");
842  __GEN_COUT_INFO__ << "Calling do_boot" << __E__;
843  std::string doBootOutput = "";
844  {
845  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
846 
847  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
848 
849  // 1. Create Python Strings
850  PyObjectGuard pNameBoot(PyUnicode_FromString("do_boot"));
851  PyObjectGuard pBootArgs(PyUnicode_FromString(
852  (ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt").c_str()));
853 
854  // 2. First Attempt: Call do_boot
855  PyObjectGuard resBoot1(PyObject_CallMethodObjArgs(
856  daqinterface_ptr_, pNameBoot.get(), pBootArgs.get(), NULL));
857 
858  doBootOutput = captureStderrAndStdout_("do_boot");
859  __COUT_MULTI_LBL__(0, doBootOutput, "do_boot");
860 
861  if(checkPythonError(resBoot1.get()))
862  {
863  // --- FAILURE PATH ---
864 
865  std::string err1 = capturePyErr("do_boot");
866 
867  __GEN_COUT_INFO__ << "Error on first boot attempt: " << err1
868  << ". Recovering and retrying..." << __E__;
869 
870  // B. Attempt 'do_recover'
871  PyObjectGuard pNameRecover(PyUnicode_FromString("do_recover"));
872  PyObjectGuard resRecover(
873  PyObject_CallMethodObjArgs(daqinterface_ptr_, pNameRecover.get(), NULL));
874  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_recover"), "do_recover");
875 
876  if(checkPythonError(resRecover.get()))
877  {
878  // Recover failed - Critical Error
879  std::string errRec = capturePyErr("do_recover");
880 
881  std::stringstream oss;
882  oss << "Error calling recover transition!!!! " << errRec;
883  if(doBootOutput.size() > OUT_ON_ERR_SIZE)
884  oss << "... last " << OUT_ON_ERR_SIZE
885  << " chars: " << doBootOutput.substr(doBootOutput.size() - 1000);
886  else
887  oss << doBootOutput;
888 
889  // Clean up original args before throwing
890  __GEN_SS__ << oss.str() << __E__;
891  __GEN_SS_THROW__;
892  }
893 
894  // C. Retry 'do_boot'
895  thread_progress_bar_.step();
896  set_thread_message_("Calling do_boot (retry)");
897  __GEN_COUT_INFO__ << "Calling do_boot again" << __E__;
898 
899  // Reuse pNameBoot and pBootArgs (valid until end of scope)
900  PyObjectGuard resBoot2(PyObject_CallMethodObjArgs(
901  daqinterface_ptr_, pNameBoot.get(), pBootArgs.get(), NULL));
902 
903  doBootOutput = captureStderrAndStdout_("do_boot (retry)");
904  __COUT_MULTI_LBL__(0, doBootOutput, "do_boot (retry)");
905 
906  if(checkPythonError(resBoot2.get()))
907  {
908  // Second boot failed
909  std::string err2 = capturePyErr("do_boot retry");
910 
911  std::stringstream oss;
912  oss << "Error calling boot transition (2nd try): " << err2;
913  if(doBootOutput.size() > OUT_ON_ERR_SIZE)
914  oss << "... last " << OUT_ON_ERR_SIZE
915  << " chars: " << doBootOutput.substr(doBootOutput.size() - 1000);
916  else
917  oss << doBootOutput;
918 
919  __GEN_SS__ << oss.str() << __E__;
920  __GEN_SS_THROW__;
921  }
922  }
923 
924  getDAQState_();
925  if(daqinterface_state_ != "booted")
926  {
927  std::cout << "Do boot output on error: \n" << doBootOutput << __E__;
928  __GEN_SS__ << "DAQInterface boot transition failed! "
929  << "Status after boot attempt: " << daqinterface_state_ << __E__;
930 
931  if(doBootOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
932  ss << "... last " << OUT_ON_ERR_SIZE
933  << " characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
934  else
935  ss << doBootOutput;
936  __GEN_SS_THROW__;
937  }
938  __GEN_COUT__ << "Status after boot: " << daqinterface_state_ << __E__;
939  } // end Block 3 — release daqinterface_pythonMutex_
940 
941  thread_progress_bar_.step();
942 
943  // Block 4: do_config
944  set_thread_message_("Calling do_config");
945  __GEN_COUT_INFO__ << "Calling do_config" << __E__;
946  std::string doConfigOutput = "";
947  {
948  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
949 
950  __GEN_COUT__ << "Status before config: " << daqinterface_state_ << __E__;
951 
952  { //do_config call
953  // RAII wrapper for Python objects to ensure cleanup even on exception
954 
955  PyObjectGuard pName3(PyUnicode_FromString("do_config"));
956  // 2. Create the argument - list containing config name: ["my_config"]
957  PyObjectGuard pArg(Py_BuildValue("[s]", FAKE_CONFIG_NAME));
958 
959  // 3. Call the method
960  PyObjectGuard res3(PyObject_CallMethodObjArgs(
961  daqinterface_ptr_, pName3.get(), pArg.get(), NULL));
962 
963  // 4. Check for errors FIRST before capturing output (which might clear error state)
964  if(checkPythonError(res3.get()))
965  {
966  // Get the error message before doing anything else
967  std::string err = capturePyErr("do_config");
968 
969  // Now capture output for diagnostics
970  doConfigOutput = captureStderrAndStdout_("do_config");
971 
972  __GEN_SS__ << "Error calling config transition: " << err << __E__;
973  __GEN_SS_THROW__;
974  }
975 
976  // 5. Success path - capture output
977  doConfigOutput = captureStderrAndStdout_("do_config");
978  __COUT_MULTI_LBL__(0, doConfigOutput, "do_config");
979 
980  // 6. Success Handling (Safe conversion to string)
981  // We use PyObject_Str to safely convert any return type (None, Int, String) to text
982  PyObjectGuard strRes(PyObject_Str(res3.get()));
983  const char* res_cstr = "";
984  if(strRes.get())
985  {
986  res_cstr = PyUnicode_AsUTF8(strRes.get());
987  }
988 
989  __SUP_COUTT__ << "do_config result=" << (res_cstr ? res_cstr : "N/A")
990  << __E__;
991  } //end do_config call
992 
993  getDAQState_();
994  if(daqinterface_state_ != "ready")
995  {
996  __GEN_SS__ << "DAQInterface config transition failed!" << __E__
997  << "Supervisor state: \"" << daqinterface_state_
998  << "\" != \"ready\" " << __E__;
999  auto doConfigOutput_recover_i =
1000  doConfigOutput.find("RECOVER transition underway");
1001  if(doConfigOutput_recover_i == std::string::npos)
1002  ss << doConfigOutput;
1003  else if(doConfigOutput_recover_i >
1004  OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
1005  ss << "... tail of " << OUT_ON_ERR_SIZE << " characters before recovery: "
1006  << doConfigOutput.substr(
1007  doConfigOutput_recover_i - OUT_ON_ERR_SIZE +
1008  std::string("RECOVER transition underway").size(),
1009  OUT_ON_ERR_SIZE);
1010  else
1011  ss << doConfigOutput.substr(
1012  0,
1013  doConfigOutput_recover_i +
1014  std::string("RECOVER transition underway").size());
1015  __GEN_SS_THROW__;
1016  }
1017  __GEN_COUT__ << "Status after config: " << daqinterface_state_ << __E__;
1018  } // end Block 4 — release daqinterface_pythonMutex_
1019 
1020  thread_progress_bar_.complete();
1021  set_thread_message_("Configured");
1022  __GEN_COUT_INFO__ << "Configured." << __E__;
1023 
1024 } // end configuringThread()
1025 catch(const std::runtime_error& e)
1026 {
1027  set_thread_message_("ERROR");
1028  __SS__ << "Error was caught while configuring: " << e.what() << __E__;
1029  __COUT_ERR__ << "\n" << ss.str();
1030  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1031  thread_error_message_ = ss.str();
1032 }
1033 catch(...)
1034 {
1035  set_thread_message_("ERROR");
1036  __SS__ << "Unknown error was caught while configuring. Please checked the logs."
1037  << __E__;
1038  __COUT_ERR__ << "\n" << ss.str();
1039 
1040  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1041 
1042  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1043  thread_error_message_ = ss.str();
1044 } // end configuringThread() error handling
1045 
1046 //==============================================================================
1047 void ARTDAQSupervisor::transitionHalting(toolbox::Event::Reference /*event*/)
1048 try
1049 {
1050  set_thread_message_("Halting");
1051  __SUP_COUT__ << "Halting..." << __E__;
1052 
1053  int tries = 0;
1054  while(tries++ < 5)
1055  {
1056  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1057  std::try_to_lock);
1058  if(!lk.owns_lock()) //if lock not availabe, just report last status
1059  {
1060  __COUTS__(50) << "Do not have python lock for halt. tries=" << tries << __E__;
1061  sleep(1);
1062  continue;
1063  }
1064  __COUTS__(50) << "Have python lock!" << __E__;
1065 
1066  // std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1067  getDAQState_();
1068  __SUP_COUT__ << "Status before halt: " << daqinterface_state_ << __E__;
1069 
1070  if(daqinterface_state_ == "running")
1071  {
1072  // First stop before halting
1073  PyObjectGuard pName(PyUnicode_FromString("do_stop_running"));
1074  PyObjectGuard res(
1075  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1076  __COUT_MULTI_LBL__(
1077  0, captureStderrAndStdout_("do_stop_running"), "do_stop_running");
1078 
1079  if(res.get() == NULL)
1080  {
1081  std::string err = capturePyErr();
1082  __SS__ << "Error calling DAQ Interface stop transition: " << err
1083  << __E__;
1084  __SUP_SS_THROW__;
1085  }
1086  }
1087 
1088  PyObjectGuard pName(PyUnicode_FromString("do_command"));
1089  PyObjectGuard pArg(PyUnicode_FromString("Shutdown"));
1090  PyObjectGuard res(
1091  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1092  __COUT_MULTI_LBL__(
1093  0, captureStderrAndStdout_("do_command Shutdown"), "do_command Shutdown");
1094 
1095  if(checkPythonError(res.get()))
1096  {
1097  std::string err = capturePyErr("do_command Shutdown");
1098  __SS__ << "Error calling DAQ Interface halt transition: " << err << __E__;
1099  __SUP_SS_THROW__;
1100  }
1101 
1102  getDAQState_();
1103  __SUP_COUT__ << "Status after halt: " << daqinterface_state_ << __E__;
1104  break;
1105  } //end retry loop
1106 
1107  if(tries >= 5)
1108  {
1109  __SUP_SS__ << "Failed to acquire python lock for halting after " << tries
1110  << " tries, giving up! Is it possible the configure thread is stuck?"
1111  << __E__;
1112  __SUP_SS_THROW__;
1113  }
1114 
1115  __SUP_COUT__ << "Halted." << __E__;
1116  set_thread_message_("Halted");
1117 } // end transitionHalting()
1118 catch(const std::runtime_error& e)
1119 {
1120  const std::string transitionName = "Halting";
1121  // if halting from Failed state, then ignore errors
1122  if(theStateMachine_.getProvenanceStateName() ==
1123  RunControlStateMachine::FAILED_STATE_NAME ||
1124  theStateMachine_.getProvenanceStateName() ==
1125  RunControlStateMachine::HALTED_STATE_NAME)
1126  {
1127  __SUP_COUT_INFO__ << "Error was caught while halting (but ignoring because "
1128  "previous state was '"
1129  << RunControlStateMachine::FAILED_STATE_NAME
1130  << "'): " << e.what() << __E__;
1131  }
1132  else // if not previously in Failed state, then fail
1133  {
1134  __SUP_SS__ << "Error was caught while " << transitionName << ": " << e.what()
1135  << __E__;
1136  __SUP_COUT_ERR__ << "\n" << ss.str();
1137  theStateMachine_.setErrorMessage(ss.str());
1138  throw toolbox::fsm::exception::Exception(
1139  "Transition Error" /*name*/,
1140  ss.str() /* message*/,
1141  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1142  __LINE__ /*line*/,
1143  __FUNCTION__ /*function*/
1144  );
1145  }
1146 } // end transitionHalting() std::runtime_error exception handling
1147 catch(...)
1148 {
1149  const std::string transitionName = "Halting";
1150  // if halting from Failed state, then ignore errors
1151  if(theStateMachine_.getProvenanceStateName() ==
1152  RunControlStateMachine::FAILED_STATE_NAME ||
1153  theStateMachine_.getProvenanceStateName() ==
1154  RunControlStateMachine::HALTED_STATE_NAME)
1155  {
1156  __SUP_COUT_INFO__ << "Unknown error was caught while halting (but ignoring "
1157  "because previous state was '"
1158  << RunControlStateMachine::FAILED_STATE_NAME << "')." << __E__;
1159  }
1160  else // if not previously in Failed state, then fail
1161  {
1162  __SUP_SS__ << "Unknown error was caught while " << transitionName
1163  << ". Please checked the logs." << __E__;
1164  __SUP_COUT_ERR__ << "\n" << ss.str();
1165  theStateMachine_.setErrorMessage(ss.str());
1166 
1167  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1168 
1169  throw toolbox::fsm::exception::Exception(
1170  "Transition Error" /*name*/,
1171  ss.str() /* message*/,
1172  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1173  __LINE__ /*line*/,
1174  __FUNCTION__ /*function*/
1175  );
1176  }
1177 } // end transitionHalting() exception handling
1178 
1179 //==============================================================================
1180 void ARTDAQSupervisor::transitionInitializing(toolbox::Event::Reference /*event*/)
1181 try
1182 {
1183  set_thread_message_("Initializing");
1184  __SUP_COUT__ << "Initializing..." << __E__;
1185  init();
1186  __SUP_COUT__ << "Initialized." << __E__;
1187  set_thread_message_("Initialized");
1188 } // end transitionInitializing()
1189 catch(const std::runtime_error& e)
1190 {
1191  __SS__ << "Error was caught while Initializing: " << e.what() << __E__;
1192  __SS_THROW__;
1193 }
1194 catch(...)
1195 {
1196  __SS__ << "Unknown error was caught while Initializing. Please checked the logs."
1197  << __E__;
1198  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1199  __SS_THROW__;
1200 } // end transitionInitializing() error handling
1201 
1202 //==============================================================================
1203 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference /*event*/)
1204 try
1205 {
1206  set_thread_message_("Pausing");
1207  __SUP_COUT__ << "Pausing..." << __E__;
1208  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1209 
1210  getDAQState_();
1211  __SUP_COUT__ << "Status before pause: " << daqinterface_state_ << __E__;
1212 
1213  PyObjectGuard pName(PyUnicode_FromString("do_command"));
1214  PyObjectGuard pArg(PyUnicode_FromString("Pause"));
1215  PyObjectGuard res(
1216  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1217  __COUT_MULTI_LBL__(
1218  0, captureStderrAndStdout_("do_command Pause"), "do_command Pause");
1219 
1220  if(checkPythonError(res.get()))
1221  {
1222  std::string err = capturePyErr("do_command Pause");
1223  __SS__ << "Error calling DAQ Interface Pause transition: " << err << __E__;
1224  __SUP_SS_THROW__;
1225  }
1226 
1227  getDAQState_();
1228  __SUP_COUT__ << "Status after pause: " << daqinterface_state_ << __E__;
1229 
1230  __SUP_COUT__ << "Paused." << __E__;
1231  set_thread_message_("Paused");
1232 } // end transitionPausing()
1233 catch(const std::runtime_error& e)
1234 {
1235  __SS__ << "Error was caught while Pausing: " << e.what() << __E__;
1236  __SS_THROW__;
1237 }
1238 catch(...)
1239 {
1240  __SS__ << "Unknown error was caught while Pausing. Please checked the logs." << __E__;
1241  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1242  __SS_THROW__;
1243 } // end transitionPausing() error handling
1244 
1245 //==============================================================================
1246 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference /*event*/)
1247 try
1248 {
1249  set_thread_message_("Resuming");
1250  __SUP_COUT__ << "Resuming..." << __E__;
1251  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1252 
1253  getDAQState_();
1254  __SUP_COUT__ << "Status before resume: " << daqinterface_state_ << __E__;
1255  PyObjectGuard pName(PyUnicode_FromString("do_command"));
1256  PyObjectGuard pArg(PyUnicode_FromString("Resume"));
1257  PyObjectGuard res(
1258  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1259  __COUT_MULTI_LBL__(
1260  0, captureStderrAndStdout_("do_command Resume"), "do_command Resume");
1261 
1262  if(checkPythonError(res.get()))
1263  {
1264  std::string err = capturePyErr("do_command Resume");
1265  __SS__ << "Error calling DAQ Interface Resume transition: " << err << __E__;
1266  __SUP_SS_THROW__;
1267  }
1268 
1269  getDAQState_();
1270  __SUP_COUT__ << "Status after resume: " << daqinterface_state_ << __E__;
1271  __SUP_COUT__ << "Resumed." << __E__;
1272  set_thread_message_("Resumed");
1273 } // end transitionResuming()
1274 catch(const std::runtime_error& e)
1275 {
1276  __SS__ << "Error was caught while Resuming: " << e.what() << __E__;
1277  __SS_THROW__;
1278 }
1279 catch(...)
1280 {
1281  __SS__ << "Unknown error was caught while Resuming. Please checked the logs."
1282  << __E__;
1283  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1284  __SS_THROW__;
1285 } // end transitionResuming() error handling
1286 
1287 //==============================================================================
1288 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference /*event*/)
1289 try
1290 {
1291  __SUP_COUT__ << "transitionStarting" << __E__;
1292 
1293  // first time launch thread because artdaq Supervisor may take a while
1294  if(RunControlStateMachine::getIterationIndex() == 0 &&
1295  RunControlStateMachine::getSubIterationIndex() == 0)
1296  {
1297  thread_error_message_ = "";
1298  thread_progress_bar_.resetProgressBar(0);
1299  last_thread_progress_update_ = time(0); // initialize timeout timer
1300 
1301  // start configuring thread
1302  std::thread(&ARTDAQSupervisor::startingThread, this).detach();
1303 
1304  __SUP_COUT_INFO__ << "Starting thread started." << __E__;
1305 
1306  RunControlStateMachine::
1307  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1308  }
1309  else // not first time
1310  {
1311  std::string errorMessage;
1312  {
1313  std::lock_guard<std::mutex> lock(
1314  thread_mutex_); // lock out for remainder of scope
1315  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
1316  }
1317  int progress = thread_progress_bar_.read();
1318  __SUP_COUTV__(errorMessage);
1319  __SUP_COUTV__(progress);
1320  __SUP_COUTV__(thread_progress_bar_.isComplete());
1321 
1322  // check for done and error messages
1323  if(errorMessage == "" && // if no update in 600 seconds, give up
1324  time(0) - last_thread_progress_update_ > 600)
1325  {
1326  __SUP_SS__ << "There has been no update from the start thread for "
1327  << (time(0) - last_thread_progress_update_)
1328  << " seconds, assuming something is wrong and giving up! "
1329  << "Last progress received was " << progress << __E__;
1330  errorMessage = ss.str();
1331  }
1332 
1333  if(errorMessage != "")
1334  {
1335  __SUP_SS__ << "Error was caught in starting thread: " << errorMessage
1336  << __E__;
1337  __SUP_COUT_ERR__ << "\n" << ss.str();
1338 
1339  theStateMachine_.setErrorMessage(ss.str());
1340  throw toolbox::fsm::exception::Exception(
1341  "Transition Error" /*name*/,
1342  ss.str() /* message*/,
1343  "CoreSupervisorBase::transitionStarting" /*module*/,
1344  __LINE__ /*line*/,
1345  __FUNCTION__ /*function*/
1346  );
1347  }
1348 
1349  if(!thread_progress_bar_.isComplete())
1350  {
1351  __SUP_COUT__ << "Not done yet..." << __E__;
1352  //attempt to get live view of python output (not working and not needed with new Tee Buffer solution)
1353  // __COUT_MULTI_LBL__(0, captureStderrAndStdout_("statuscheck"), "statuscheck");
1354 
1355  RunControlStateMachine::
1356  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1357 
1358  if(last_thread_progress_read_ != progress)
1359  {
1360  last_thread_progress_read_ = progress;
1361  last_thread_progress_update_ = time(0);
1362  }
1363 
1364  sleep(1 /*seconds*/);
1365  }
1366  else
1367  {
1368  __SUP_COUT_INFO__ << "Starting transition completed!" << __E__;
1369  __SUP_COUTV__(getProcessInfo_());
1370  }
1371  }
1372 
1373  return;
1374 
1375 } // end transitionStarting()
1376 catch(const std::runtime_error& e)
1377 {
1378  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1379  __SS_THROW__;
1380 }
1381 catch(...)
1382 {
1383  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1384  << __E__;
1385  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1386  __SS_THROW__;
1387 } // end transitionStarting() error handling
1388 
1389 //==============================================================================
1390 void ARTDAQSupervisor::startingThread()
1391 try
1392 {
1393  std::string uid = theConfigurationManager_
1394  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
1395  "/" + CorePropertySupervisorBase::getSupervisorUID() +
1396  "/" + "LinkToSupervisorTable")
1397  .getValueAsString();
1398 
1399  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
1400  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
1401  __GEN_COUT__ << "Starting..." << __E__;
1402  set_thread_message_("Starting");
1403 
1404  thread_progress_bar_.step();
1405  stop_runner_();
1406  {
1407  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1408  getDAQState_();
1409  __GEN_COUT__ << "Status before start: " << daqinterface_state_ << __E__;
1410  auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
1411  .getParameters()
1412  .getValue("RunNumber");
1413 
1414  thread_progress_bar_.step();
1415 
1416  __GEN_COUT_INFO__ << "Calling do_start_running" << __E__;
1417  PyObjectGuard pName(PyUnicode_FromString("do_start_running"));
1418  int run_number = std::stoi(runNumber);
1419  PyObjectGuard pStateArgs(PyLong_FromLong(run_number));
1420  PyObjectGuard res(PyObject_CallMethodObjArgs(
1421  daqinterface_ptr_, pName.get(), pStateArgs.get(), NULL));
1422  std::string doStartOutput;
1423 
1424  thread_progress_bar_.step();
1425 
1426  if(checkPythonError(res.get()))
1427  {
1428  std::string err = capturePyErr("do_start_running");
1429  doStartOutput = captureStderrAndStdout_("do_start_running");
1430  __SS__ << "Error calling start transition: " << err << __E__;
1431  if(doStartOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
1432  ss << "... last " << OUT_ON_ERR_SIZE << " characters: "
1433  << doStartOutput.substr(doStartOutput.size() - OUT_ON_ERR_SIZE);
1434  else
1435  ss << doStartOutput;
1436  __GEN_SS_THROW__;
1437  }
1438 
1439  doStartOutput = captureStderrAndStdout_("do_start_running");
1440  __COUT_MULTI_LBL__(0, doStartOutput, "do_start_running");
1441  getDAQState_();
1442 
1443  thread_progress_bar_.step();
1444 
1445  __GEN_COUT__ << "Status after start: " << daqinterface_state_ << __E__;
1446  if(daqinterface_state_ != "running")
1447  {
1448  __SS__ << "DAQInterface start transition failed!" << __E__
1449  << "DAQInterface state: \"" << daqinterface_state_
1450  << "\" != \"running\" " << __E__;
1451  if(doStartOutput.size() > OUT_ON_ERR_SIZE) //last OUT_ON_ERR_SIZE chars only
1452  ss << "... last " << OUT_ON_ERR_SIZE << " characters: "
1453  << doStartOutput.substr(doStartOutput.size() - OUT_ON_ERR_SIZE);
1454  else
1455  ss << doStartOutput;
1456  __GEN_SS_THROW__;
1457  }
1458 
1459  thread_progress_bar_.step();
1460  }
1461  start_runner_();
1462  set_thread_message_("Started");
1463  thread_progress_bar_.step();
1464 
1465  __GEN_COUT_INFO__ << "Started." << __E__;
1466  thread_progress_bar_.complete();
1467 
1468 } // end startingThread()
1469 catch(const std::runtime_error& e)
1470 {
1471  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1472  __COUT_ERR__ << "\n" << ss.str();
1473  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1474  thread_error_message_ = ss.str();
1475 }
1476 catch(...)
1477 {
1478  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1479  << __E__;
1480  __COUT_ERR__ << "\n" << ss.str();
1481 
1482  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1483 
1484  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1485  thread_error_message_ = ss.str();
1486 } // end startingThread() error handling
1487 
1488 //==============================================================================
1489 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference /*event*/)
1490 try
1491 {
1492  __SUP_COUT__ << "Stopping..." << __E__;
1493  set_thread_message_("Stopping");
1494  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1495  getDAQState_();
1496  __SUP_COUT__ << "Status before stop: " << daqinterface_state_ << __E__;
1497  PyObjectGuard pName(PyUnicode_FromString("do_stop_running"));
1498  PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1499  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_stop_running"), "do_stop_running");
1500 
1501  if(checkPythonError(res.get()))
1502  {
1503  std::string err = capturePyErr("do_stop_running");
1504  __SS__ << "Error calling DAQ Interface stop transition: " << err << __E__;
1505  __SUP_SS_THROW__;
1506  }
1507  getDAQState_();
1508  __SUP_COUT__ << "Status after stop: " << daqinterface_state_ << __E__;
1509  __SUP_COUT__ << "Stopped." << __E__;
1510  set_thread_message_("Stopped");
1511 } // end transitionStopping()
1512 catch(const std::runtime_error& e)
1513 {
1514  __SS__ << "Error was caught while Stopping: " << e.what() << __E__;
1515  __SS_THROW__;
1516 }
1517 catch(...)
1518 {
1519  __SS__ << "Unknown error was caught while Stopping. Please checked the logs."
1520  << __E__;
1521  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1522  __SS_THROW__;
1523 } // end transitionStopping() error handling
1524 
1525 //==============================================================================
1526 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference /*event*/)
1527 {
1528  __SUP_COUT__ << "Entering error recovery state" << __E__;
1529  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1530  getDAQState_();
1531  __SUP_COUT__ << "Status before error: " << daqinterface_state_ << __E__;
1532 
1533  PyObjectGuard pName(PyUnicode_FromString("do_recover"));
1534  PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1535  __COUT_MULTI_LBL__(0, captureStderrAndStdout_("do_recover"), "do_recover");
1536 
1537  if(checkPythonError(res.get()))
1538  {
1539  std::string err = capturePyErr("do_recover");
1540  //do not throw exception when entering error, because failing DAQ interface could be the reason for error in first place
1541  __SUP_COUT_WARN__ << "Error calling DAQ Interface recover transition: " << err
1542  << __E__;
1543  return;
1544  }
1545 
1546  getDAQState_();
1547  __SUP_COUT__ << "Status after error: " << daqinterface_state_ << __E__;
1548  __SUP_COUT__ << "EnteringError DONE." << __E__;
1549 
1550 } // end enteringError()
1551 
1552 std::vector<SupervisorInfo::SubappInfo> ots::ARTDAQSupervisor::getSubappInfo(void)
1553 {
1554  auto apps = getAndParseProcessInfo_();
1555 
1556  std::map<int, SupervisorInfo::SubappInfo> subapp_infos;
1557  for(auto& app : apps)
1558  {
1560 
1561  info.name = app.label;
1562  info.detail = "Rank " + std::to_string(app.rank) + ", subsystem " +
1563  std::to_string(app.subsystem);
1564  info.lastStatusTime = time(0);
1565  info.progress = 100;
1566  info.status = artdaqStateToOtsState(app.state);
1567  info.url = "http://" + app.host + ":" + std::to_string(app.port) + "/RPC2";
1568  info.class_name = "ARTDAQ " + labelToProcType_(app.label);
1569 
1570  subapp_infos[app.rank] = info;
1571  }
1572 
1573  std::vector<SupervisorInfo::SubappInfo> output;
1574  for(auto& [rank, info] : subapp_infos)
1575  {
1576  output.push_back(info);
1577  }
1578  return output;
1579 } //end getSubappInfo()
1580 
1581 //==============================================================================
1582 // Helper function to check if a Python call failed
1583 // Returns true if there was an error (result is NULL or PyErr_Occurred)
1584 // NOTE: Does NOT clear the Python error state - caller should call capturePyErr() to fetch it
1585 bool ots::ARTDAQSupervisor::checkPythonError(PyObject* result)
1586 {
1587  if(result == NULL || PyErr_Occurred())
1588  {
1589  // Assume result is cleaned up by its PyObjectGuard if needed
1590 
1591  // Note: We keep the Python error state so caller can extract the message with capturePyErr()
1592  return true; // Error occurred
1593  }
1594  return false; // No error
1595 } //end checkPythonError()
1596 
1597 //==============================================================================
1598 std::string ots::ARTDAQSupervisor::capturePyErr(std::string label /* = "" */)
1599 {
1600  std::string err_msg = "Unknown Python Error";
1601  PyObject * pType, *pValue, *pTraceback;
1602  PyErr_Fetch(&pType, &pValue, &pTraceback);
1603  PyErr_NormalizeException(&pType, &pValue, &pTraceback);
1604 
1605  if(pType != NULL)
1606  {
1607  // Format the full traceback like Python does
1608  PyObjectGuard traceback_module(PyImport_ImportModule("traceback"));
1609  if(traceback_module.get() != NULL)
1610  {
1611  PyObjectGuard format_exception(
1612  PyObject_GetAttrString(traceback_module.get(), "format_exception"));
1613  if(format_exception.get() != NULL)
1614  {
1615  PyObjectGuard formatted(
1616  PyObject_CallFunctionObjArgs(format_exception.get(),
1617  pType,
1618  pValue ? pValue : Py_None,
1619  pTraceback ? pTraceback : Py_None,
1620  NULL));
1621  if(formatted.get() != NULL)
1622  {
1623  // formatted is a list of strings, join them
1624  PyObjectGuard empty_string(PyUnicode_FromString(""));
1625  PyObjectGuard joined(
1626  PyUnicode_Join(empty_string.get(), formatted.get()));
1627  if(joined.get() != NULL)
1628  {
1629  const char* traceback_cstr = PyUnicode_AsUTF8(joined.get());
1630  if(traceback_cstr)
1631  err_msg = traceback_cstr;
1632  }
1633  }
1634  }
1635  }
1636 
1637  // Fallback to simple message if traceback formatting failed
1638  if(err_msg == "Unknown Python Error" && pValue != NULL)
1639  {
1640  PyObjectGuard pStr(PyObject_Str(pValue));
1641  if(pStr.get() != NULL)
1642  {
1643  const char* error_cstr = PyUnicode_AsUTF8(pStr.get());
1644  if(error_cstr)
1645  err_msg = error_cstr;
1646  }
1647  }
1648  }
1649 
1650  Py_XDECREF(pType);
1651  Py_XDECREF(pValue);
1652  Py_XDECREF(pTraceback);
1653 
1654  // Add label prefix if provided
1655  if(!label.empty())
1656  err_msg = label + ":\n" + err_msg;
1657 
1658  return err_msg;
1659 } //end capturePyErr()
1660 
1661 //==============================================================================
1662 std::string ots::ARTDAQSupervisor::captureStderrAndStdout_(std::string label /* = "" */)
1663 {
1664  if(!stringIO_out_)
1665  return ""; // Not defined
1666  // If a Python error is already pending, do not consume it here
1667  if(PyErr_Occurred())
1668  return "";
1669  if(label.size())
1670  label += ' '; //for nice printing
1671 
1672  std::string outString = "";
1673  PyObjectGuard out(PyObject_CallMethod(stringIO_out_, "getvalue", NULL));
1674 
1675  if(checkPythonError(out.get()))
1676  {
1677  // Error getting output - clear the error and return empty string
1678  capturePyErr("captureStderrAndStdout getvalue");
1679  return "";
1680  }
1681 
1682  const char* text = PyUnicode_AsUTF8(out.get());
1683 
1684  return text ? text : "";
1685 } //end captureStderrAndStdout_()
1686 
1687 void ots::ARTDAQSupervisor::getDAQState_()
1688 {
1689  __SUP_COUTS__(50) << "Getting DAQInterface python lock" << __E__;
1690  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1691  __SUP_COUTS__(50) << "Have DAQInterface python lock" << __E__;
1692 
1693  if(daqinterface_ptr_ == NULL)
1694  {
1695  daqinterface_state_ = "";
1696  __SUP_COUT_WARN__ << "daqinterface_ptr_ is not initialized!" << __E__;
1697  return;
1698  }
1699 
1700  // Prepare Python Strings ONCE (Move outside loop to prevent 5x memory leak)
1701  PyObjectGuard pName(PyUnicode_FromString("state"));
1702  PyObjectGuard pArg(PyUnicode_FromString("DAQInterface"));
1703 
1704  // WARNING: Verify your Python 'state' method accepts an argument.
1705  // If 'def state(self):' is the signature, passing pArg will fail.
1706  // If so, call: PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1707 
1708  int tries = 0;
1709  while(tries < 5)
1710  {
1711  // Call the method
1712  PyObjectGuard res(
1713  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1714 
1715  if(checkPythonError(res.get()))
1716  {
1717  tries++;
1718 
1719  // Get the error message
1720  std::string err_msg = capturePyErr("state");
1721 
1722  std::ostringstream ss;
1723  ss << "Attempt n " << tries
1724  << ". Error calling 'state'. Python Exception: " << err_msg;
1725 
1726  if(tries >= 5)
1727  {
1728  __COUT_ERR__ << ss.str() << __E__; // Log error on final fail
1729  daqinterface_state_ = "ERROR"; // distinct from empty
1730  }
1731  else
1732  {
1733  __COUT__ << ss.str() << __E__; // Log warning
1734  usleep(100000); // 100ms
1735  }
1736  continue;
1737  }
1738 
1739  // --- SUCCESS CASE ---
1740 
1741  // Safely convert result to string (res might not be a string!)
1742  PyObjectGuard strRes(PyObject_Str(res.get())); // Force conversion to string
1743  if(strRes.get())
1744  {
1745  daqinterface_state_ = std::string(PyUnicode_AsUTF8(strRes.get()));
1746  }
1747  else
1748  {
1749  // Rare case: object couldn't be converted to string
1750  daqinterface_state_ = "UNKNOWN";
1751  }
1752 
1753  __SUP_COUTS__(20) << "getDAQState_ state=" << daqinterface_state_ << __E__;
1754  break;
1755  }
1756 
1757  // Cleanup the string objects we created
1758 } //end getDAQState_()
1759 
1760 //==============================================================================
1761 std::string ots::ARTDAQSupervisor::getProcessInfo_(void)
1762 {
1763  __SUP_COUTS__(50) << "Getting DAQInterface state lock" << __E__;
1764  std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1765  __SUP_COUTS__(50) << "Have DAQInterface state lock" << __E__;
1766 
1767  if(daqinterface_ptr_ == nullptr)
1768  {
1769  return "";
1770  }
1771 
1772  PyObjectGuard pName(PyUnicode_FromString("artdaq_process_info"));
1773  PyObjectGuard pArg(PyUnicode_FromString("DAQInterface"));
1774  PyObjectGuard pArg2(PyBool_FromLong(true));
1775  PyObjectGuard res(PyObject_CallMethodObjArgs(
1776  daqinterface_ptr_, pName.get(), pArg.get(), pArg2.get(), NULL));
1777 
1778  if(checkPythonError(res.get()))
1779  {
1780  std::string err = capturePyErr("artdaq_process_info");
1781  __SS__ << "Error calling artdaq_process_info function: " << err << __E__;
1782  __SUP_SS_THROW__;
1783  return "";
1784  }
1785  //cache status as latest
1786  std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1787  daqinterface_status_ = std::string(PyUnicode_AsUTF8(res.get()));
1788  return daqinterface_status_;
1789 } // end getProcessInfo_()
1790 
1791 std::string ots::ARTDAQSupervisor::artdaqStateToOtsState(std::string state)
1792 {
1793  if(state == "nonexistent" || state == "nonexistant")
1794  return RunControlStateMachine::INITIAL_STATE_NAME;
1795  if(state == "Ready")
1796  return "Configured";
1797  if(state == "Running")
1798  return RunControlStateMachine::RUNNING_STATE_NAME;
1799  if(state == "Paused")
1800  return RunControlStateMachine::PAUSED_STATE_NAME;
1801  if(state == "Stopped")
1802  return RunControlStateMachine::HALTED_STATE_NAME;
1803 
1804  TLOG(TLVL_WARNING) << "Unrecognized state name " << state;
1805  return RunControlStateMachine::FAILED_STATE_NAME;
1806 }
1807 
1808 std::string ots::ARTDAQSupervisor::labelToProcType_(std::string label)
1809 {
1810  if(label_to_proc_type_map_.count(label))
1811  {
1812  return label_to_proc_type_map_[label];
1813  }
1814  return "UNKNOWN";
1815 }
1816 
1817 //==============================================================================
1819 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo>
1820 ots::ARTDAQSupervisor::getAndParseProcessInfo_()
1821 {
1822  std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
1823  // full acquire from getProcessInfo_ creates mutex locking up!
1824  // auto info = getProcessInfo_();
1825  std::string info;
1826 
1827  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1828  std::try_to_lock);
1829  if(!lk.owns_lock()) //if lock not availabe, just report last status
1830  {
1831  __COUTS__(50) << "Do not have python lock." << __E__;
1832  std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1833  info = daqinterface_status_;
1834  }
1835  else //have lock! so retrieve Python Interface status
1836  {
1837  __COUTS__(50) << "Have python lock!" << __E__;
1838  info = getProcessInfo_();
1839  }
1840  __COUTVS__(20, info);
1841 
1842  auto procs = tokenize_(info);
1843 
1844  // 0: Whole string
1845  // 1: Process Label
1846  // 2: Process host
1847  // 3: Process port
1848  // 4: Process subsystem
1849  // 5: Process Rank
1850  // 6: Process state
1851  std::regex re("(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
1852 
1853  for(auto& proc : procs)
1854  {
1855  std::smatch match;
1856  if(std::regex_match(proc, match, re))
1857  {
1858  DAQInterfaceProcessInfo info;
1859 
1860  info.label = match[1];
1861  info.host = match[2];
1862  info.port = std::stoi(match[3]);
1863  info.subsystem = std::stoi(match[4]);
1864  info.rank = std::stoi(match[5]);
1865  info.state = match[6];
1866 
1867  output.push_back(info);
1868  }
1869  }
1870  return output;
1871 } // end getAndParseProcessInfo_()
1872 
1873 //==============================================================================
// Build one artdaq commander per process returned by getAndParseProcessInfo_(),
// paired with that process's parsed info. Each commander is created with
// artdaq::MakeCommanderPlugin from a ParameterSet whose "commanderPluginType" is
// "xmlrpc", "id" is the process port and "server_url" is the process host.
// Returns an empty list when no process lines were parsed.
1875  std::unique_ptr<artdaq::CommanderInterface>>>
1876 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
1877 {
1878  std::list<
1879  std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
1880  output;
1881  auto infos = getAndParseProcessInfo_();
1882 
1883  for(auto& info : infos)
1884  {
// NOTE(review): cm is destroyed at the end of each iteration. If the commander
// plugin keeps a reference to the Commandable it is given, that reference dangles
// once the commander is returned. Confirm against artdaq::CommanderInterface.
1885  artdaq::Commandable cm;
1886  fhicl::ParameterSet ps;
1887 
1888  ps.put<std::string>("commanderPluginType", "xmlrpc");
1889  ps.put<int>("id", info.port);
1890  ps.put<std::string>("server_url", info.host);
1891 
// `info` refers into the local `infos` list, so moving from it is safe.
1892  output.emplace_back(std::make_pair<DAQInterfaceProcessInfo,
1893  std::unique_ptr<artdaq::CommanderInterface>>(
1894  std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
1895  }
1896 
1897  return output;
1898 } // end makeCommandersFromProcessInfo()
1899 
1900 //==============================================================================
1901 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string const& input)
1902 {
1903  size_t pos = 0;
1904  std::list<std::string> output;
1905 
1906  while(pos != std::string::npos && pos < input.size())
1907  {
1908  auto newpos = input.find('\n', pos);
1909  if(newpos != std::string::npos)
1910  {
1911  output.emplace_back(input, pos, newpos - pos);
1912  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1913  pos = newpos + 1;
1914  }
1915  else
1916  {
1917  output.emplace_back(input, pos);
1918  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1919  pos = newpos;
1920  }
1921  }
1922  return output;
1923 } // end tokenize_()
1924 
1925 //==============================================================================
1926 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1927 try
1928 {
1929  TLOG(TLVL_TRACE) << "Runner thread starting";
1930  runner_running_ = true;
1931  while(runner_running_)
1932  {
1933  if(daqinterface_ptr_ != NULL)
1934  {
1935  std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1936  getDAQState_();
1937  std::string state_before = daqinterface_state_;
1938 
1939  __SUP_COUTS__(2) << "Runner state_before=" << state_before
1940  << " state now=" << daqinterface_state_
1941  << " ?= running, ready, or booted" << __E__;
1942 
1943  if(daqinterface_state_ == "running" || daqinterface_state_ == "ready" ||
1944  daqinterface_state_ == "booted")
1945  {
1946  try
1947  {
1948  TLOG(TLVL_TRACE) << "Calling DAQInterface::check_proc_heartbeats";
1949  PyObjectGuard pName(PyUnicode_FromString("check_proc_heartbeats"));
1950  PyObjectGuard res(
1951  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1952  __COUT_MULTI_LBL__(1,
1953  captureStderrAndStdout_("check_proc_heartbeats"),
1954  "check_proc_heartbeats");
1955  TLOG(TLVL_TRACE)
1956  << "Done with DAQInterface::check_proc_heartbeats call";
1957 
1958  if(res.get() == NULL)
1959  {
1960  runner_running_ = false;
1961  std::string err = capturePyErr("check_proc_heartbeats");
1962  __SS__ << "Error calling check_proc_heartbeats function: " << err
1963  << __E__;
1964  __SUP_SS_THROW__;
1965  break;
1966  }
1967  }
1968  catch(cet::exception& ex)
1969  {
1970  runner_running_ = false;
1971  std::string err = capturePyErr("check_proc_heartbeats");
1972  __SS__ << "An cet::exception occurred while calling "
1973  "check_proc_heartbeats function "
1974  << ex.explain_self() << ": " << err << __E__;
1975  __SUP_SS_THROW__;
1976  break;
1977  }
1978  catch(std::exception& ex)
1979  {
1980  runner_running_ = false;
1981  std::string err = capturePyErr("check_proc_heartbeats");
1982  __SS__ << "An std::exception occurred while calling "
1983  "check_proc_heartbeats function: "
1984  << ex.what() << "\n\n"
1985  << err << __E__;
1986  __SUP_SS_THROW__;
1987  break;
1988  }
1989  catch(...)
1990  {
1991  runner_running_ = false;
1992  std::string err = capturePyErr("check_proc_heartbeats");
1993  __SS__ << "An unknown Error occurred while calling "
1994  "check_proc_heartbeats function: "
1995  << err << __E__;
1996  __SUP_SS_THROW__;
1997  break;
1998  }
1999 
2000  lk.unlock();
2001  getDAQState_();
2002  if(daqinterface_state_ != state_before)
2003  {
2004  runner_running_ = false;
2005  lk.unlock();
2006  __SS__ << "DAQInterface state unexpectedly changed from "
2007  << state_before << " to " << daqinterface_state_
2008  << ". Check supervisor log file for more info!" << __E__;
2009  __SUP_SS_THROW__;
2010  break;
2011  }
2012  }
2013  }
2014  else
2015  {
2016  __SUP_COUT__ << "daqinterface_ptr_ is null" << __E__;
2017  break;
2018  }
2019  usleep(1000000);
2020  }
2021  runner_running_ = false;
2022  TLOG(TLVL_TRACE) << "Runner thread complete";
2023 } // end daqinterfaceRunner_()
2024 catch(...)
2025 {
2026  __SS__ << "An error occurred in "
2027  "start_runner_/daqinterfaceRunner_ thread "
2028  << __E__;
2029  try
2030  {
2031  throw;
2032  }
2033  catch(const std::runtime_error& e)
2034  {
2035  ss << "Here is the error: " << e.what() << __E__;
2036  }
2037  catch(...)
2038  {
2039  ss << "Unexpected error!" << __E__;
2040  }
2041  __COUT_ERR__ << ss.str();
2042 
2043  {
2044  std::lock_guard<std::mutex> lock(
2045  thread_mutex_); // lock out for remainder of scope
2046  thread_error_message_ = ss.str();
2047  }
2048 
2049  theStateMachine_.setErrorMessage(ss.str());
2050 
2051  sendAsyncExceptionToGateway( //0 for both pause/stop indicates error
2052  ss.str(),
2053  0 /* isPauseException */,
2054  0 /* isStopException */);
2055 
2056 } // end daqinterfaceRunner_() catch
2057 
2058 //==============================================================================
2059 void ots::ARTDAQSupervisor::stop_runner_()
2060 {
2061  runner_running_ = false;
2062  if(runner_thread_ && runner_thread_->joinable())
2063  {
2064  runner_thread_->join();
2065  runner_thread_.reset(nullptr);
2066  }
2067 } // end stop_runner_()
2068 
2069 //==============================================================================
2070 void ots::ARTDAQSupervisor::start_runner_()
2071 {
2072  stop_runner_();
2073  runner_thread_ =
2074  std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_, this);
2075 } // end start_runner_()
virtual void transitionHalting(toolbox::Event::Reference event) override
virtual void transitionInitializing(toolbox::Event::Reference event) override
static const std::string ARTDAQ_FCL_PATH
Tree-path rule is, if the last link in the path is a group link with a specified group ID,...
static std::string getBootFileContentFromInfo(const ARTDAQInfo &info, const std::string &setupScript, int debugLevel)
ConfigurationTree getNode(const std::string &nodeString, bool doNotThrowOnBrokenUIDLinks=false) const
"root/parent/parent/"
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
const std::string & getValueAsString(bool returnLinkTableValue=false) const
void getValue(T &value) const
ITRACEController * theTRACEController_
only define for an app that receives a command
bool isComplete()
get functions
Definition: ProgressBar.cc:88
void step()
thread safe
Definition: ProgressBar.cc:74
int read()
if stepsToComplete==0, then define any progress as 50%, thread safe
Definition: ProgressBar.cc:120
void complete()
declare complete, thread safe
Definition: ProgressBar.cc:95
defines used also by OtsConfigurationWizardSupervisor
void INIT_MF(const char *name)
std::string name
Also key in map.