// Defined before any TRACE / messagefacility header below is included so that
// those headers see it. NOTE(review): the precise effect of "verbatim" mode is
// defined by the TRACE headers — confirm there.
3 #define TRACEMF_USE_VERBATIM 1
4 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
6 #include "artdaq-core/Utilities/configureMessageFacility.hh"
7 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
8 #include "artdaq/DAQdata/Globals.hh"
9 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
10 #include "cetlib_except/exception.h"
11 #include "fhiclcpp/make_ParameterSet.h"
12 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisorTRACEController.h"
14 #include "artdaq-core/Utilities/ExceptionHandler.hh"
16 #include <boost/exception/all.hpp>
17 #include <boost/filesystem.hpp>
// Number of trailing characters of DAQInterface output appended to error reports.
24 #define OUT_ON_ERR_SIZE 2000
// Configuration name handed to DAQInterface's do_config transition.
30 #define FAKE_CONFIG_NAME "ots_config"
// Presumably the per-partition DAQInterface port:
// $ARTDAQ_BASE_PORT + partition_ * $ARTDAQ_PORTS_PER_PARTITION.
// Only usable where `partition_` is in scope (ARTDAQSupervisor members).
// The whole expansion is parenthesized so it composes safely with surrounding
// operators (e.g. `DAQINTERFACE_PORT * k` or `x - DAQINTERFACE_PORT`).
31 #define DAQINTERFACE_PORT \
32 (std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + \
33 (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION"))))
// Process-wide signal handling.
//  - old_actions: the sigaction that was installed for each signal before ours,
//    kept so the previous behaviour can be restored and re-raised.
//  - sighandler_init: set to true once the handlers have been installed.
//  - signal_handler: logs the caught signal via TRACE, unblocks the calling
//    thread's currently blocked signals (pthread_sigmask), restores the saved
//    action for `signum` and re-raises it with kill(getpid(), signum). A
//    second visible path does the same for SIGINT.
//  - The installer (its signature line is not visible in this excerpt) is
//    serialized by sighandler_mutex. For each signal in `signals` whose current
//    handler is not SIG_IGN, it installs signal_handler with every listed
//    signal blocked during handling, and remembers the old action.
// NOTE(review): the log text says DAQInterface is shut down first; the code
// that does so is not visible here — confirm.
// NOTE(review): TRACE logging and unordered_map::operator[] (which may
// allocate) are not async-signal-safe inside a signal handler.
36 static std::unordered_map<int, struct sigaction> old_actions =
37 std::unordered_map<int, struct sigaction>();
38 static bool sighandler_init =
false;
39 static void signal_handler(
int signum)
42 #if TRACE_REVNUM < 1459
43 TRACE_STREAMER(TLVL_ERROR, &(
"ARTDAQsupervisor")[0], 0, 0, 0)
45 TRACE_STREAMER(TLVL_ERROR, TLOG2(
"ARTDAQsupervisor", 0), 0)
47 <<
"A signal of type " << signum
48 <<
" was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
49 "then proceeding with default handlers!";
55 pthread_sigmask(SIG_UNBLOCK, NULL, &set);
56 pthread_sigmask(SIG_UNBLOCK, &set, NULL);
58 #if TRACE_REVNUM < 1459
59 TRACE_STREAMER(TLVL_ERROR, &(
"ARTDAQsupervisor")[0], 0, 0, 0)
61 TRACE_STREAMER(TLVL_ERROR, TLOG2(
"ARTDAQsupervisor", 0), 0)
63 <<
"Calling default signal handler";
66 sigaction(signum, &old_actions[signum], NULL);
67 kill(getpid(), signum);
73 sigaction(SIGINT, &old_actions[SIGINT], NULL);
74 kill(getpid(), SIGINT);
80 static std::mutex sighandler_mutex;
81 std::unique_lock<std::mutex> lk(sighandler_mutex);
85 sighandler_init =
true;
87 std::vector<int> signals = {
98 for(
auto signal : signals)
100 struct sigaction old_action;
101 sigaction(signal, NULL, &old_action);
105 if(old_action.sa_handler != SIG_IGN)
107 struct sigaction action;
108 action.sa_handler = signal_handler;
109 sigemptyset(&action.sa_mask);
110 for(
auto sigblk : signals)
112 sigaddset(&action.sa_mask, sigblk);
118 sigaction(signal, &action, NULL);
119 old_actions[signal] = old_action;
// Constructor.
//  - Reads "partition" (default 0) from the supervisor properties and installs
//    the process-wide signal handlers (init_sighandler).
//  - Opens the file named by $DAQINTERFACE_SETTINGS for truncating write. If it
//    cannot be opened, builds an error message including strerror(errno)
//    (presumably thrown on a line not visible here).
//  - Exports DAQINTERFACE_PARTITION_NUMBER and DAQINTERFACE_LOGFILE.
//  - Builds the settings text in `o` from supervisor properties (defaults
//    shown inline) and echoes it with __COUT_MULTI__.
//  - Creates record_directory with mkdir (result not checked).
//  - Deletes any existing CorePropertySupervisorBase::theTRACEController_ and
//    installs a new one pointed back at this supervisor. The allocation line is
//    not visible; presumably it is an ARTDAQSupervisorTRACEController.
// NOTE(review): the log path uses the directory name "DAQInteface" (sic).
// Changing it would move the log file, so confirm against deployment first.
// NOTE(review): the lines copying `o` into the settings stream `of` are not
// visible in this excerpt — confirm the settings file is actually written.
126 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
128 , daqinterface_ptr_(NULL)
129 , partition_(getSupervisorProperty(
"partition", 0))
130 , daqinterface_state_(
"notrunning")
131 , runner_thread_(nullptr)
133 __SUP_COUT__ <<
"Constructor." << __E__;
136 init_sighandler(
this);
143 auto settings_file = __ENV__(
"DAQINTERFACE_SETTINGS");
144 std::ofstream of(settings_file, std::ios::trunc);
145 const int openErrno = errno;
146 if(!of.is_open() || of.fail())
148 __SS__ <<
"Failed to open DAQINTERFACE_SETTINGS file '" << settings_file
149 <<
"' for writing: " << strerror(openErrno) << __E__;
154 setenv(
"DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
155 auto logfileName = std::string(__ENV__(
"OTSDAQ_LOG_DIR")) +
156 "/DAQInteface/DAQInterface_partition" +
157 std::to_string(partition_) +
".log";
158 setenv(
"DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
160 o <<
"log_directory: "
161 << getSupervisorProperty(
"log_directory", std::string(__ENV__(
"OTSDAQ_LOG_DIR")))
165 const std::string record_directory = getSupervisorProperty(
166 "record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH +
"/run_records/");
167 mkdir(record_directory.c_str(), 0755);
168 o <<
"record_directory: " << record_directory << std::endl;
171 o <<
"package_hashes_to_save: "
172 << getSupervisorProperty(
"package_hashes_to_save",
"[artdaq]") << std::endl;
174 o <<
"spack_root_for_bash_scripts: "
175 << getSupervisorProperty(
"spack_root_for_bash_scripts",
176 std::string(__ENV__(
"SPACK_ROOT")))
178 o <<
"boardreader timeout: " << getSupervisorProperty(
"boardreader_timeout", 30)
180 o <<
"eventbuilder timeout: " << getSupervisorProperty(
"eventbuilder_timeout", 30)
182 o <<
"datalogger timeout: " << getSupervisorProperty(
"datalogger_timeout", 30)
184 o <<
"dispatcher timeout: " << getSupervisorProperty(
"dispatcher_timeout", 30)
187 if(!getSupervisorProperty(
"advanced_memory_usage",
false))
189 o <<
"max_fragment_size_bytes: "
190 << getSupervisorProperty(
"max_fragment_size_bytes", 1048576) << std::endl;
192 o <<
"transfer_plugin_to_use: "
193 << getSupervisorProperty(
"transfer_plugin_to_use",
"TCPSocket") << std::endl;
194 if(getSupervisorProperty(
"transfer_plugin_from_brs",
"") !=
"")
196 o <<
"transfer_plugin_from_brs: "
197 << getSupervisorProperty(
"transfer_plugin_from_brs",
"") << std::endl;
199 if(getSupervisorProperty(
"transfer_plugin_from_ebs",
"") !=
"")
201 o <<
"transfer_plugin_from_ebs: "
202 << getSupervisorProperty(
"transfer_plugin_from_ebs",
"") << std::endl;
204 if(getSupervisorProperty(
"transfer_plugin_from_dls",
"") !=
"")
206 o <<
"transfer_plugin_from_dls: "
207 << getSupervisorProperty(
"transfer_plugin_from_dls",
"") << std::endl;
209 o <<
"all_events_to_all_dispatchers: " << std::boolalpha
210 << getSupervisorProperty(
"all_events_to_all_dispatchers",
true) << std::endl;
211 if(getSupervisorProperty(
"data_directory_override",
"") !=
"")
213 o <<
"data_directory_override: "
214 << getSupervisorProperty(
"data_directory_override",
"") << std::endl;
216 o <<
"max_configurations_to_list: "
217 << getSupervisorProperty(
"max_configurations_to_list", 10) << std::endl;
218 o <<
"disable_unique_rootfile_labels: "
219 << getSupervisorProperty(
"disable_unique_rootfile_labels",
false) << std::endl;
220 o <<
"use_messageviewer: " << std::boolalpha
221 << getSupervisorProperty(
"use_messageviewer",
false) << std::endl;
222 o <<
"use_messagefacility: " << std::boolalpha
223 << getSupervisorProperty(
"use_messagefacility",
true) << std::endl;
224 o <<
"fake_messagefacility: " << std::boolalpha
225 << getSupervisorProperty(
"fake_messagefacility",
false) << std::endl;
226 o <<
"kill_existing_processes: " << std::boolalpha
227 << getSupervisorProperty(
"kill_existing_processes",
true) << std::endl;
228 o <<
"advanced_memory_usage: " << std::boolalpha
229 << getSupervisorProperty(
"advanced_memory_usage",
false) << std::endl;
230 o <<
"strict_fragment_id_mode: " << std::boolalpha
231 << getSupervisorProperty(
"strict_fragment_id_mode",
false) << std::endl;
232 o <<
"disable_private_network_bookkeeping: " << std::boolalpha
233 << getSupervisorProperty(
"disable_private_network_bookkeeping",
false) << std::endl;
234 o <<
"allowed_processors: "
235 << getSupervisorProperty(
236 "allowed_processors",
239 if(getSupervisorProperty(
"partition_label_format",
"") !=
241 o <<
"partition_label_format: "
242 << getSupervisorProperty(
"partition_label_format",
"") << std::endl;
244 __COUT_MULTI__(0, o.str());
250 if(CorePropertySupervisorBase::theTRACEController_)
252 __SUP_COUT__ <<
"Destroying TRACE Controller..." << __E__;
253 delete CorePropertySupervisorBase::
255 CorePropertySupervisorBase::theTRACEController_ =
nullptr;
257 CorePropertySupervisorBase::theTRACEController_ =
260 ->setSupervisorPtr(
this);
262 __SUP_COUT__ <<
"Constructed." << __E__;
// Destructor. According to its log messages it finalizes the Python
// interpreter (Py_Finalize) and destroys the TRACE controller. The statements
// doing so are not visible in this excerpt — confirm.
266 ARTDAQSupervisor::~ARTDAQSupervisor(
void)
268 __SUP_COUT__ <<
"Destructor." << __E__;
271 __SUP_COUT__ <<
"Calling Py_Finalize()" << __E__;
277 __SUP_COUT__ <<
"Destroying TRACE Controller..." << __E__;
282 __SUP_COUT__ <<
"Destructed." << __E__;
// Tear down the DAQInterface bridge.
//  - If a DAQInterface object exists, runs its do_recover transition while
//    holding daqinterface_pythonMutex_. It then polls until
//    daqinterface_state_ becomes "stopped" (the log says it waits 1 s per try)
//    and releases the Python object with Py_XDECREF.
//  - Restores sys.stdout / sys.stderr to the interpreter's originals and
//    drops the stringIO_out_ / stringIO_err_ handles.
//  - Joins every non-main, non-dummy Python thread and runs gc.collect().
// NOTE(review): no timeout is visible on the wait for "stopped"; if the state
// never reaches it, destroy() blocks indefinitely.
286 void ARTDAQSupervisor::destroy(
void)
288 __SUP_COUT__ <<
"Destroying..." << __E__;
290 if(daqinterface_ptr_ != NULL)
292 __SUP_COUT__ <<
"Calling recover transition" << __E__;
293 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
295 PyObjectGuard pName(PyUnicode_FromString(
"do_recover"));
297 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
299 __SUP_COUT__ <<
"Making sure that correct state has been reached" << __E__;
301 while(daqinterface_state_ !=
"stopped")
304 __SUP_COUT__ <<
"State is " << daqinterface_state_
305 <<
", waiting 1s and retrying..." << __E__;
310 Py_XDECREF(daqinterface_ptr_);
311 daqinterface_ptr_ = NULL;
314 __SUP_COUT__ <<
"Flushing printouts" << __E__;
317 PyRun_SimpleString(R
"(
319 sys.stdout = sys.__stdout__
320 sys.stderr = sys.__stderr__
326 stringIO_out_ =
nullptr;
327 stringIO_err_ =
nullptr;
329 __SUP_COUT__ <<
"Thread and garbage cleanup" << __E__;
332 "import threading; [t.join() for t in threading.enumerate() if t is not "
333 "threading.main_thread() and not isinstance(t, threading._DummyThread)]");
334 PyRun_SimpleString(
"import gc; gc.collect()");
338 __SUP_COUT__ <<
"Destroyed." << __E__;
// Initialize the DAQInterface bridge. Runs entirely with
// daqinterface_pythonMutex_ held.
//  - Configures artdaq MessageFacility and reports an error if
//    $ARTDAQ_DAQINTERFACE_DIR is not set.
//  - Runs a Python snippet (its PyRun call line is not visible) that defines
//    TeeOut and tees sys.stdout / sys.stderr into a StringIO named
//    tee_buffer in __main__.
//  - Appends $ARTDAQ_DAQINTERFACE_DIR to sys.path, imports
//    rc.control.daqinterface and looks up its DAQInterface class. The borrowed
//    reference from PyDict_GetItemString is INCREF'd before PyObjectGuard
//    takes ownership.
//  - Points sys.stdout / sys.stderr at StringIO objects kept in
//    stringIO_out_ / stringIO_err_.
//  - Constructs DAQInterface with no positional arguments plus the kwargs
//    built above, storing the instance in daqinterface_ptr_.
//  - Each failure is reported together with the captured Python exception text.
// NOTE(review): PySys_GetObject, PyModule_GetDict and PyDict_GetItemString
// return borrowed references, so they must not be DECREF'd.
// NOTE(review): the call to sys.does_not_exist appears to be a deliberate probe
// of the stderr capture path; confirm it is still wanted in production.
342 void ARTDAQSupervisor::init(
void)
346 __SUP_COUT__ <<
"Initializing..." << __E__;
348 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
351 artdaq::configureMessageFacility(
"ARTDAQSupervisor");
352 __SUP_COUT__ <<
"artdaq MF configured." << __E__;
355 char* daqinterface_dir = getenv(
"ARTDAQ_DAQINTERFACE_DIR");
356 if(daqinterface_dir == NULL)
358 __SS__ <<
"ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
359 "means that DAQInterface has not been setup!"
365 __SUP_COUT__ <<
"Initializing Python" << __E__;
371 "from io import StringIO\n"
374 " def __init__(self, real, buf):\n"
375 " self.real = real\n"
377 " def write(self, data):\n"
378 " self.real.write(data)\n"
379 " self.buf.write(data)\n"
380 " def flush(self):\n"
381 " self.real.flush()\n"
382 " self.buf.flush()\n"
384 "tee_buffer = StringIO()\n"
385 "sys.stdout = TeeOut(sys.stdout, tee_buffer)\n"
386 "sys.stderr = TeeOut(sys.stderr, tee_buffer)\n");
388 __SUP_COUT__ <<
"Adding DAQInterface directory to PYTHON_PATH" << __E__;
389 PyObject* sysPath = PySys_GetObject(
391 PyObjectGuard programName(PyUnicode_FromString(daqinterface_dir));
392 PyList_Append(sysPath, programName.get());
394 __SUP_COUT__ <<
"Creating Module name" << __E__;
395 PyObjectGuard pName(PyUnicode_FromString(
"rc.control.daqinterface"));
398 __SUP_COUT__ <<
"Importing module" << __E__;
399 PyObjectGuard pModule(PyImport_Import(pName.get()));
401 if(pModule.get() == NULL)
403 std::string err = capturePyErr(
"import rc.control.daqinterface");
404 __SS__ <<
"Failed to load rc.control.daqinterface. Python Exception: "
410 __SUP_COUT__ <<
"Loading python module dictionary" << __E__;
411 PyObject* pDict = PyModule_GetDict(
415 std::string err = capturePyErr(
"module dict");
416 __SS__ <<
"Unable to load module dictionary. Python Exception: "
422 __SUP_COUT__ <<
"Getting DAQInterface object pointer" << __E__;
423 PyObject* di_obj_raw = PyDict_GetItemString(
424 pDict,
"DAQInterface");
425 if(di_obj_raw == NULL)
427 std::string err = capturePyErr(
"DAQInterface lookup");
428 __SS__ <<
"Unable to find 'DAQInterface' in module dictionary. "
433 Py_INCREF(di_obj_raw);
434 PyObjectGuard di_obj_ptr(di_obj_raw);
436 __SUP_COUT__ <<
"Filling out DAQInterface args struct" << __E__;
437 PyObjectGuard pArgs(PyTuple_New(0));
439 PyObjectGuard kwargs(Py_BuildValue(
"{s:s, s:s, s:i, s:i, s:s, s:s}",
453 __SUP_COUT__ <<
"Calling DAQInterface Object Constructor" << __E__;
456 PyObjectGuard sys(PyImport_ImportModule(
"sys"));
457 PyObjectGuard io(PyImport_ImportModule(
"io"));
464 stringIO_out_ = PyObject_CallMethod(io.get(),
"StringIO", NULL);
465 stringIO_err_ = PyObject_CallMethod(io.get(),
"StringIO", NULL);
472 PyObject_SetAttrString(sys.get(),
"stdout", stringIO_out_);
473 PyObject_SetAttrString(sys.get(),
"stderr", stringIO_err_);
479 PyImport_AddModule(
"__main__");
480 PyObject* globals = PyModule_GetDict(mainmod);
483 PyDict_GetItemString(globals,
"tee_buffer");
489 PyObject_Call(di_obj_ptr.get(), pArgs.get(), kwargs.get());
490 if(checkPythonError(daqinterface_ptr_))
492 std::string err = capturePyErr(
"DAQInterface constructor");
493 __SS__ <<
"DAQInterface constructor failed. Python Exception: "
502 PyObject_CallMethod(sys.get(),
"does_not_exist", NULL));
507 PyObjectGuard err_text(
508 PyObject_CallMethod(stringIO_err_,
"getvalue", NULL));
510 __COUT__ <<
"Captured stderr:\n"
511 << PyUnicode_AsUTF8(err_text.get()) <<
"\n";
513 __COUT__ <<
"Capture of stderr failed.";
543 __SUP_COUT__ <<
"Initialized." << __E__;
// FSM handler for the Configuring transition. It is re-entered once per
// iteration.
// First call (iteration 0, sub-iteration 0):
//  - clears thread_error_message_ and resets the progress bar and the watchdog
//    timestamp;
//  - runs configureInit(), launches configuringThread detached, and requests
//    another iteration.
// Later calls:
//  - read thread_error_message_ under a mutex, plus the progress value;
//  - fail the transition (setErrorMessage + toolbox::fsm exception) if the
//    thread reported an error or progress has not changed for more than 600 s;
//  - otherwise keep requesting iterations, refreshing the watchdog whenever the
//    progress value moves.
// NOTE(review): the branch deciding "done" is not fully visible here;
// presumably it keys off thread_progress_bar_.isComplete().
// NOTE(review): `this` is handed to a detached thread, so the supervisor must
// outlive configuringThread.
547 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference )
549 __SUP_COUTT__ <<
"transitionConfiguring" << __E__;
552 if(RunControlStateMachine::getIterationIndex() == 0 &&
553 RunControlStateMachine::getSubIterationIndex() == 0)
555 thread_error_message_ =
"";
556 thread_progress_bar_.resetProgressBar(0);
557 last_thread_progress_update_ = time(0);
559 CoreSupervisorBase::configureInit();
562 std::thread(&ARTDAQSupervisor::configuringThread,
this).detach();
564 __SUP_COUT__ <<
"Configuring thread started." << __E__;
566 RunControlStateMachine::
567 indicateIterationWork();
571 std::string errorMessage;
573 std::lock_guard<std::mutex> lock(
575 errorMessage = thread_error_message_;
577 int progress = thread_progress_bar_.
read();
578 __SUP_COUTVS__(2, errorMessage);
579 __SUP_COUTVS__(2, progress);
580 __SUP_COUTVS__(2, thread_progress_bar_.
isComplete());
583 if(errorMessage ==
"" &&
584 time(0) - last_thread_progress_update_ > 600)
586 __SUP_SS__ <<
"There has been no update from the configuration thread for "
587 << (time(0) - last_thread_progress_update_)
588 <<
" seconds, assuming something is wrong and giving up! "
589 <<
"Last progress received was " << progress << __E__;
590 errorMessage = ss.str();
593 if(errorMessage !=
"")
595 __SUP_SS__ <<
"Error was caught in configuring thread: " << errorMessage
597 __SUP_COUT_ERR__ <<
"\n" << ss.str();
599 theStateMachine_.setErrorMessage(ss.str());
600 throw toolbox::fsm::exception::Exception(
603 "CoreSupervisorBase::transitionConfiguring" ,
611 __SUP_COUTT__ <<
"Not done yet..." << __E__;
615 RunControlStateMachine::
616 indicateIterationWork();
618 if(last_thread_progress_read_ != progress)
620 last_thread_progress_read_ = progress;
621 last_thread_progress_update_ = time(0);
628 __SUP_COUT_INFO__ <<
"Complete configuring transition!" << __E__;
629 __SUP_COUTV__(getProcessInfo_());
637 void ARTDAQSupervisor::configuringThread()
640 std::string uid = theConfigurationManager_
641 ->
getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
642 "/" + CorePropertySupervisorBase::getSupervisorUID() +
643 "/" +
"LinkToSupervisorTable")
646 __COUT__ <<
"Supervisor uid is " << uid <<
", getting supervisor table node" << __E__;
648 const std::string mfSubject_ = supervisorClassNoNamespace_ +
"-" + uid;
652 thread_progress_bar_.
step();
654 set_thread_message_(
"ConfigGen");
656 auto info = ARTDAQTableBase::extractARTDAQInfo(
660 getSupervisorProperty(
"max_fragment_size_bytes", 8888),
661 getSupervisorProperty(
"routing_timeout_ms", 1999),
662 getSupervisorProperty(
"routing_retry_count", 12),
663 &thread_progress_bar_);
666 if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
668 __GEN_SS__ <<
"There must be at least one enabled BoardReader!" << __E__;
671 if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
673 __GEN_SS__ <<
"There must be at least one enabled EventBuilder!" << __E__;
677 thread_progress_bar_.
step();
678 set_thread_message_(
"Writing boot.txt");
680 __GEN_COUT__ <<
"Writing boot.txt" << __E__;
682 int debugLevel = theSupervisorNode.
getNode(
"DAQInterfaceDebugLevel").
getValue<
int>();
683 std::string setupScript = theSupervisorNode.
getNode(
"DAQSetupScript").
getValue();
686 std::string bootContent =
690 for(
auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
691 label_to_proc_type_map_[builder.label] =
"EventBuilder";
692 for(
auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
693 label_to_proc_type_map_[logger.label] =
"DataLogger";
694 for(
auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
695 label_to_proc_type_map_[dispatcher.label] =
"Dispatcher";
696 for(
auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
697 label_to_proc_type_map_[rmanager.label] =
"RoutingManager";
708 thread_progress_bar_.
step();
709 set_thread_message_(
"Writing Fhicl Files");
711 __GEN_COUT__ <<
"Building configuration directory" << __E__;
713 boost::system::error_code ignored;
718 for(
auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
720 symlink(ARTDAQTableBase::getFlatFHICLFilename(
721 ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label)
724 reader.label +
".fcl")
727 for(
auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
729 symlink(ARTDAQTableBase::getFlatFHICLFilename(
730 ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label)
733 builder.label +
".fcl")
736 for(
auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
738 symlink(ARTDAQTableBase::getFlatFHICLFilename(
739 ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label)
742 logger.label +
".fcl")
745 for(
auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
747 symlink(ARTDAQTableBase::getFlatFHICLFilename(
748 ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label)
751 dispatcher.label +
".fcl")
754 for(
auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
756 symlink(ARTDAQTableBase::getFlatFHICLFilename(
757 ARTDAQTableBase::ARTDAQAppType::RoutingManager, rmanager.label)
760 rmanager.label +
".fcl")
764 thread_progress_bar_.
step();
769 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
771 if(daqinterface_state_ !=
"stopped" && daqinterface_state_ !=
"")
773 __GEN_SS__ <<
"Cannot configure DAQInterface because it is in the wrong state"
774 <<
" (" << daqinterface_state_ <<
" != stopped)!" << __E__;
778 if(daqinterface_ptr_ ==
nullptr)
780 __GEN_SS__ <<
"DAQInterface is not initialized. "
781 "Check earlier Python import/constructor errors (e.g. syntax) "
789 set_thread_message_(
"Calling setdaqcomps");
790 __GEN_COUT__ <<
"Calling setdaqcomps" << __E__;
792 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
794 __GEN_COUT__ <<
"Status before setdaqcomps: " << daqinterface_state_ << __E__;
796 PyObjectGuard pName1(PyUnicode_FromString(
"setdaqcomps"));
798 PyObjectGuard readerDict(PyDict_New());
799 for(
auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
802 label_to_proc_type_map_[reader.label] =
"BoardReader";
803 PyObjectGuard readerName(PyUnicode_FromString(reader.label.c_str()));
805 int list_size = reader.allowed_processors !=
"" ? 4 : 3;
807 PyObjectGuard readerData(PyList_New(list_size));
808 PyObject* readerHost = PyUnicode_FromString(reader.hostname.c_str());
809 PyObject* readerPort = PyUnicode_FromString(
"-1");
810 PyObject* readerSubsystem =
811 PyUnicode_FromString(std::to_string(reader.subsystem).c_str());
812 PyList_SetItem(readerData.get(), 0, readerHost);
813 PyList_SetItem(readerData.get(), 1, readerPort);
814 PyList_SetItem(readerData.get(), 2, readerSubsystem);
815 if(reader.allowed_processors !=
"")
817 PyObject* readerAllowedProcessors =
818 PyUnicode_FromString(reader.allowed_processors.c_str());
819 PyList_SetItem(readerData.get(), 3, readerAllowedProcessors);
821 PyDict_SetItem(readerDict.get(), readerName.get(), readerData.get());
823 PyObjectGuard res1(PyObject_CallMethodObjArgs(
824 daqinterface_ptr_, pName1.get(), readerDict.get(), NULL));
825 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"setdaqcomps"),
"setdaqcomps");
827 if(checkPythonError(res1.get()))
829 std::string err_msg = capturePyErr(
"setdaqcomps");
830 __GEN_SS__ <<
"Error calling setdaqcomps: " << err_msg << __E__;
835 __GEN_COUT__ <<
"Status after setdaqcomps: " << daqinterface_state_ << __E__;
838 thread_progress_bar_.
step();
841 set_thread_message_(
"Calling do_boot");
842 __GEN_COUT_INFO__ <<
"Calling do_boot" << __E__;
843 std::string doBootOutput =
"";
845 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
847 __GEN_COUT__ <<
"Status before boot: " << daqinterface_state_ << __E__;
850 PyObjectGuard pNameBoot(PyUnicode_FromString(
"do_boot"));
851 PyObjectGuard pBootArgs(PyUnicode_FromString(
855 PyObjectGuard resBoot1(PyObject_CallMethodObjArgs(
856 daqinterface_ptr_, pNameBoot.get(), pBootArgs.get(), NULL));
858 doBootOutput = captureStderrAndStdout_(
"do_boot");
859 __COUT_MULTI_LBL__(0, doBootOutput,
"do_boot");
861 if(checkPythonError(resBoot1.get()))
865 std::string err1 = capturePyErr(
"do_boot");
867 __GEN_COUT_INFO__ <<
"Error on first boot attempt: " << err1
868 <<
". Recovering and retrying..." << __E__;
871 PyObjectGuard pNameRecover(PyUnicode_FromString(
"do_recover"));
872 PyObjectGuard resRecover(
873 PyObject_CallMethodObjArgs(daqinterface_ptr_, pNameRecover.get(), NULL));
874 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"do_recover"),
"do_recover");
876 if(checkPythonError(resRecover.get()))
879 std::string errRec = capturePyErr(
"do_recover");
881 std::stringstream oss;
882 oss <<
"Error calling recover transition!!!! " << errRec;
883 if(doBootOutput.size() > OUT_ON_ERR_SIZE)
884 oss <<
"... last " << OUT_ON_ERR_SIZE
885 <<
" chars: " << doBootOutput.substr(doBootOutput.size() - 1000);
890 __GEN_SS__ << oss.str() << __E__;
895 thread_progress_bar_.
step();
896 set_thread_message_(
"Calling do_boot (retry)");
897 __GEN_COUT_INFO__ <<
"Calling do_boot again" << __E__;
900 PyObjectGuard resBoot2(PyObject_CallMethodObjArgs(
901 daqinterface_ptr_, pNameBoot.get(), pBootArgs.get(), NULL));
903 doBootOutput = captureStderrAndStdout_(
"do_boot (retry)");
904 __COUT_MULTI_LBL__(0, doBootOutput,
"do_boot (retry)");
906 if(checkPythonError(resBoot2.get()))
909 std::string err2 = capturePyErr(
"do_boot retry");
911 std::stringstream oss;
912 oss <<
"Error calling boot transition (2nd try): " << err2;
913 if(doBootOutput.size() > OUT_ON_ERR_SIZE)
914 oss <<
"... last " << OUT_ON_ERR_SIZE
915 <<
" chars: " << doBootOutput.substr(doBootOutput.size() - 1000);
919 __GEN_SS__ << oss.str() << __E__;
925 if(daqinterface_state_ !=
"booted")
927 std::cout <<
"Do boot output on error: \n" << doBootOutput << __E__;
928 __GEN_SS__ <<
"DAQInterface boot transition failed! "
929 <<
"Status after boot attempt: " << daqinterface_state_ << __E__;
931 if(doBootOutput.size() > OUT_ON_ERR_SIZE)
932 ss <<
"... last " << OUT_ON_ERR_SIZE
933 <<
" characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
938 __GEN_COUT__ <<
"Status after boot: " << daqinterface_state_ << __E__;
941 thread_progress_bar_.
step();
944 set_thread_message_(
"Calling do_config");
945 __GEN_COUT_INFO__ <<
"Calling do_config" << __E__;
946 std::string doConfigOutput =
"";
948 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
950 __GEN_COUT__ <<
"Status before config: " << daqinterface_state_ << __E__;
955 PyObjectGuard pName3(PyUnicode_FromString(
"do_config"));
957 PyObjectGuard pArg(Py_BuildValue(
"[s]", FAKE_CONFIG_NAME));
960 PyObjectGuard res3(PyObject_CallMethodObjArgs(
961 daqinterface_ptr_, pName3.get(), pArg.get(), NULL));
964 if(checkPythonError(res3.get()))
967 std::string err = capturePyErr(
"do_config");
970 doConfigOutput = captureStderrAndStdout_(
"do_config");
972 __GEN_SS__ <<
"Error calling config transition: " << err << __E__;
977 doConfigOutput = captureStderrAndStdout_(
"do_config");
978 __COUT_MULTI_LBL__(0, doConfigOutput,
"do_config");
982 PyObjectGuard strRes(PyObject_Str(res3.get()));
983 const char* res_cstr =
"";
986 res_cstr = PyUnicode_AsUTF8(strRes.get());
989 __SUP_COUTT__ <<
"do_config result=" << (res_cstr ? res_cstr :
"N/A")
994 if(daqinterface_state_ !=
"ready")
996 __GEN_SS__ <<
"DAQInterface config transition failed!" << __E__
997 <<
"Supervisor state: \"" << daqinterface_state_
998 <<
"\" != \"ready\" " << __E__;
999 auto doConfigOutput_recover_i =
1000 doConfigOutput.find(
"RECOVER transition underway");
1001 if(doConfigOutput_recover_i == std::string::npos)
1002 ss << doConfigOutput;
1003 else if(doConfigOutput_recover_i >
1005 ss <<
"... tail of " << OUT_ON_ERR_SIZE <<
" characters before recovery: "
1006 << doConfigOutput.substr(
1007 doConfigOutput_recover_i - OUT_ON_ERR_SIZE +
1008 std::string(
"RECOVER transition underway").size(),
1011 ss << doConfigOutput.substr(
1013 doConfigOutput_recover_i +
1014 std::string(
"RECOVER transition underway").size());
1017 __GEN_COUT__ <<
"Status after config: " << daqinterface_state_ << __E__;
1021 set_thread_message_(
"Configured");
1022 __GEN_COUT_INFO__ <<
"Configured." << __E__;
1025 catch(
const std::runtime_error& e)
1027 set_thread_message_(
"ERROR");
1028 __SS__ <<
"Error was caught while configuring: " << e.what() << __E__;
1029 __COUT_ERR__ <<
"\n" << ss.str();
1030 std::lock_guard<std::mutex> lock(thread_mutex_);
1031 thread_error_message_ = ss.str();
1035 set_thread_message_(
"ERROR");
1036 __SS__ <<
"Unknown error was caught while configuring. Please checked the logs."
1038 __COUT_ERR__ <<
"\n" << ss.str();
1040 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1042 std::lock_guard<std::mutex> lock(thread_mutex_);
1043 thread_error_message_ = ss.str();
// Body of the Halting transition handler. Its signature and opening lines are
// not visible in this excerpt.
//  - Repeatedly tries to take daqinterface_pythonMutex_, up to a bounded number
//    of tries, so a stuck configure thread cannot block halting forever.
//  - If DAQInterface is "running", calls do_stop_running first, then sends
//    do_command("Shutdown").
//  - If the provenance state was Failed or Halted, errors are only logged at
//    INFO. Otherwise they are recorded on the state machine and rethrown as a
//    toolbox::fsm exception.
// The "ignoring" log lines print the actual provenance state, which may be
// either Failed or Halted. The stop-transition error capture is labelled like
// every other capturePyErr call site.
1050 set_thread_message_(
"Halting");
1051 __SUP_COUT__ <<
"Halting..." << __E__;
1056 std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1060 __COUTS__(50) <<
"Do not have python lock for halt. tries=" << tries << __E__;
1064 __COUTS__(50) <<
"Have python lock!" << __E__;
1068 __SUP_COUT__ <<
"Status before halt: " << daqinterface_state_ << __E__;
1070 if(daqinterface_state_ ==
"running")
1073 PyObjectGuard pName(PyUnicode_FromString(
"do_stop_running"));
1075 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1077 0, captureStderrAndStdout_(
"do_stop_running"),
"do_stop_running");
1079 if(res.get() == NULL)
1081 std::string err = capturePyErr(
"do_stop_running");
1082 __SS__ <<
"Error calling DAQ Interface stop transition: " << err
1088 PyObjectGuard pName(PyUnicode_FromString(
"do_command"));
1089 PyObjectGuard pArg(PyUnicode_FromString(
"Shutdown"));
1091 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1093 0, captureStderrAndStdout_(
"do_command Shutdown"),
"do_command Shutdown");
1095 if(checkPythonError(res.get()))
1097 std::string err = capturePyErr(
"do_command Shutdown");
1098 __SS__ <<
"Error calling DAQ Interface halt transition: " << err << __E__;
1103 __SUP_COUT__ <<
"Status after halt: " << daqinterface_state_ << __E__;
1109 __SUP_SS__ <<
"Failed to acquire python lock for halting after " << tries
1110 <<
" tries, giving up! Is it possible the configure thread is stuck?"
1115 __SUP_COUT__ <<
"Halted." << __E__;
1116 set_thread_message_(
"Halted");
1118 catch(
const std::runtime_error& e)
1120 const std::string transitionName =
"Halting";
1122 if(theStateMachine_.getProvenanceStateName() ==
1123 RunControlStateMachine::FAILED_STATE_NAME ||
1124 theStateMachine_.getProvenanceStateName() ==
1125 RunControlStateMachine::HALTED_STATE_NAME)
1127 __SUP_COUT_INFO__ <<
"Error was caught while halting (but ignoring because "
1128 "previous state was '"
1129 << theStateMachine_.getProvenanceStateName()
1130 <<
"'): " << e.what() << __E__;
1134 __SUP_SS__ <<
"Error was caught while " << transitionName <<
": " << e.what()
1136 __SUP_COUT_ERR__ <<
"\n" << ss.str();
1137 theStateMachine_.setErrorMessage(ss.str());
1138 throw toolbox::fsm::exception::Exception(
1139 "Transition Error" ,
1141 "ARTDAQSupervisorBase::transition" + transitionName ,
1149 const std::string transitionName =
"Halting";
1151 if(theStateMachine_.getProvenanceStateName() ==
1152 RunControlStateMachine::FAILED_STATE_NAME ||
1153 theStateMachine_.getProvenanceStateName() ==
1154 RunControlStateMachine::HALTED_STATE_NAME)
1156 __SUP_COUT_INFO__ <<
"Unknown error was caught while halting (but ignoring "
1157 "because previous state was '"
1158 << theStateMachine_.getProvenanceStateName() <<
"')." << __E__;
1162 __SUP_SS__ <<
"Unknown error was caught while " << transitionName
1163 <<
". Please checked the logs." << __E__;
1164 __SUP_COUT_ERR__ <<
"\n" << ss.str();
1165 theStateMachine_.setErrorMessage(ss.str());
1167 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1169 throw toolbox::fsm::exception::Exception(
1170 "Transition Error" ,
1172 "ARTDAQSupervisorBase::transition" + transitionName ,
// Body of the Initializing transition handler. Its signature is not visible in
// this excerpt. The visible code only updates the thread status message and
// logs; any statements between the two log calls are not visible. A
// std::runtime_error produces an error message. Unknown exceptions are passed
// to artdaq::ExceptionHandler without rethrow.
1183 set_thread_message_(
"Initializing");
1184 __SUP_COUT__ <<
"Initializing..." << __E__;
1186 __SUP_COUT__ <<
"Initialized." << __E__;
1187 set_thread_message_(
"Initialized");
1189 catch(
const std::runtime_error& e)
1191 __SS__ <<
"Error was caught while Initializing: " << e.what() << __E__;
1196 __SS__ <<
"Unknown error was caught while Initializing. Please checked the logs."
1198 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM handler for the Pausing transition.
//  - While holding daqinterface_pythonMutex_, sends do_command("Pause") to
//    DAQInterface and logs the DAQInterface state before and after.
//  - A Python error builds an error message from the captured exception.
//    Unknown exceptions are passed to artdaq::ExceptionHandler without rethrow.
// NOTE(review): how the runtime_error path propagates is on lines not visible
// here — confirm.
1203 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference )
1206 set_thread_message_(
"Pausing");
1207 __SUP_COUT__ <<
"Pausing..." << __E__;
1208 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1211 __SUP_COUT__ <<
"Status before pause: " << daqinterface_state_ << __E__;
1213 PyObjectGuard pName(PyUnicode_FromString(
"do_command"));
1214 PyObjectGuard pArg(PyUnicode_FromString(
"Pause"));
1216 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1218 0, captureStderrAndStdout_(
"do_command Pause"),
"do_command Pause");
1220 if(checkPythonError(res.get()))
1222 std::string err = capturePyErr(
"do_command Pause");
1223 __SS__ <<
"Error calling DAQ Interface Pause transition: " << err << __E__;
1228 __SUP_COUT__ <<
"Status after pause: " << daqinterface_state_ << __E__;
1230 __SUP_COUT__ <<
"Paused." << __E__;
1231 set_thread_message_(
"Paused");
1233 catch(
const std::runtime_error& e)
1235 __SS__ <<
"Error was caught while Pausing: " << e.what() << __E__;
1240 __SS__ <<
"Unknown error was caught while Pausing. Please checked the logs." << __E__;
1241 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM handler for the Resuming transition.
//  - While holding daqinterface_pythonMutex_, sends do_command("Resume") to
//    DAQInterface and logs the DAQInterface state before and after.
//  - A Python error builds an error message from the captured exception.
//    Unknown exceptions are passed to artdaq::ExceptionHandler without rethrow.
// NOTE(review): how the runtime_error path propagates is on lines not visible
// here — confirm.
1246 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference )
1249 set_thread_message_(
"Resuming");
1250 __SUP_COUT__ <<
"Resuming..." << __E__;
1251 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1254 __SUP_COUT__ <<
"Status before resume: " << daqinterface_state_ << __E__;
1255 PyObjectGuard pName(PyUnicode_FromString(
"do_command"));
1256 PyObjectGuard pArg(PyUnicode_FromString(
"Resume"));
1258 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1260 0, captureStderrAndStdout_(
"do_command Resume"),
"do_command Resume");
1262 if(checkPythonError(res.get()))
1264 std::string err = capturePyErr(
"do_command Resume");
1265 __SS__ <<
"Error calling DAQ Interface Resume transition: " << err << __E__;
1270 __SUP_COUT__ <<
"Status after resume: " << daqinterface_state_ << __E__;
1271 __SUP_COUT__ <<
"Resumed." << __E__;
1272 set_thread_message_(
"Resumed");
1274 catch(
const std::runtime_error& e)
1276 __SS__ <<
"Error was caught while Resuming: " << e.what() << __E__;
1281 __SS__ <<
"Unknown error was caught while Resuming. Please checked the logs."
1283 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM handler for the Starting transition. It is re-entered once per iteration.
// First call (iteration 0, sub-iteration 0):
//  - clears thread_error_message_ and resets the progress bar and the watchdog
//    timestamp;
//  - launches startingThread detached and requests another iteration.
// Later calls:
//  - read thread_error_message_ under a mutex, plus the progress value;
//  - fail the transition (setErrorMessage + toolbox::fsm exception) if the
//    thread reported an error or progress has not changed for more than 600 s;
//  - otherwise keep requesting iterations, refreshing the watchdog whenever the
//    progress value moves.
// The per-iteration polling logs use the same trace/verbose levels as
// transitionConfiguring, so re-entries do not flood the default-level log.
// NOTE(review): `this` is handed to a detached thread, so the supervisor must
// outlive startingThread.
1288 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference )
1291 __SUP_COUTT__ <<
"transitionStarting" << __E__;
1294 if(RunControlStateMachine::getIterationIndex() == 0 &&
1295 RunControlStateMachine::getSubIterationIndex() == 0)
1297 thread_error_message_ =
"";
1298 thread_progress_bar_.resetProgressBar(0);
1299 last_thread_progress_update_ = time(0);
1302 std::thread(&ARTDAQSupervisor::startingThread,
this).detach();
1304 __SUP_COUT_INFO__ <<
"Starting thread started." << __E__;
1306 RunControlStateMachine::
1307 indicateIterationWork();
1311 std::string errorMessage;
1313 std::lock_guard<std::mutex> lock(
1315 errorMessage = thread_error_message_;
1317 int progress = thread_progress_bar_.
read();
1318 __SUP_COUTVS__(2, errorMessage);
1319 __SUP_COUTVS__(2, progress);
1320 __SUP_COUTVS__(2, thread_progress_bar_.
isComplete());
1323 if(errorMessage ==
"" &&
1324 time(0) - last_thread_progress_update_ > 600)
1326 __SUP_SS__ <<
"There has been no update from the start thread for "
1327 << (time(0) - last_thread_progress_update_)
1328 <<
" seconds, assuming something is wrong and giving up! "
1329 <<
"Last progress received was " << progress << __E__;
1330 errorMessage = ss.str();
1333 if(errorMessage !=
"")
1335 __SUP_SS__ <<
"Error was caught in starting thread: " << errorMessage
1337 __SUP_COUT_ERR__ <<
"\n" << ss.str();
1339 theStateMachine_.setErrorMessage(ss.str());
1340 throw toolbox::fsm::exception::Exception(
1341 "Transition Error" ,
1343 "CoreSupervisorBase::transitionStarting" ,
1351 __SUP_COUTT__ <<
"Not done yet..." << __E__;
1355 RunControlStateMachine::
1356 indicateIterationWork();
1358 if(last_thread_progress_read_ != progress)
1360 last_thread_progress_read_ = progress;
1361 last_thread_progress_update_ = time(0);
1368 __SUP_COUT_INFO__ <<
"Starting transition completed!" << __E__;
1369 __SUP_COUTV__(getProcessInfo_());
1376 catch(
const std::runtime_error& e)
1378 __SS__ <<
"Error was caught while Starting: " << e.what() << __E__;
1383 __SS__ <<
"Unknown error was caught while Starting. Please checked the logs."
1385 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// ARTDAQSupervisor::startingThread -- worker launched (detached) by
// transitionStarting() to perform the DAQInterface start.
//
// Each stage is followed by thread_progress_bar_.step():
//  1. Look up this supervisor's LinkToSupervisorTable UID in the XDAQ
//     application table and build the local mfSubject_ from it (presumably
//     read by the __GEN_COUT__ logging macros -- confirm).
//  2. With daqinterface_pythonMutex_ held, read "RunNumber" from the current
//     FSM SOAP message and call DAQInterface.do_start_running(run_number)
//     through the Python C API.
//  3. On a Python error, or if daqinterface_state_ is not "running"
//     afterwards, build an error message that includes the captured Python
//     stdout/stderr (only the last OUT_ON_ERR_SIZE characters if longer).
// The handlers at the end do not rethrow: they log the error and store it
// in thread_error_message_ under thread_mutex_, where transitionStarting()
// polls for it.
1390 void ARTDAQSupervisor::startingThread()
1393 std::string uid = theConfigurationManager_
1394 ->
getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
1395 "/" + CorePropertySupervisorBase::getSupervisorUID() +
1396 "/" +
"LinkToSupervisorTable")
1399 __COUT__ <<
"Supervisor uid is " << uid <<
", getting supervisor table node" << __E__;
1400 const std::string mfSubject_ = supervisorClassNoNamespace_ +
"-" + uid;
1401 __GEN_COUT__ <<
"Starting..." << __E__;
1402 set_thread_message_(
"Starting");
1404 thread_progress_bar_.
step();
// All DAQInterface Python calls are serialized on this recursive mutex.
1407 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1409 __GEN_COUT__ <<
"Status before start: " << daqinterface_state_ << __E__;
1410 auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
1412 .getValue(
"RunNumber");
1414 thread_progress_bar_.
step();
1416 __GEN_COUT_INFO__ <<
"Calling do_start_running" << __E__;
1417 PyObjectGuard pName(PyUnicode_FromString(
"do_start_running"));
// NOTE(review): std::stoi throws std::invalid_argument / std::out_of_range
// (std::logic_error subclasses). A malformed RunNumber therefore skips the
// std::runtime_error handler below and reaches the generic handler
// (presumably catch(...)).
1418 int run_number = std::stoi(runNumber);
1419 PyObjectGuard pStateArgs(PyLong_FromLong(run_number));
1420 PyObjectGuard res(PyObject_CallMethodObjArgs(
1421 daqinterface_ptr_, pName.get(), pStateArgs.get(), NULL));
1422 std::string doStartOutput;
1424 thread_progress_bar_.
step();
// Fetch the pending Python exception before reading the output buffer;
// captureStderrAndStdout_() checks PyErr_Occurred() on entry.
1426 if(checkPythonError(res.get()))
1428 std::string err = capturePyErr(
"do_start_running");
1429 doStartOutput = captureStderrAndStdout_(
"do_start_running");
1430 __SS__ <<
"Error calling start transition: " << err << __E__;
1431 if(doStartOutput.size() > OUT_ON_ERR_SIZE)
1432 ss <<
"... last " << OUT_ON_ERR_SIZE <<
" characters: "
1433 << doStartOutput.substr(doStartOutput.size() - OUT_ON_ERR_SIZE);
1435 ss << doStartOutput;
// Normal path (branch lines elided): log the DAQInterface output.
1439 doStartOutput = captureStderrAndStdout_(
"do_start_running");
1440 __COUT_MULTI_LBL__(0, doStartOutput,
"do_start_running");
1443 thread_progress_bar_.
step();
// NOTE(review): nothing visible here refreshes daqinterface_state_ after the
// Python call; confirm it is updated (e.g. by getDAQState_()) on an elided
// line before this check.
1445 __GEN_COUT__ <<
"Status after start: " << daqinterface_state_ << __E__;
1446 if(daqinterface_state_ !=
"running")
1448 __SS__ <<
"DAQInterface start transition failed!" << __E__
1449 <<
"DAQInterface state: \"" << daqinterface_state_
1450 <<
"\" != \"running\" " << __E__;
1451 if(doStartOutput.size() > OUT_ON_ERR_SIZE)
1452 ss <<
"... last " << OUT_ON_ERR_SIZE <<
" characters: "
1453 << doStartOutput.substr(doStartOutput.size() - OUT_ON_ERR_SIZE);
1455 ss << doStartOutput;
1459 thread_progress_bar_.
step();
1462 set_thread_message_(
"Started");
1463 thread_progress_bar_.
step();
1465 __GEN_COUT_INFO__ <<
"Started." << __E__;
1469 catch(
const std::runtime_error& e)
1471 __SS__ <<
"Error was caught while Starting: " << e.what() << __E__;
1472 __COUT_ERR__ <<
"\n" << ss.str();
1473 std::lock_guard<std::mutex> lock(thread_mutex_);
1474 thread_error_message_ = ss.str();
// NOTE(review): "Please checked the logs." is a typo for "check"; runtime
// text, left unchanged here.
1478 __SS__ <<
"Unknown error was caught while Starting. Please checked the logs."
1480 __COUT_ERR__ <<
"\n" << ss.str();
1482 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1484 std::lock_guard<std::mutex> lock(thread_mutex_);
1485 thread_error_message_ = ss.str();
// ARTDAQSupervisor::transitionStopping -- FSM handler for the Stop transition.
//
// Unlike Start, this runs synchronously in the FSM call. With
// daqinterface_pythonMutex_ held it calls DAQInterface.do_stop_running(),
// logs the captured Python output, and builds an error message if the call
// failed. The resulting daqinterface_state_ is only logged, not verified.
// A std::runtime_error or unknown exception is reported through the handlers
// at the end; the unknown case goes through artdaq::ExceptionHandler without
// rethrow.
1489 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference )
1492 __SUP_COUT__ <<
"Stopping..." << __E__;
1493 set_thread_message_(
"Stopping");
1494 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1496 __SUP_COUT__ <<
"Status before stop: " << daqinterface_state_ << __E__;
1497 PyObjectGuard pName(PyUnicode_FromString(
"do_stop_running"));
1498 PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
// NOTE(review): unlike startingThread(), the output is captured BEFORE
// checkPythonError(). captureStderrAndStdout_() checks PyErr_Occurred() on
// entry; confirm that the pending exception still reaches capturePyErr()
// below.
1499 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"do_stop_running"),
"do_stop_running");
1501 if(checkPythonError(res.get()))
1503 std::string err = capturePyErr(
"do_stop_running");
1504 __SS__ <<
"Error calling DAQ Interface stop transition: " << err << __E__;
1508 __SUP_COUT__ <<
"Status after stop: " << daqinterface_state_ << __E__;
1509 __SUP_COUT__ <<
"Stopped." << __E__;
1510 set_thread_message_(
"Stopped");
1512 catch(
const std::runtime_error& e)
1514 __SS__ <<
"Error was caught while Stopping: " << e.what() << __E__;
// NOTE(review): "Please checked the logs." is a typo for "check"; runtime
// text, left unchanged here.
1519 __SS__ <<
"Unknown error was caught while Stopping. Please checked the logs."
1521 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// ots::ARTDAQSupervisor::enteringError -- FSM hook run when entering the
// error state; asks DAQInterface to recover.
//
// With daqinterface_pythonMutex_ held, calls DAQInterface.do_recover() and
// logs its captured output. A Python failure is only logged as a warning in
// the visible lines, so recovery is best-effort.
// NOTE(review): there is no visible NULL check on daqinterface_ptr_ (unlike
// getDAQState_()); confirm one exists on an elided line if this can run
// before DAQInterface is created.
// NOTE(review): output is captured before checkPythonError(), the reverse of
// startingThread(); captureStderrAndStdout_() checks PyErr_Occurred() on
// entry.
1526 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference )
1528 __SUP_COUT__ <<
"Entering error recovery state" << __E__;
1529 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1531 __SUP_COUT__ <<
"Status before error: " << daqinterface_state_ << __E__;
1533 PyObjectGuard pName(PyUnicode_FromString(
"do_recover"));
1534 PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1535 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"do_recover"),
"do_recover");
1537 if(checkPythonError(res.get()))
1539 std::string err = capturePyErr(
"do_recover");
1541 __SUP_COUT_WARN__ <<
"Error calling DAQ Interface recover transition: " << err
1547 __SUP_COUT__ <<
"Status after error: " << daqinterface_state_ << __E__;
1548 __SUP_COUT__ <<
"EnteringError DONE." << __E__;
// ots::ARTDAQSupervisor::getSubappInfo -- describe each artdaq process as a
// SupervisorInfo::SubappInfo.
//
// Parses the DAQInterface process list with getAndParseProcessInfo_() and
// fills, per process:
//   name           = process label
//   detail         = "Rank <rank>, subsystem <subsystem>"
//   lastStatusTime = time(0)
//   progress       = 100 (constant)
//   status         = artdaqStateToOtsState(<artdaq state>)
//   url            = "http://<host>:<port>/RPC2"
//   class_name     = "ARTDAQ " + labelToProcType_(<label>)
// Entries are collected in a std::map keyed by rank, so the returned vector
// is ordered by ascending rank and a repeated rank keeps only the last entry.
1552 std::vector<SupervisorInfo::SubappInfo> ots::ARTDAQSupervisor::getSubappInfo(
void)
1554 auto apps = getAndParseProcessInfo_();
1556 std::map<int, SupervisorInfo::SubappInfo> subapp_infos;
1557 for(
auto& app : apps)
1561 info.
name = app.label;
1562 info.detail =
"Rank " + std::to_string(app.rank) +
", subsystem " +
1563 std::to_string(app.subsystem);
1564 info.lastStatusTime = time(0);
1565 info.progress = 100;
1566 info.status = artdaqStateToOtsState(app.state);
1567 info.url =
"http://" + app.host +
":" + std::to_string(app.port) +
"/RPC2";
1568 info.class_name =
"ARTDAQ " + labelToProcType_(app.label);
1570 subapp_infos[app.rank] = info;
// Flatten the rank-ordered map into the result vector.
1573 std::vector<SupervisorInfo::SubappInfo> output;
1574 for(
auto& [rank, info] : subapp_infos)
1576 output.push_back(info);
// ots::ARTDAQSupervisor::checkPythonError -- test whether a Python C-API call
// failed.
//
// The visible condition is: `result` is NULL, or a Python exception is
// pending (PyErr_Occurred()). The branch body and return statements are on
// elided lines. Call sites treat a true return as failure and then call
// capturePyErr() to consume the pending exception.
1585 bool ots::ARTDAQSupervisor::checkPythonError(PyObject* result)
1587 if(result == NULL || PyErr_Occurred())
// ots::ARTDAQSupervisor::capturePyErr -- take the pending Python exception
// and turn it into a printable message prefixed with "<label>:\n".
//
// PyErr_Fetch() moves the exception out of the interpreter, which clears the
// error indicator. The exception is normalized, then formatting is attempted
// in this order:
//   1. "".join(traceback.format_exception(...)) -- full traceback text;
//   2. str(value) -- if step 1 left the default message;
//   3. "Unknown Python Error" -- the default, also returned when no Python
//      exception was pending (pValue stays NULL).
// NOTE(review): PyErr_Fetch / PyErr_NormalizeException are deprecated since
// Python 3.12 in favour of PyErr_GetRaisedException.
// NOTE(review): only Py_XDECREF(pTraceback) is visible; confirm pType and
// pValue are also released on the elided lines.
1598 std::string ots::ARTDAQSupervisor::capturePyErr(std::string label )
1600 std::string err_msg =
"Unknown Python Error";
1601 PyObject * pType, *pValue, *pTraceback;
1602 PyErr_Fetch(&pType, &pValue, &pTraceback);
1603 PyErr_NormalizeException(&pType, &pValue, &pTraceback);
1608 PyObjectGuard traceback_module(PyImport_ImportModule(
"traceback"));
1609 if(traceback_module.get() != NULL)
1611 PyObjectGuard format_exception(
1612 PyObject_GetAttrString(traceback_module.get(),
"format_exception"));
1613 if(format_exception.get() != NULL)
1615 PyObjectGuard formatted(
1616 PyObject_CallFunctionObjArgs(format_exception.get(),
1618 pValue ? pValue : Py_None,
1619 pTraceback ? pTraceback : Py_None,
1621 if(formatted.get() != NULL)
1624 PyObjectGuard empty_string(PyUnicode_FromString(
""));
1625 PyObjectGuard joined(
1626 PyUnicode_Join(empty_string.get(), formatted.get()));
1627 if(joined.get() != NULL)
// PyUnicode_AsUTF8 returns NULL on failure; assigning NULL to a std::string
// is undefined, so confirm the elided line guards traceback_cstr.
1629 const char* traceback_cstr = PyUnicode_AsUTF8(joined.get());
1631 err_msg = traceback_cstr;
// Fallback: the traceback module path did not produce text.
1638 if(err_msg ==
"Unknown Python Error" && pValue != NULL)
1640 PyObjectGuard pStr(PyObject_Str(pValue));
1641 if(pStr.get() != NULL)
1643 const char* error_cstr = PyUnicode_AsUTF8(pStr.get());
1645 err_msg = error_cstr;
1652 Py_XDECREF(pTraceback);
1656 err_msg = label +
":\n" + err_msg;
// ots::ARTDAQSupervisor::captureStderrAndStdout_ -- return the text held in
// stringIO_out_, obtained by calling its getvalue() method.
//
// stringIO_out_ is presumably the io.StringIO that Python stdout/stderr were
// redirected to when DAQInterface was set up -- not visible here. Whether
// the buffer is reset between calls, and what happens when a Python
// exception is already pending on entry, are decided on elided lines.
// Returns "" when the result cannot be converted to UTF-8.
// NOTE(review): outString is not used in the visible lines, and the message
// returned by capturePyErr() for a failed getvalue() is discarded here
// (the exception is still consumed).
1662 std::string ots::ARTDAQSupervisor::captureStderrAndStdout_(std::string label )
1667 if(PyErr_Occurred())
1672 std::string outString =
"";
1673 PyObjectGuard out(PyObject_CallMethod(stringIO_out_,
"getvalue", NULL));
1675 if(checkPythonError(out.get()))
1678 capturePyErr(
"captureStderrAndStdout getvalue");
1682 const char* text = PyUnicode_AsUTF8(out.get());
1684 return text ? text :
"";
// ots::ARTDAQSupervisor::getDAQState_ -- refresh daqinterface_state_ from
// DAQInterface.
//
// With daqinterface_pythonMutex_ held, calls state("DAQInterface") on the
// DAQInterface object and stores str(result) in daqinterface_state_.
//  * daqinterface_ptr_ NULL -> state "" and a warning.
//  * Python error -> message "Attempt n <tries>. ..." (the retry loop itself
//    is on elided lines). On one path it is logged as an error and the state
//    is set to "ERROR"; on another it is only logged.
//  * "UNKNOWN" is assigned on another elided path.
// NOTE(review): std::string(PyUnicode_AsUTF8(...)) is undefined if the call
// returns NULL; confirm the elided lines guard it.
1687 void ots::ARTDAQSupervisor::getDAQState_()
1689 __SUP_COUTS__(50) <<
"Getting DAQInterface python lock" << __E__;
1690 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1691 __SUP_COUTS__(50) <<
"Have DAQInterface python lock" << __E__;
1693 if(daqinterface_ptr_ == NULL)
1695 daqinterface_state_ =
"";
1696 __SUP_COUT_WARN__ <<
"daqinterface_ptr_ is not initialized!" << __E__;
1701 PyObjectGuard pName(PyUnicode_FromString(
"state"));
1702 PyObjectGuard pArg(PyUnicode_FromString(
"DAQInterface"));
1713 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1715 if(checkPythonError(res.get()))
1720 std::string err_msg = capturePyErr(
"state");
1722 std::ostringstream ss;
1723 ss <<
"Attempt n " << tries
1724 <<
". Error calling 'state'. Python Exception: " << err_msg;
1728 __COUT_ERR__ << ss.str() << __E__;
1729 daqinterface_state_ =
"ERROR";
1733 __COUT__ << ss.str() << __E__;
1742 PyObjectGuard strRes(PyObject_Str(res.get()));
1745 daqinterface_state_ = std::string(PyUnicode_AsUTF8(strRes.get()));
1750 daqinterface_state_ =
"UNKNOWN";
1753 __SUP_COUTS__(20) <<
"getDAQState_ state=" << daqinterface_state_ << __E__;
// ots::ARTDAQSupervisor::getProcessInfo_ -- fetch fresh artdaq process-info
// text from DAQInterface and cache it.
//
// With daqinterface_pythonMutex_ held, calls
// artdaq_process_info("DAQInterface", True) on the DAQInterface object. The
// meaning of the boolean argument is not visible here. The resulting string
// is stored in daqinterface_status_ under daqinterface_statusMutex_ and
// returned. getAndParseProcessInfo_() reads that cache when it cannot take
// the Python lock.
// The daqinterface_ptr_ == nullptr branch and the handling after the error
// message are on elided lines.
// NOTE(review): the log text says "state lock" but the mutex is the Python
// lock (getDAQState_ logs "python lock").
1761 std::string ots::ARTDAQSupervisor::getProcessInfo_(
void)
1763 __SUP_COUTS__(50) <<
"Getting DAQInterface state lock" << __E__;
1764 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1765 __SUP_COUTS__(50) <<
"Have DAQInterface state lock" << __E__;
1767 if(daqinterface_ptr_ ==
nullptr)
1772 PyObjectGuard pName(PyUnicode_FromString(
"artdaq_process_info"));
1773 PyObjectGuard pArg(PyUnicode_FromString(
"DAQInterface"));
1774 PyObjectGuard pArg2(PyBool_FromLong(
true));
1775 PyObjectGuard res(PyObject_CallMethodObjArgs(
1776 daqinterface_ptr_, pName.get(), pArg.get(), pArg2.get(), NULL));
1778 if(checkPythonError(res.get()))
1780 std::string err = capturePyErr(
"artdaq_process_info");
1781 __SS__ <<
"Error calling artdaq_process_info function: " << err << __E__;
// Lock order here is pythonMutex -> statusMutex.
// NOTE(review): PyUnicode_AsUTF8 returns NULL (and sets an exception) if the
// result is not a str; constructing std::string from NULL is undefined.
1786 std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1787 daqinterface_status_ = std::string(PyUnicode_AsUTF8(res.get()));
1788 return daqinterface_status_;
// ots::ARTDAQSupervisor::artdaqStateToOtsState -- translate an artdaq process
// state name (the state field parsed from DAQInterface's process info) into
// the name of the matching ots RunControlStateMachine state.
//
//   "nonexistent" / "nonexistant" -> INITIAL_STATE_NAME (both spellings accepted)
//   "Ready"                       -> "Configured" (literal, not a constant)
//   "Running"                     -> RUNNING_STATE_NAME
//   "Paused"                      -> PAUSED_STATE_NAME
//   "Stopped"                     -> HALTED_STATE_NAME
//   anything else                 -> FAILED_STATE_NAME, after a TLOG warning
// Comparisons are exact and case-sensitive.
// NOTE(review): any other artdaq state that DAQInterface may report is shown
// as Failed -- confirm that is intended.
1791 std::string ots::ARTDAQSupervisor::artdaqStateToOtsState(std::string state)
1793 if(state ==
"nonexistent" || state ==
"nonexistant")
1794 return RunControlStateMachine::INITIAL_STATE_NAME;
1795 if(state ==
"Ready")
1796 return "Configured";
1797 if(state ==
"Running")
1798 return RunControlStateMachine::RUNNING_STATE_NAME;
1799 if(state ==
"Paused")
1800 return RunControlStateMachine::PAUSED_STATE_NAME;
1801 if(state ==
"Stopped")
1802 return RunControlStateMachine::HALTED_STATE_NAME;
1804 TLOG(TLVL_WARNING) <<
"Unrecognized state name " << state;
1805 return RunControlStateMachine::FAILED_STATE_NAME;
// ots::ARTDAQSupervisor::labelToProcType_ -- look up the artdaq process type
// recorded for a process label in label_to_proc_type_map_.
// The value returned for an unknown label is on elided lines.
// NOTE(review): count() followed by operator[] performs two lookups; a single
// find() would do.
1808 std::string ots::ARTDAQSupervisor::labelToProcType_(std::string label)
1810 if(label_to_proc_type_map_.count(label))
1812 return label_to_proc_type_map_[label];
// ots::ARTDAQSupervisor::getAndParseProcessInfo_ -- return the artdaq process
// list as DAQInterfaceProcessInfo records.
//
// Where the process-info text comes from:
//  * If daqinterface_pythonMutex_ is not held after constructing the
//    std::unique_lock, the cached daqinterface_status_ is copied under
//    daqinterface_statusMutex_. The lock's extra constructor argument is on
//    an elided line, presumably std::try_to_lock. This presumably keeps
//    status queries from waiting on a long Python call.
//  * Otherwise getProcessInfo_() is called for fresh text.
// The text is split into lines with tokenize_(). Each line matching
//   "<label> at <host>:<port> (subsystem <n>, rank <n>): <state>"
// becomes one record; only matching lines are appended to the result.
1819 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo>
1820 ots::ARTDAQSupervisor::getAndParseProcessInfo_()
1822 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
1827 std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1831 __COUTS__(50) <<
"Do not have python lock." << __E__;
1832 std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1833 info = daqinterface_status_;
1837 __COUTS__(50) <<
"Have python lock!" << __E__;
1838 info = getProcessInfo_();
1840 __COUTVS__(20, info);
1842 auto procs = tokenize_(info);
// NOTE(review): the regex is compiled on every call; a function-local
// static const std::regex would avoid that.
1851 std::regex re(
"(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
1853 for(
auto& proc : procs)
1856 if(std::regex_match(proc, match, re))
// NOTE(review): this `info` shadows the outer `info` that holds the raw
// process-info text.
1858 DAQInterfaceProcessInfo info;
1860 info.label = match[1];
1861 info.host = match[2];
// Groups 3-5 match only digits, so std::stoi can fail here only with
// std::out_of_range on oversized numbers.
1862 info.port = std::stoi(match[3]);
1863 info.subsystem = std::stoi(match[4]);
1864 info.rank = std::stoi(match[5]);
1865 info.state = match[6];
1867 output.push_back(info);
// ots::ARTDAQSupervisor::makeCommandersFromProcessInfo -- create one artdaq
// commander client per process reported by DAQInterface.
//
// For each DAQInterfaceProcessInfo from getAndParseProcessInfo_(), this
// builds a fhicl::ParameterSet with
//   commanderPluginType = "xmlrpc", id = <port>, server_url = <host>.
// It creates the plugin with artdaq::MakeCommanderPlugin() and appends the
// (process info, commander) pair to the result; each info record is moved in.
// The first line of the return type and the result declaration are on
// elided lines.
1875 std::unique_ptr<artdaq::CommanderInterface>>>
1876 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
1879 std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
1881 auto infos = getAndParseProcessInfo_();
1883 for(
auto& info : infos)
// NOTE(review): `cm` is a loop-local artdaq::Commandable handed to
// MakeCommanderPlugin(). If the created commander keeps a reference to it,
// that reference dangles after this iteration. Confirm the commanders are
// used only as clients and never touch it.
1885 artdaq::Commandable cm;
1886 fhicl::ParameterSet ps;
1888 ps.put<std::string>(
"commanderPluginType",
"xmlrpc");
1889 ps.put<
int>(
"id", info.port);
1890 ps.put<std::string>(
"server_url", info.host);
// NOTE(review): std::make_pair with explicit template arguments is
// discouraged; output.emplace_back(std::move(info),
// artdaq::MakeCommanderPlugin(ps, cm)) builds the pair in place.
1892 output.emplace_back(std::make_pair<DAQInterfaceProcessInfo,
1893 std::unique_ptr<artdaq::CommanderInterface>>(
1894 std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
// ots::ARTDAQSupervisor::tokenize_ -- split `input` into lines on '\n'.
//
// Each element is the text from `pos` up to, but not including, the next
// newline. When no further newline exists, the remainder from `pos` is
// appended. The loop stops once `pos` is npos or at/after the end of the
// string. The initialisation and advancing of `pos` are on elided lines,
// presumably pos = 0 and pos = newpos + 1.
1901 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string
const& input)
1904 std::list<std::string> output;
1906 while(pos != std::string::npos && pos < input.size())
1908 auto newpos = input.find(
'\n', pos);
1909 if(newpos != std::string::npos)
1911 output.emplace_back(input, pos, newpos - pos);
1917 output.emplace_back(input, pos);
// ots::ARTDAQSupervisor::daqinterfaceRunner_ -- body of the background
// "runner" thread that watches DAQInterface.
//
// While runner_running_ is true and daqinterface_ptr_ is set, each pass:
//  * takes daqinterface_pythonMutex_ and remembers daqinterface_state_;
//  * if the state is "running", "ready" or "booted", calls DAQInterface's
//    check_proc_heartbeats() and logs its captured output;
//  * stops the loop (runner_running_ = false) and builds an error message if
//    that call returns NULL or throws (cet::exception, std::exception, or
//    anything else);
//  * stops the loop the same way if daqinterface_state_ no longer equals the
//    value remembered at the start of the pass.
// The outer handler at the end logs the error, stores it in
// thread_error_message_, sets the state machine's error message and reports
// it with sendAsyncExceptionToGateway().
// Elided lines: the sleep between passes, any refresh of daqinterface_state_,
// and what follows each error message (presumably a throw into the outer
// handler).
1926 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1929 TLOG(TLVL_TRACE) <<
"Runner thread starting";
// NOTE(review): the flag is set to true only here, after the thread has
// started. If stop_runner_() clears it before this line runs, this
// assignment re-arms the loop and stop_runner_()'s join() can block. Setting
// the flag in start_runner_() before creating the thread closes that window.
1930 runner_running_ =
true;
1931 while(runner_running_)
1933 if(daqinterface_ptr_ != NULL)
1935 std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1937 std::string state_before = daqinterface_state_;
1939 __SUP_COUTS__(2) <<
"Runner state_before=" << state_before
1940 <<
" state now=" << daqinterface_state_
1941 <<
" ?= running, ready, or booted" << __E__;
1943 if(daqinterface_state_ ==
"running" || daqinterface_state_ ==
"ready" ||
1944 daqinterface_state_ ==
"booted")
1948 TLOG(TLVL_TRACE) <<
"Calling DAQInterface::check_proc_heartbeats";
1949 PyObjectGuard pName(PyUnicode_FromString(
"check_proc_heartbeats"));
1951 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1952 __COUT_MULTI_LBL__(1,
1953 captureStderrAndStdout_(
"check_proc_heartbeats"),
1954 "check_proc_heartbeats");
1956 <<
"Done with DAQInterface::check_proc_heartbeats call";
1958 if(res.get() == NULL)
1960 runner_running_ =
false;
1961 std::string err = capturePyErr(
"check_proc_heartbeats");
1962 __SS__ <<
"Error calling check_proc_heartbeats function: " << err
// NOTE(review): in the C++ exception handlers below there may be no pending
// Python exception; capturePyErr() then reports "Unknown Python Error".
1968 catch(cet::exception& ex)
1970 runner_running_ =
false;
1971 std::string err = capturePyErr(
"check_proc_heartbeats");
// NOTE(review): "An cet::exception" should read "A cet::exception"; runtime
// text, left unchanged here.
1972 __SS__ <<
"An cet::exception occurred while calling "
1973 "check_proc_heartbeats function "
1974 << ex.explain_self() <<
": " << err << __E__;
1978 catch(std::exception& ex)
1980 runner_running_ =
false;
1981 std::string err = capturePyErr(
"check_proc_heartbeats");
1982 __SS__ <<
"An std::exception occurred while calling "
1983 "check_proc_heartbeats function: "
1984 << ex.what() <<
"\n\n"
1991 runner_running_ =
false;
1992 std::string err = capturePyErr(
"check_proc_heartbeats");
1993 __SS__ <<
"An unknown Error occurred while calling "
1994 "check_proc_heartbeats function: "
// NOTE(review): daqinterface_state_ is a C++ member, so it can only differ
// from state_before if it is refreshed (e.g. by getDAQState_()) on an elided
// line -- confirm.
2002 if(daqinterface_state_ != state_before)
2004 runner_running_ =
false;
2006 __SS__ <<
"DAQInterface state unexpectedly changed from "
2007 << state_before <<
" to " << daqinterface_state_
2008 <<
". Check supervisor log file for more info!" << __E__;
2016 __SUP_COUT__ <<
"daqinterface_ptr_ is null" << __E__;
2021 runner_running_ =
false;
2022 TLOG(TLVL_TRACE) <<
"Runner thread complete";
// Outer handler: classify the escaped exception and report it.
2026 __SS__ <<
"An error occurred in "
"start_runner_/daqinterfaceRunner_ thread "
2033 catch(
const std::runtime_error& e)
2035 ss <<
"Here is the error: " << e.what() << __E__;
2039 ss <<
"Unexpected error!" << __E__;
2041 __COUT_ERR__ << ss.str();
2044 std::lock_guard<std::mutex> lock(
2046 thread_error_message_ = ss.str();
2049 theStateMachine_.setErrorMessage(ss.str());
2051 sendAsyncExceptionToGateway(
// ots::ARTDAQSupervisor::stop_runner_ -- ask the runner thread to exit and
// wait for it.
//
// Clears runner_running_, joins runner_thread_ if it exists and is joinable,
// then destroys the std::thread object.
// NOTE(review): runner_running_ is written here and read by the runner
// thread; confirm it is declared std::atomic<bool> (declaration not visible).
// NOTE(review): the runner takes daqinterface_pythonMutex_ at the start of
// every pass. If the caller holds that mutex while calling this, the runner
// can block on it and join() never returns. Calling this from the runner
// thread itself makes join() throw std::system_error.
2059 void ots::ARTDAQSupervisor::stop_runner_()
2061 runner_running_ =
false;
2062 if(runner_thread_ && runner_thread_->joinable())
2064 runner_thread_->join();
2065 runner_thread_.reset(
nullptr);
// ots::ARTDAQSupervisor::start_runner_ -- launch daqinterfaceRunner_() on a
// new std::thread created with std::make_unique.
// The target of the assignment (presumably runner_thread_) and anything done
// beforehand (e.g. stopping a previous runner) are on elided lines.
// NOTE(review): daqinterfaceRunner_() sets runner_running_ = true itself,
// after it starts. Setting the flag here, before creating the thread, avoids
// a window where an immediate stop_runner_() is undone.
2070 void ots::ARTDAQSupervisor::start_runner_()
2074 std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_,
this);
virtual void transitionHalting(toolbox::Event::Reference event) override
virtual void transitionInitializing(toolbox::Event::Reference event) override
static const std::string ARTDAQ_FCL_PATH
Tree-path rule is, if the last link in the path is a group link with a specified group ID,...
static std::string getBootFileContentFromInfo(const ARTDAQInfo &info, const std::string &setupScript, int debugLevel)
ConfigurationTree getNode(const std::string &nodeString, bool doNotThrowOnBrokenUIDLinks=false) const
"root/parent/parent/"
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
const std::string & getValueAsString(bool returnLinkTableValue=false) const
void getValue(T &value) const
ITRACEController * theTRACEController_
only define for an app that receives a command
bool isComplete()
get functions
int read()
if stepsToComplete==0, then define any progress as 50%, thread safe
void complete()
declare complete, thread safe
defines used also by OtsConfigurationWizardSupervisor
void INIT_MF(const char *name)
std::string name
Also key in map.