3 #define TRACEMF_USE_VERBATIM 1
4 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
6 #include "artdaq-core/Utilities/configureMessageFacility.hh"
7 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
8 #include "artdaq/DAQdata/Globals.hh"
9 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
10 #include "cetlib_except/exception.h"
11 #include "fhiclcpp/make_ParameterSet.h"
12 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisorTRACEController.h"
14 #include "artdaq-core/Utilities/ExceptionHandler.hh"
16 #include <boost/exception/all.hpp>
17 #include <boost/filesystem.hpp>
// Length threshold, in characters, that the error paths in this file compare captured
// DAQInterface output against before appending a tail of that output to a message.
22 #define OUT_ON_ERR_SIZE 2000
// Configuration name that this file passes (as a one-element list) to the DAQInterface
// do_config call.
28 #define FAKE_CONFIG_NAME "ots_config"
// Port number derived for the current partition:
//   ARTDAQ_BASE_PORT + partition_ * ARTDAQ_PORTS_PER_PARTITION
// Both values are obtained through __ENV__ and parsed with std::atoi. partition_ is
// resolved at the expansion site (it is the ARTDAQSupervisor member initialized in
// the constructor), so the macro is only usable where that name is in scope.
// The replacement list is fully parenthesized so that the sum stays a single operand
// when the macro is expanded inside a larger expression such as 2 * DAQINTERFACE_PORT.
#define DAQINTERFACE_PORT                     \
	(std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + \
	 (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION"))))
// Signal dispositions that were in effect before this file installed signal_handler,
// keyed by signal number. signal_handler reads an entry back before re-sending the signal.
34 static std::unordered_map<int, struct sigaction> old_actions =
35 std::unordered_map<int, struct sigaction>();
// Set to true by the handler-installation code further down in this file.
36 static bool sighandler_init =
false;
// Signal handler installed by the setup code below for every signal in its list.
// Visible behavior: report the signal through TRACE, unblock signals for the calling
// thread, restore a previously saved disposition from old_actions and re-send a
// signal to this process with kill().
// NOTE(review): two restore-and-kill sequences are visible (one for signum, one for
// SIGINT); the condition that selects between them is not visible in this view.
37 static void signal_handler(
int signum)
// The TRACE_STREAMER spelling is selected on TRACE_REVNUM.
40 #if TRACE_REVNUM < 1459
41 TRACE_STREAMER(TLVL_ERROR, &(
"ARTDAQsupervisor")[0], 0, 0, 0)
43 TRACE_STREAMER(TLVL_ERROR, TLOG2(
"ARTDAQsupervisor", 0), 0)
45 <<
"A signal of type " << signum
46 <<
" was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
47 "then proceeding with default handlers!";
// The first call passes NULL as the new set, so it only reads the current mask into
// set; the second call then unblocks every signal contained in that mask.
53 pthread_sigmask(SIG_UNBLOCK, NULL, &set);
54 pthread_sigmask(SIG_UNBLOCK, &set, NULL);
56 #if TRACE_REVNUM < 1459
57 TRACE_STREAMER(TLVL_ERROR, &(
"ARTDAQsupervisor")[0], 0, 0, 0)
59 TRACE_STREAMER(TLVL_ERROR, TLOG2(
"ARTDAQsupervisor", 0), 0)
61 <<
"Calling default signal handler";
// Put back the disposition that was saved for this signal, then re-send it.
64 sigaction(signum, &old_actions[signum], NULL);
65 kill(getpid(), signum);
// Other visible path: restore the saved SIGINT disposition and send SIGINT instead.
71 sigaction(SIGINT, &old_actions[SIGINT], NULL);
72 kill(getpid(), SIGINT);
// Handler installation body. NOTE(review): the enclosing signature is not visible in
// this view; presumably this is init_sighandler, which the constructor calls.
// A function-local mutex serializes callers while the handlers are installed.
78 static std::mutex sighandler_mutex;
79 std::unique_lock<std::mutex> lk(sighandler_mutex);
83 sighandler_init =
true;
// List of signals to hook (the initializer contents are not visible in this view).
85 std::vector<int> signals = {
96 for(
auto signal : signals)
// Query the current disposition without changing it.
98 struct sigaction old_action;
99 sigaction(signal, NULL, &old_action);
// Signals that are currently ignored are left alone.
103 if(old_action.sa_handler != SIG_IGN)
105 struct sigaction action;
106 action.sa_handler = signal_handler;
// Every signal in the list is added to the handler mask, i.e. blocked while the
// handler runs.
107 sigemptyset(&action.sa_mask);
108 for(
auto sigblk : signals)
110 sigaddset(&action.sa_mask, sigblk);
// Install the handler and remember the previous disposition for signal_handler.
116 sigaction(signal, &action, NULL);
117 old_actions[signal] = old_action;
// Constructor. The visible initializers set the DAQInterface handle to NULL, read the
// partition supervisor property (second argument 0), set the tracked DAQInterface
// state to notrunning and the runner thread pointer to nullptr. The body then installs
// the signal handlers and writes the DAQInterface settings file key by key.
124 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
126 , daqinterface_ptr_(NULL)
127 , partition_(getSupervisorProperty(
"partition", 0))
128 , daqinterface_state_(
"notrunning")
129 , runner_thread_(nullptr)
131 __SUP_COUT__ <<
"Constructor." << __E__;
134 init_sighandler(
this);
// The settings file path comes from DAQINTERFACE_SETTINGS; std::ios::trunc discards
// any previous content, so the file is rewritten on every construction.
141 auto settings_file = __ENV__(
"DAQINTERFACE_SETTINGS");
142 std::ofstream o(settings_file, std::ios::trunc);
// Publish the partition number and the log file location through the process
// environment (overwrite flag is 1).
// NOTE(review): the directory component below is spelled DAQInteface (no r after the
// first e) - confirm that this matches the directory that exists on disk.
144 setenv(
"DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
145 auto logfileName = std::string(__ENV__(
"OTSDAQ_LOG_DIR")) +
146 "/DAQInteface/DAQInterface_partition" +
147 std::to_string(partition_) +
".log";
148 setenv(
"DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
// From here on each statement writes one settings key followed by the value of the
// matching supervisor property.
150 o <<
"log_directory: "
151 << getSupervisorProperty(
"log_directory", std::string(__ENV__(
"OTSDAQ_LOG_DIR")))
155 const std::string record_directory = getSupervisorProperty(
156 "record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH +
"/run_records/");
// The return value of mkdir is not checked here.
157 mkdir(record_directory.c_str(), 0755);
158 o <<
"record_directory: " << record_directory << std::endl;
161 o <<
"package_hashes_to_save: "
162 << getSupervisorProperty(
"package_hashes_to_save",
"[artdaq]") << std::endl;
164 o <<
"spack_root_for_bash_scripts: "
165 << getSupervisorProperty(
"spack_root_for_bash_scripts",
166 std::string(__ENV__(
"SPACK_ROOT")))
// The four timeout keys are written with a space in the key name while the supervisor
// property names use an underscore.
168 o <<
"boardreader timeout: " << getSupervisorProperty(
"boardreader_timeout", 30)
170 o <<
"eventbuilder timeout: " << getSupervisorProperty(
"eventbuilder_timeout", 30)
172 o <<
"datalogger timeout: " << getSupervisorProperty(
"datalogger_timeout", 30)
174 o <<
"dispatcher timeout: " << getSupervisorProperty(
"dispatcher_timeout", 30)
// Constructor, settings file (continued): memory, transfer plugin and boolean options.
// max_fragment_size_bytes is written only when advanced_memory_usage is false.
177 if(!getSupervisorProperty(
"advanced_memory_usage",
false))
179 o <<
"max_fragment_size_bytes: "
180 << getSupervisorProperty(
"max_fragment_size_bytes", 1048576) << std::endl;
182 o <<
"transfer_plugin_to_use: "
183 << getSupervisorProperty(
"transfer_plugin_to_use",
"TCPSocket") << std::endl;
// The three transfer_plugin_from_* keys are written only when the corresponding
// property is a non-empty string.
184 if(getSupervisorProperty(
"transfer_plugin_from_brs",
"") !=
"")
186 o <<
"transfer_plugin_from_brs: "
187 << getSupervisorProperty(
"transfer_plugin_from_brs",
"") << std::endl;
189 if(getSupervisorProperty(
"transfer_plugin_from_ebs",
"") !=
"")
191 o <<
"transfer_plugin_from_ebs: "
192 << getSupervisorProperty(
"transfer_plugin_from_ebs",
"") << std::endl;
194 if(getSupervisorProperty(
"transfer_plugin_from_dls",
"") !=
"")
196 o <<
"transfer_plugin_from_dls: "
197 << getSupervisorProperty(
"transfer_plugin_from_dls",
"") << std::endl;
// std::boolalpha makes the stream print bool values as true or false.
199 o <<
"all_events_to_all_dispatchers: " << std::boolalpha
200 << getSupervisorProperty(
"all_events_to_all_dispatchers",
true) << std::endl;
// Written only when the property is a non-empty string.
201 if(getSupervisorProperty(
"data_directory_override",
"") !=
"")
203 o <<
"data_directory_override: "
204 << getSupervisorProperty(
"data_directory_override",
"") << std::endl;
206 o <<
"max_configurations_to_list: "
207 << getSupervisorProperty(
"max_configurations_to_list", 10) << std::endl;
208 o <<
"disable_unique_rootfile_labels: "
209 << getSupervisorProperty(
"disable_unique_rootfile_labels",
false) << std::endl;
210 o <<
"use_messageviewer: " << std::boolalpha
211 << getSupervisorProperty(
"use_messageviewer",
false) << std::endl;
212 o <<
"use_messagefacility: " << std::boolalpha
213 << getSupervisorProperty(
"use_messagefacility",
true) << std::endl;
214 o <<
"fake_messagefacility: " << std::boolalpha
215 << getSupervisorProperty(
"fake_messagefacility",
false) << std::endl;
216 o <<
"kill_existing_processes: " << std::boolalpha
217 << getSupervisorProperty(
"kill_existing_processes",
true) << std::endl;
218 o <<
"advanced_memory_usage: " << std::boolalpha
219 << getSupervisorProperty(
"advanced_memory_usage",
false) << std::endl;
220 o <<
"strict_fragment_id_mode: " << std::boolalpha
221 << getSupervisorProperty(
"strict_fragment_id_mode",
false) << std::endl;
222 o <<
"disable_private_network_bookkeeping: " << std::boolalpha
223 << getSupervisorProperty(
"disable_private_network_bookkeeping",
false) << std::endl;
// The value expression for allowed_processors is not fully visible in this view.
224 o <<
"allowed_processors: "
225 << getSupervisorProperty(
226 "allowed_processors",
// Written only when the property differs from the compared value (the right-hand side
// of the comparison is not visible in this view).
229 if(getSupervisorProperty(
"partition_label_format",
"") !=
231 o <<
"partition_label_format: "
232 << getSupervisorProperty(
"partition_label_format",
"") << std::endl;
// Constructor, final step: if a TRACE controller already exists it is deleted and the
// pointer is cleared; a new controller is then assigned (the constructed type is not
// visible in this view) and given a pointer back to this supervisor.
237 if(CorePropertySupervisorBase::theTRACEController_)
239 __SUP_COUT__ <<
"Destroying TRACE Controller..." << __E__;
240 delete CorePropertySupervisorBase::
242 CorePropertySupervisorBase::theTRACEController_ =
nullptr;
244 CorePropertySupervisorBase::theTRACEController_ =
247 ->setSupervisorPtr(
this);
249 __SUP_COUT__ <<
"Constructed." << __E__;
// Destructor. Only the log statements that bracket each teardown step are visible in
// this view (Py_Finalize, TRACE controller destruction); the calls between them are
// not shown.
253 ARTDAQSupervisor::~ARTDAQSupervisor(
void)
255 __SUP_COUT__ <<
"Destructor." << __E__;
258 __SUP_COUT__ <<
"Calling Py_Finalize()" << __E__;
264 __SUP_COUT__ <<
"Destroying TRACE Controller..." << __E__;
269 __SUP_COUT__ <<
"Destructed." << __E__;
// Tears down the embedded DAQInterface object. Visible steps: when a DAQInterface
// handle exists, call its do_recover method while holding daqinterface_pythonMutex_,
// wait until the tracked state reads stopped, release the handle; then restore
// sys.stdout and sys.stderr, clear the StringIO pointers, join the remaining Python
// threads and run the Python garbage collector.
273 void ARTDAQSupervisor::destroy(
void)
275 __SUP_COUT__ <<
"Destroying..." << __E__;
277 if(daqinterface_ptr_ != NULL)
279 __SUP_COUT__ <<
"Calling recover transition" << __E__;
280 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
282 PyObjectGuard pName(PyUnicode_FromString(
"do_recover"));
284 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
286 __SUP_COUT__ <<
"Making sure that correct state has been reached" << __E__;
// NOTE(review): no retry limit is visible in this view; if the state never becomes
// stopped this loop presumably keeps waiting - confirm.
288 while(daqinterface_state_ !=
"stopped")
291 __SUP_COUT__ <<
"State is " << daqinterface_state_
292 <<
", waiting 1s and retrying..." << __E__;
// Py_XDECREF tolerates a NULL pointer; the handle is cleared afterwards so later
// NULL checks see that it is gone.
297 Py_XDECREF(daqinterface_ptr_);
298 daqinterface_ptr_ = NULL;
301 __SUP_COUT__ <<
"Flusing printouts" << __E__;
304 PyRun_SimpleString(R
"(
306 sys.stdout = sys.__stdout__
307 sys.stderr = sys.__stderr__
313 stringIO_out_ =
nullptr;
314 stringIO_err_ =
nullptr;
316 __SUP_COUT__ <<
"Thread and garbage cleanup" << __E__;
319 "import threading; [t.join() for t in threading.enumerate() if t is not "
320 "threading.main_thread() and not isinstance(t, threading._DummyThread)]");
321 PyRun_SimpleString(
"import gc; gc.collect()");
325 __SUP_COUT__ <<
"Destroyed." << __E__;
// Prepares the embedded Python side and constructs the DAQInterface object.
// daqinterface_pythonMutex_ is held from the lock_guard below to the end of the
// function. Visible steps: configure the artdaq message facility, require
// ARTDAQ_DAQINTERFACE_DIR, run a Python snippet that wraps sys.stdout and sys.stderr
// in TeeOut objects writing to tee_buffer, append the DAQInterface directory to the
// Python path list, import rc.control.daqinterface, look up DAQInterface in the module
// dictionary, build the keyword arguments and call the constructor.
329 void ARTDAQSupervisor::init(
void)
333 __SUP_COUT__ <<
"Initializing..." << __E__;
335 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
338 artdaq::configureMessageFacility(
"ARTDAQSupervisor");
339 __SUP_COUT__ <<
"artdaq MF configured." << __E__;
// A missing variable is reported with the dedicated message built below.
342 char* daqinterface_dir = getenv(
"ARTDAQ_DAQINTERFACE_DIR");
343 if(daqinterface_dir == NULL)
345 __SS__ <<
"ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
346 "means that DAQInterface has not been setup!"
352 __SUP_COUT__ <<
"Initializing Python" << __E__;
358 "from io import StringIO\n"
361 " def __init__(self, real, buf):\n"
362 " self.real = real\n"
364 " def write(self, data):\n"
365 " self.real.write(data)\n"
366 " self.buf.write(data)\n"
367 " def flush(self):\n"
368 " self.real.flush()\n"
369 " self.buf.flush()\n"
371 "tee_buffer = StringIO()\n"
372 "sys.stdout = TeeOut(sys.stdout, tee_buffer)\n"
373 "sys.stderr = TeeOut(sys.stderr, tee_buffer)\n");
375 __SUP_COUT__ <<
"Adding DAQInterface directory to PYTHON_PATH" << __E__;
// PySys_GetObject returns a borrowed reference, so sysPath is kept as a raw pointer.
376 PyObject* sysPath = PySys_GetObject(
378 PyObjectGuard programName(PyUnicode_FromString(daqinterface_dir));
379 PyList_Append(sysPath, programName.get());
381 __SUP_COUT__ <<
"Creating Module name" << __E__;
382 PyObjectGuard pName(PyUnicode_FromString(
"rc.control.daqinterface"));
385 __SUP_COUT__ <<
"Importing module" << __E__;
386 PyObjectGuard pModule(PyImport_Import(pName.get()));
// A NULL module means the import raised; the Python error text is captured for the
// message.
388 if(pModule.get() == NULL)
390 std::string err = capturePyErr(
"import rc.control.daqinterface");
391 __SS__ <<
"Failed to load rc.control.daqinterface. Python Exception: "
397 __SUP_COUT__ <<
"Loading python module dictionary" << __E__;
398 PyObject* pDict = PyModule_GetDict(
402 std::string err = capturePyErr(
"module dict");
403 __SS__ <<
"Unable to load module dictionary. Python Exception: "
409 __SUP_COUT__ <<
"Getting DAQInterface object pointer" << __E__;
// PyDict_GetItemString returns a borrowed reference; the Py_INCREF below takes a new
// reference before the pointer is handed to PyObjectGuard (presumably because the
// guard releases one reference when it goes out of scope - confirm against
// PyObjectGuard).
410 PyObject* di_obj_raw = PyDict_GetItemString(
411 pDict,
"DAQInterface");
412 if(di_obj_raw == NULL)
414 std::string err = capturePyErr(
"DAQInterface lookup");
415 __SS__ <<
"Unable to find 'DAQInterface' in module dictionary. "
420 Py_INCREF(di_obj_raw);
421 PyObjectGuard di_obj_ptr(di_obj_raw);
// The constructor is called with an empty positional tuple and a keyword dictionary
// with six entries (four strings and two ints per the format string; the entries
// themselves are not visible in this view).
423 __SUP_COUT__ <<
"Filling out DAQInterface args struct" << __E__;
424 PyObjectGuard pArgs(PyTuple_New(0));
426 PyObjectGuard kwargs(Py_BuildValue(
"{s:s, s:s, s:i, s:i, s:s, s:s}",
440 __SUP_COUT__ <<
"Calling DAQInterface Object Constructor" << __E__;
// NOTE(review): in the visible lines sys.stdout and sys.stderr are rebound to fresh
// StringIO objects here, after the TeeOut wrappers were installed by the snippet
// above - confirm which capture mechanism is meant to be active.
443 PyObjectGuard sys(PyImport_ImportModule(
"sys"));
444 PyObjectGuard io(PyImport_ImportModule(
"io"));
451 stringIO_out_ = PyObject_CallMethod(io.get(),
"StringIO", NULL);
452 stringIO_err_ = PyObject_CallMethod(io.get(),
"StringIO", NULL);
459 PyObject_SetAttrString(sys.get(),
"stdout", stringIO_out_);
460 PyObject_SetAttrString(sys.get(),
"stderr", stringIO_err_);
// Look up tee_buffer in the globals of the __main__ module.
466 PyImport_AddModule(
"__main__");
467 PyObject* globals = PyModule_GetDict(mainmod);
470 PyDict_GetItemString(globals,
"tee_buffer");
476 PyObject_Call(di_obj_ptr.get(), pArgs.get(), kwargs.get());
477 if(checkPythonError(daqinterface_ptr_))
479 std::string err = capturePyErr(
"DAQInterface constructor");
480 __SS__ <<
"DAQInterface constructor failed. Python Exception: "
// NOTE(review): does_not_exist is presumably not a real attribute of sys; this looks
// like a deliberate failing call used to check that stderr capture works - confirm.
489 PyObject_CallMethod(sys.get(),
"does_not_exist", NULL));
494 PyObjectGuard err_text(
495 PyObject_CallMethod(stringIO_err_,
"getvalue", NULL));
497 __COUT__ <<
"Captured stderr:\n"
498 << PyUnicode_AsUTF8(err_text.get()) <<
"\n";
500 __COUT__ <<
"Capture of stderr failed.";
530 __SUP_COUT__ <<
"Initialized." << __E__;
// FSM handler for the Configuring transition; it is entered repeatedly.
// First entry (iteration and sub-iteration index both 0): reset the shared error
// message and progress bar, run configureInit, start configuringThread detached and
// request another iteration.
// Later entries: copy the thread error message, read the progress value, and then
// either fail the transition (error reported, or no progress change for more than
// 600 seconds), request another iteration, or log completion.
534 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference )
536 __SUP_COUT__ <<
"transitionConfiguring" << __E__;
539 if(RunControlStateMachine::getIterationIndex() == 0 &&
540 RunControlStateMachine::getSubIterationIndex() == 0)
542 thread_error_message_ =
"";
543 thread_progress_bar_.resetProgressBar(0);
544 last_thread_progress_update_ = time(0);
546 CoreSupervisorBase::configureInit();
// The worker is detached, so this function never joins it; completion and errors are
// observed only through thread_progress_bar_ and thread_error_message_.
549 std::thread(&ARTDAQSupervisor::configuringThread,
this).detach();
551 __SUP_COUT__ <<
"Configuring thread started." << __E__;
553 RunControlStateMachine::
554 indicateIterationWork();
// Polling path: the error message is copied while a mutex is held (the mutex argument
// is not visible in this view).
558 std::string errorMessage;
560 std::lock_guard<std::mutex> lock(
562 errorMessage = thread_error_message_;
564 int progress = thread_progress_bar_.
read();
565 __SUP_COUTVS__(2, errorMessage);
566 __SUP_COUTVS__(2, progress);
567 __SUP_COUTVS__(2, thread_progress_bar_.
isComplete());
// Watchdog: with no error reported, more than 600 seconds since the last observed
// progress change is turned into an error.
570 if(errorMessage ==
"" &&
571 time(0) - last_thread_progress_update_ > 600)
573 __SUP_SS__ <<
"There has been no update from the configuration thread for "
574 << (time(0) - last_thread_progress_update_)
575 <<
" seconds, assuming something is wrong and giving up! "
576 <<
"Last progress received was " << progress << __E__;
577 errorMessage = ss.str();
580 if(errorMessage !=
"")
582 __SUP_SS__ <<
"Error was caught in configuring thread: " << errorMessage
584 __SUP_COUT_ERR__ <<
"\n" << ss.str();
// NOTE(review): the origin string below names CoreSupervisorBase although this
// function is a member of ARTDAQSupervisor - confirm whether that is intended.
586 theStateMachine_.setErrorMessage(ss.str());
587 throw toolbox::fsm::exception::Exception(
590 "CoreSupervisorBase::transitionConfiguring" ,
598 __SUP_COUT__ <<
"Not done yet..." << __E__;
602 RunControlStateMachine::
603 indicateIterationWork();
// The watchdog timestamp is refreshed only when the progress value changed since the
// previous poll.
605 if(last_thread_progress_read_ != progress)
607 last_thread_progress_read_ = progress;
608 last_thread_progress_update_ = time(0);
615 __SUP_COUT_INFO__ <<
"Complete configuring transition!" << __E__;
616 __SUP_COUTV__(getProcessInfo_());
// Worker for the Configuring transition; transitionConfiguring starts it on a detached
// thread. Progress is reported with thread_progress_bar_.step() and
// set_thread_message_; failures are published through thread_error_message_ by the
// catch handlers at the end of the function.
// This first part resolves the supervisor table node, extracts the artdaq
// configuration, records process labels and creates the per-process .fcl symlinks.
624 void ARTDAQSupervisor::configuringThread()
627 std::string uid = theConfigurationManager_
628 ->
getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
629 "/" + CorePropertySupervisorBase::getSupervisorUID() +
630 "/" +
"LinkToSupervisorTable")
633 __COUT__ <<
"Supervisor uid is " << uid <<
", getting supervisor table node" << __E__;
// Local mfSubject_ is built from the class name and the uid (presumably consumed by
// the __GEN_ logging macros used below - confirm).
635 const std::string mfSubject_ = supervisorClassNoNamespace_ +
"-" + uid;
639 thread_progress_bar_.
step();
641 set_thread_message_(
"ConfigGen");
// NOTE(review): the second argument for max_fragment_size_bytes is 8888 here while the
// constructor uses 1048576 for the same property - confirm which value is intended.
643 auto info = ARTDAQTableBase::extractARTDAQInfo(
647 getSupervisorProperty(
"max_fragment_size_bytes", 8888),
648 getSupervisorProperty(
"routing_timeout_ms", 1999),
649 getSupervisorProperty(
"routing_retry_count", 12),
650 &thread_progress_bar_);
// At least one BoardReader and one EventBuilder must be present in the extracted info.
653 if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
655 __GEN_SS__ <<
"There must be at least one enabled BoardReader!" << __E__;
658 if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
660 __GEN_SS__ <<
"There must be at least one enabled EventBuilder!" << __E__;
664 thread_progress_bar_.
step();
665 set_thread_message_(
"Writing boot.txt");
667 __GEN_COUT__ <<
"Writing boot.txt" << __E__;
669 int debugLevel = theSupervisorNode.
getNode(
"DAQInterfaceDebugLevel").
getValue<
int>();
670 std::string setupScript = theSupervisorNode.
getNode(
"DAQSetupScript").
getValue();
673 std::string bootContent =
// Record the process type for every EventBuilder, DataLogger, Dispatcher and
// RoutingManager label. BoardReader labels are recorded later in this function, in
// the loop that builds the setdaqcomps argument.
677 for(
auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
678 label_to_proc_type_map_[builder.label] =
"EventBuilder";
679 for(
auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
680 label_to_proc_type_map_[logger.label] =
"DataLogger";
681 for(
auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
682 label_to_proc_type_map_[dispatcher.label] =
"Dispatcher";
683 for(
auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
684 label_to_proc_type_map_[rmanager.label] =
"RoutingManager";
695 thread_progress_bar_.
step();
696 set_thread_message_(
"Writing Fhicl Files");
698 __GEN_COUT__ <<
"Building configuration directory" << __E__;
700 boost::system::error_code ignored;
// One symlink per process. The link target is the flat FHiCL file for the process; the
// link path ends in the process label plus .fcl (the directory part of the link path
// is not visible in this view).
705 for(
auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
707 symlink(ARTDAQTableBase::getFlatFHICLFilename(
708 ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label)
711 reader.label +
".fcl")
714 for(
auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
716 symlink(ARTDAQTableBase::getFlatFHICLFilename(
717 ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label)
720 builder.label +
".fcl")
723 for(
auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
725 symlink(ARTDAQTableBase::getFlatFHICLFilename(
726 ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label)
729 logger.label +
".fcl")
732 for(
auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
734 symlink(ARTDAQTableBase::getFlatFHICLFilename(
735 ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label)
738 dispatcher.label +
".fcl")
741 for(
auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
743 symlink(ARTDAQTableBase::getFlatFHICLFilename(
744 ARTDAQTableBase::ARTDAQAppType::RoutingManager, rmanager.label)
747 rmanager.label +
".fcl")
751 thread_progress_bar_.
step();
// configuringThread, setdaqcomps stage. All Python calls from here on are made while
// daqinterface_pythonMutex_ is held.
753 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
// Configuring is only attempted when the tracked state is stopped or empty.
755 if(daqinterface_state_ !=
"stopped" && daqinterface_state_ !=
"")
757 __GEN_SS__ <<
"Cannot configure DAQInterface because it is in the wrong state"
758 <<
" (" << daqinterface_state_ <<
" != stopped)!" << __E__;
762 set_thread_message_(
"Calling setdaqcomps");
763 __GEN_COUT__ <<
"Calling setdaqcomps" << __E__;
764 __GEN_COUT__ <<
"Status before setdaqcomps: " << daqinterface_state_ << __E__;
// Guard against a DAQInterface object that was never constructed.
765 if(daqinterface_ptr_ ==
nullptr)
767 __GEN_SS__ <<
"DAQInterface is not initialized. "
768 "Check earlier Python import/constructor errors (e.g. syntax) "
773 PyObjectGuard pName1(PyUnicode_FromString(
"setdaqcomps"));
// Build a dictionary mapping each BoardReader label to a list of strings:
// host, port (always -1), subsystem and, only when non-empty, allowed processors.
775 PyObjectGuard readerDict(PyDict_New());
776 for(
auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
779 label_to_proc_type_map_[reader.label] =
"BoardReader";
780 PyObjectGuard readerName(PyUnicode_FromString(reader.label.c_str()));
782 int list_size = reader.allowed_processors !=
"" ? 4 : 3;
// PyList_SetItem steals the reference to the item, so the raw PyObject pointers
// created below are owned by the list once they have been stored.
784 PyObjectGuard readerData(PyList_New(list_size));
785 PyObject* readerHost = PyUnicode_FromString(reader.hostname.c_str());
786 PyObject* readerPort = PyUnicode_FromString(
"-1");
787 PyObject* readerSubsystem =
788 PyUnicode_FromString(std::to_string(reader.subsystem).c_str());
789 PyList_SetItem(readerData.get(), 0, readerHost);
790 PyList_SetItem(readerData.get(), 1, readerPort);
791 PyList_SetItem(readerData.get(), 2, readerSubsystem);
792 if(reader.allowed_processors !=
"")
794 PyObject* readerAllowedProcessors =
795 PyUnicode_FromString(reader.allowed_processors.c_str());
796 PyList_SetItem(readerData.get(), 3, readerAllowedProcessors);
// PyDict_SetItem does not steal references; key and value stay owned by their guards.
798 PyDict_SetItem(readerDict.get(), readerName.get(), readerData.get());
800 PyObjectGuard res1(PyObject_CallMethodObjArgs(
801 daqinterface_ptr_, pName1.get(), readerDict.get(), NULL));
// The captured Python output is logged before the call result is checked.
802 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"setdaqcomps"),
"setdaqcomps");
804 if(checkPythonError(res1.get()))
806 std::string err_msg = capturePyErr(
"setdaqcomps");
807 __GEN_SS__ <<
"Error calling setdaqcomps: " << err_msg << __E__;
812 __GEN_COUT__ <<
"Status after setdaqcomps: " << daqinterface_state_ << __E__;
// configuringThread, boot stage. do_boot is called once; when that call reports a
// Python error, do_recover is called and do_boot is tried a second time. After the
// boot attempts the tracked state must read booted.
// NOTE(review): the three error messages below announce the last OUT_ON_ERR_SIZE
// (2000) characters of the boot output, but each substr call keeps only the last 1000
// characters - confirm which length is intended.
814 thread_progress_bar_.
step();
815 set_thread_message_(
"Calling do_boot");
816 __GEN_COUT_INFO__ <<
"Calling do_boot" << __E__;
817 __GEN_COUT__ <<
"Status before boot: " << daqinterface_state_ << __E__;
820 PyObjectGuard pNameBoot(PyUnicode_FromString(
"do_boot"));
821 PyObjectGuard pBootArgs(
825 PyObjectGuard resBoot1(PyObject_CallMethodObjArgs(
826 daqinterface_ptr_, pNameBoot.get(), pBootArgs.get(), NULL));
// The captured output is kept in doBootOutput so it can be attached to later errors.
828 std::string doBootOutput = captureStderrAndStdout_(
"do_boot");
829 __COUT_MULTI_LBL__(0, doBootOutput,
"do_boot");
831 if(checkPythonError(resBoot1.get()))
835 std::string err1 = capturePyErr(
"do_boot");
837 __GEN_COUT_INFO__ <<
"Error on first boot attempt: " << err1
838 <<
". Recovering and retrying..." << __E__;
// Recovery step between the two boot attempts.
841 PyObjectGuard pNameRecover(PyUnicode_FromString(
"do_recover"));
842 PyObjectGuard resRecover(
843 PyObject_CallMethodObjArgs(daqinterface_ptr_, pNameRecover.get(), NULL));
844 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"do_recover"),
"do_recover");
846 if(checkPythonError(resRecover.get()))
849 std::string errRec = capturePyErr(
"do_recover");
851 std::stringstream oss;
852 oss <<
"Error calling recover transition!!!! " << errRec;
853 if(doBootOutput.size() > OUT_ON_ERR_SIZE)
854 oss <<
"... last " << OUT_ON_ERR_SIZE
855 <<
" chars: " << doBootOutput.substr(doBootOutput.size() - 1000);
860 __GEN_SS__ << oss.str() << __E__;
865 thread_progress_bar_.
step();
866 set_thread_message_(
"Calling do_boot (retry)");
867 __GEN_COUT_INFO__ <<
"Calling do_boot again" << __E__;
// Second attempt reuses the method name and argument objects of the first call.
870 PyObjectGuard resBoot2(PyObject_CallMethodObjArgs(
871 daqinterface_ptr_, pNameBoot.get(), pBootArgs.get(), NULL));
873 doBootOutput = captureStderrAndStdout_(
"do_boot (retry)");
874 __COUT_MULTI_LBL__(0, doBootOutput,
"do_boot (retry)");
876 if(checkPythonError(resBoot2.get()))
879 std::string err2 = capturePyErr(
"do_boot retry");
881 std::stringstream oss;
882 oss <<
"Error calling boot transition (2nd try): " << err2;
883 if(doBootOutput.size() > OUT_ON_ERR_SIZE)
884 oss <<
"... last " << OUT_ON_ERR_SIZE
885 <<
" chars: " << doBootOutput.substr(doBootOutput.size() - 1000);
889 __GEN_SS__ << oss.str() << __E__;
// Even without a Python error the tracked state must have reached booted.
895 if(daqinterface_state_ !=
"booted")
897 std::cout <<
"Do boot output on error: \n" << doBootOutput << __E__;
898 __GEN_SS__ <<
"DAQInterface boot transition failed! "
899 <<
"Status after boot attempt: " << daqinterface_state_ << __E__;
901 if(doBootOutput.size() > OUT_ON_ERR_SIZE)
902 ss <<
"... last " << OUT_ON_ERR_SIZE
903 <<
" characters: " << doBootOutput.substr(doBootOutput.size() - 1000);
908 __GEN_COUT__ <<
"Status after boot: " << daqinterface_state_ << __E__;
// configuringThread, config stage. Calls do_config with a one-element list holding
// FAKE_CONFIG_NAME, logs the captured output and the string form of the result, and
// requires the tracked state to read ready afterwards.
910 thread_progress_bar_.
step();
911 set_thread_message_(
"Calling do_config");
912 __GEN_COUT_INFO__ <<
"Calling do_config" << __E__;
913 __GEN_COUT__ <<
"Status before config: " << daqinterface_state_ << __E__;
914 std::string doConfigOutput =
"";
918 PyObjectGuard pName3(PyUnicode_FromString(
"do_config"));
920 PyObjectGuard pArg(Py_BuildValue(
"[s]", FAKE_CONFIG_NAME));
923 PyObjectGuard res3(PyObject_CallMethodObjArgs(
924 daqinterface_ptr_, pName3.get(), pArg.get(), NULL));
// Error path: the Python error text is captured first, then the buffered output.
927 if(checkPythonError(res3.get()))
930 std::string err = capturePyErr(
"do_config");
933 doConfigOutput = captureStderrAndStdout_(
"do_config");
935 __GEN_SS__ <<
"Error calling config transition: " << err << __E__;
940 doConfigOutput = captureStderrAndStdout_(
"do_config");
941 __COUT_MULTI_LBL__(0, doConfigOutput,
"do_config");
// Log the string form of the call result; the ternary below prints N/A when the UTF-8
// pointer is null.
945 PyObjectGuard strRes(PyObject_Str(res3.get()));
946 const char* res_cstr =
"";
949 res_cstr = PyUnicode_AsUTF8(strRes.get());
952 __SUP_COUTT__ <<
"do_config result=" << (res_cstr ? res_cstr :
"N/A") << __E__;
956 if(daqinterface_state_ !=
"ready")
958 __GEN_SS__ <<
"DAQInterface config transition failed!" << __E__
959 <<
"Supervisor state: \"" << daqinterface_state_ <<
"\" != \"ready\" "
// The amount of captured output added to the error depends on where the text
// RECOVER transition underway occurs in it: when it does not occur the whole output is
// appended; otherwise a tail of the text before the marker and the text after the
// marker are appended (the exact bounds are only partly visible in this view).
961 auto doConfigOutput_recover_i =
962 doConfigOutput.find(
"RECOVER transition underway");
963 if(doConfigOutput_recover_i == std::string::npos)
964 ss << doConfigOutput;
965 else if(doConfigOutput_recover_i >
967 ss <<
"... tail of " << OUT_ON_ERR_SIZE <<
" characters before recovery: "
968 << doConfigOutput.substr(
969 doConfigOutput_recover_i - OUT_ON_ERR_SIZE +
970 std::string(
"RECOVER transition underway").size(),
973 ss << doConfigOutput.substr(
975 doConfigOutput_recover_i +
976 std::string(
"RECOVER transition underway").size());
979 __GEN_COUT__ <<
"Status after config: " << daqinterface_state_ << __E__;
981 set_thread_message_(
"Configured");
982 __GEN_COUT_INFO__ <<
"Configured." << __E__;
// configuringThread, error handlers. Nothing is rethrown in the visible lines: the
// message is logged and stored in thread_error_message_ while thread_mutex_ is held,
// which is where transitionConfiguring reads it from.
985 catch(
const std::runtime_error& e)
987 set_thread_message_(
"ERROR");
988 __SS__ <<
"Error was caught while configuring: " << e.what() << __E__;
989 __COUT_ERR__ <<
"\n" << ss.str();
990 std::lock_guard<std::mutex> lock(thread_mutex_);
991 thread_error_message_ = ss.str();
// Second handler (its catch clause is not visible in this view): the exception is also
// passed to the artdaq ExceptionHandler with rethrow disabled.
995 set_thread_message_(
"ERROR");
996 __SS__ <<
"Unknown error was caught while configuring. Please checked the logs."
998 __COUT_ERR__ <<
"\n" << ss.str();
1000 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1002 std::lock_guard<std::mutex> lock(thread_mutex_);
1003 thread_error_message_ = ss.str();
// Body of the Halting transition. NOTE(review): the function signature is not visible
// in this view.
// Visible steps: acquire daqinterface_pythonMutex_ (with logging while the lock is not
// yet held), call do_stop_running when the tracked state is running, then call
// do_command with the argument Shutdown. Errors are handled by the two handlers at the
// end, which ignore the failure when the previous FSM state was Failed or Halted and
// otherwise raise a toolbox FSM exception.
1010 set_thread_message_(
"Halting");
1011 __SUP_COUT__ <<
"Halting..." << __E__;
1016 std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1020 __COUTS__(50) <<
"Do not have python lock for halt. tries=" << tries << __E__;
1024 __COUTS__(50) <<
"Have python lock!" << __E__;
1028 __SUP_COUT__ <<
"Status before halt: " << daqinterface_state_ << __E__;
// A running DAQ is stopped first.
1030 if(daqinterface_state_ ==
"running")
1033 PyObjectGuard pName(PyUnicode_FromString(
"do_stop_running"));
1035 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1037 0, captureStderrAndStdout_(
"do_stop_running"),
"do_stop_running");
// NOTE(review): this result is tested with res.get() == NULL and capturePyErr is
// called without a label, while the Shutdown call below uses checkPythonError and a
// label - confirm whether the difference is intended.
1039 if(res.get() == NULL)
1041 std::string err = capturePyErr();
1042 __SS__ <<
"Error calling DAQ Interface stop transition: " << err
1048 PyObjectGuard pName(PyUnicode_FromString(
"do_command"));
1049 PyObjectGuard pArg(PyUnicode_FromString(
"Shutdown"));
1051 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1053 0, captureStderrAndStdout_(
"do_command Shutdown"),
"do_command Shutdown");
1055 if(checkPythonError(res.get()))
1057 std::string err = capturePyErr(
"do_command Shutdown");
1058 __SS__ <<
"Error calling DAQ Interface halt transition: " << err << __E__;
1063 __SUP_COUT__ <<
"Status after halt: " << daqinterface_state_ << __E__;
1067 __SUP_COUT__ <<
"Halted." << __E__;
1068 set_thread_message_(
"Halted");
1070 catch(
const std::runtime_error& e)
1072 const std::string transitionName =
"Halting";
// Errors are only logged when the FSM came from the Failed or Halted state.
// NOTE(review): the log text always names FAILED_STATE_NAME, also when the provenance
// state was HALTED_STATE_NAME.
1074 if(theStateMachine_.getProvenanceStateName() ==
1075 RunControlStateMachine::FAILED_STATE_NAME ||
1076 theStateMachine_.getProvenanceStateName() ==
1077 RunControlStateMachine::HALTED_STATE_NAME)
1079 __SUP_COUT_INFO__ <<
"Error was caught while halting (but ignoring because "
1080 "previous state was '"
1081 << RunControlStateMachine::FAILED_STATE_NAME
1082 <<
"'): " << e.what() << __E__;
1086 __SUP_SS__ <<
"Error was caught while " << transitionName <<
": " << e.what()
1088 __SUP_COUT_ERR__ <<
"\n" << ss.str();
1089 theStateMachine_.setErrorMessage(ss.str());
1090 throw toolbox::fsm::exception::Exception(
1091 "Transition Error" ,
1093 "ARTDAQSupervisorBase::transition" + transitionName ,
// Second handler (its catch clause is not visible in this view): same policy as above
// for errors without a message; the exception is additionally passed to the artdaq
// ExceptionHandler with rethrow disabled before the FSM exception is raised.
1101 const std::string transitionName =
"Halting";
1103 if(theStateMachine_.getProvenanceStateName() ==
1104 RunControlStateMachine::FAILED_STATE_NAME ||
1105 theStateMachine_.getProvenanceStateName() ==
1106 RunControlStateMachine::HALTED_STATE_NAME)
1108 __SUP_COUT_INFO__ <<
"Unknown error was caught while halting (but ignoring "
1109 "because previous state was '"
1110 << RunControlStateMachine::FAILED_STATE_NAME <<
"')." << __E__;
1114 __SUP_SS__ <<
"Unknown error was caught while " << transitionName
1115 <<
". Please checked the logs." << __E__;
1116 __SUP_COUT_ERR__ <<
"\n" << ss.str();
1117 theStateMachine_.setErrorMessage(ss.str());
1119 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1121 throw toolbox::fsm::exception::Exception(
1122 "Transition Error" ,
1124 "ARTDAQSupervisorBase::transition" + transitionName ,
// Body of the Initializing transition. NOTE(review): the function signature and the
// call made between the two log statements are not visible in this view.
// The thread message is set before and after the work; both handlers build an error
// message, and the second one also passes the exception to the artdaq ExceptionHandler
// with rethrow disabled.
1135 set_thread_message_(
"Initializing");
1136 __SUP_COUT__ <<
"Initializing..." << __E__;
1138 __SUP_COUT__ <<
"Initialized." << __E__;
1139 set_thread_message_(
"Initialized");
1141 catch(
const std::runtime_error& e)
1143 __SS__ <<
"Error was caught while Initializing: " << e.what() << __E__;
1148 __SS__ <<
"Unknown error was caught while Initializing. Please checked the logs."
1150 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM handler for the Pausing transition. Holds daqinterface_pythonMutex_, calls the
// DAQInterface do_command method with the argument Pause, logs the captured Python
// output and reports a Python error through an error message. The event argument is
// unnamed and unused.
1155 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference )
1158 set_thread_message_(
"Pausing");
1159 __SUP_COUT__ <<
"Pausing..." << __E__;
1160 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1163 __SUP_COUT__ <<
"Status before pause: " << daqinterface_state_ << __E__;
1165 PyObjectGuard pName(PyUnicode_FromString(
"do_command"));
1166 PyObjectGuard pArg(PyUnicode_FromString(
"Pause"));
1168 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1170 0, captureStderrAndStdout_(
"do_command Pause"),
"do_command Pause");
1172 if(checkPythonError(res.get()))
1174 std::string err = capturePyErr(
"do_command Pause");
1175 __SS__ <<
"Error calling DAQ Interface Pause transition: " << err << __E__;
1180 __SUP_COUT__ <<
"Status after pause: " << daqinterface_state_ << __E__;
1182 __SUP_COUT__ <<
"Paused." << __E__;
1183 set_thread_message_(
"Paused");
// Handlers: both build an error message; the second (its catch clause is not visible
// in this view) also passes the exception to the artdaq ExceptionHandler with rethrow
// disabled.
1185 catch(
const std::runtime_error& e)
1187 __SS__ <<
"Error was caught while Pausing: " << e.what() << __E__;
1192 __SS__ <<
"Unknown error was caught while Pausing. Please checked the logs." << __E__;
1193 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM handler for the Resuming transition. Holds daqinterface_pythonMutex_, calls the
// DAQInterface do_command method with the argument Resume, logs the captured Python
// output and reports a Python error through an error message. The event argument is
// unnamed and unused.
1198 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference )
1201 set_thread_message_(
"Resuming");
1202 __SUP_COUT__ <<
"Resuming..." << __E__;
1203 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1206 __SUP_COUT__ <<
"Status before resume: " << daqinterface_state_ << __E__;
1207 PyObjectGuard pName(PyUnicode_FromString(
"do_command"));
1208 PyObjectGuard pArg(PyUnicode_FromString(
"Resume"));
1210 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1212 0, captureStderrAndStdout_(
"do_command Resume"),
"do_command Resume");
1214 if(checkPythonError(res.get()))
1216 std::string err = capturePyErr(
"do_command Resume");
1217 __SS__ <<
"Error calling DAQ Interface Resume transition: " << err << __E__;
1222 __SUP_COUT__ <<
"Status after resume: " << daqinterface_state_ << __E__;
1223 __SUP_COUT__ <<
"Resumed." << __E__;
1224 set_thread_message_(
"Resumed");
// Handlers: both build an error message; the second (its catch clause is not visible
// in this view) also passes the exception to the artdaq ExceptionHandler with rethrow
// disabled.
1226 catch(
const std::runtime_error& e)
1228 __SS__ <<
"Error was caught while Resuming: " << e.what() << __E__;
1233 __SS__ <<
"Unknown error was caught while Resuming. Please checked the logs."
1235 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM handler for the Starting transition; it is entered repeatedly and mirrors
// transitionConfiguring.
// First entry (iteration and sub-iteration index both 0): reset the shared error
// message and progress bar, start startingThread detached and request another
// iteration.
// Later entries: copy the thread error message, read the progress value, and then
// either fail the transition (error reported, or no progress change for more than
// 600 seconds), request another iteration, or log completion.
1240 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference )
1243 __SUP_COUT__ <<
"transitionStarting" << __E__;
1246 if(RunControlStateMachine::getIterationIndex() == 0 &&
1247 RunControlStateMachine::getSubIterationIndex() == 0)
1249 thread_error_message_ =
"";
1250 thread_progress_bar_.resetProgressBar(0);
1251 last_thread_progress_update_ = time(0);
// The worker is detached, so this function never joins it; completion and errors are
// observed only through thread_progress_bar_ and thread_error_message_.
1254 std::thread(&ARTDAQSupervisor::startingThread,
this).detach();
1256 __SUP_COUT_INFO__ <<
"Starting thread started." << __E__;
1258 RunControlStateMachine::
1259 indicateIterationWork();
// Polling path: the error message is copied while a mutex is held (the mutex argument
// is not visible in this view).
1263 std::string errorMessage;
1265 std::lock_guard<std::mutex> lock(
1267 errorMessage = thread_error_message_;
1269 int progress = thread_progress_bar_.
read();
1270 __SUP_COUTV__(errorMessage);
1271 __SUP_COUTV__(progress);
1272 __SUP_COUTV__(thread_progress_bar_.
isComplete());
// Watchdog: with no error reported, more than 600 seconds since the last observed
// progress change is turned into an error.
1275 if(errorMessage ==
"" &&
1276 time(0) - last_thread_progress_update_ > 600)
1278 __SUP_SS__ <<
"There has been no update from the start thread for "
1279 << (time(0) - last_thread_progress_update_)
1280 <<
" seconds, assuming something is wrong and giving up! "
1281 <<
"Last progress received was " << progress << __E__;
1282 errorMessage = ss.str();
1285 if(errorMessage !=
"")
1287 __SUP_SS__ <<
"Error was caught in starting thread: " << errorMessage
1289 __SUP_COUT_ERR__ <<
"\n" << ss.str();
// NOTE(review): the origin string below names CoreSupervisorBase although this
// function is a member of ARTDAQSupervisor - confirm whether that is intended.
1291 theStateMachine_.setErrorMessage(ss.str());
1292 throw toolbox::fsm::exception::Exception(
1293 "Transition Error" ,
1295 "CoreSupervisorBase::transitionStarting" ,
1303 __SUP_COUT__ <<
"Not done yet..." << __E__;
1307 RunControlStateMachine::
1308 indicateIterationWork();
// The watchdog timestamp is refreshed only when the progress value changed since the
// previous poll.
1310 if(last_thread_progress_read_ != progress)
1312 last_thread_progress_read_ = progress;
1313 last_thread_progress_update_ = time(0);
1320 __SUP_COUT_INFO__ <<
"Starting transition completed!" << __E__;
1321 __SUP_COUTV__(getProcessInfo_());
// Handlers: both build an error message; the second (its catch clause is not visible
// in this view) also passes the exception to the artdaq ExceptionHandler with rethrow
// disabled.
1328 catch(
const std::runtime_error& e)
1330 __SS__ <<
"Error was caught while Starting: " << e.what() << __E__;
1335 __SS__ <<
"Unknown error was caught while Starting. Please checked the logs."
1337 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// Worker that transitionStarting() launches on a detached std::thread.
// It reads "RunNumber" from the state machine's current SOAP message, takes
// daqinterface_pythonMutex_, calls the Python method "do_start_running" on
// daqinterface_ptr_ with that run number, and steps thread_progress_bar_ between
// phases. Failures are reported by storing a message in thread_error_message_ while
// holding thread_mutex_; nothing is returned.
1342 void ARTDAQSupervisor::startingThread()
1345 std::string uid = theConfigurationManager_
1346 ->
getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
1347 "/" + CorePropertySupervisorBase::getSupervisorUID() +
1348 "/" +
"LinkToSupervisorTable")
1351 __COUT__ <<
"Supervisor uid is " << uid <<
", getting supervisor table node" << __E__;
// NOTE(review): presumably consumed by the __GEN_COUT__ logging macros - confirm.
1352 const std::string mfSubject_ = supervisorClassNoNamespace_ +
"-" + uid;
1353 __GEN_COUT__ <<
"Starting..." << __E__;
1354 set_thread_message_(
"Starting");
1356 thread_progress_bar_.
step();
// Python calls on daqinterface_ptr_ are made while holding this recursive mutex.
1359 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1361 __GEN_COUT__ <<
"Status before start: " << daqinterface_state_ << __E__;
1362 auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
1364 .getValue(
"RunNumber");
1366 thread_progress_bar_.
step();
1368 __GEN_COUT_INFO__ <<
"Calling do_start_running" << __E__;
1369 PyObjectGuard pName(PyUnicode_FromString(
"do_start_running"));
// std::stoi throws std::invalid_argument / std::out_of_range, which do not derive from
// std::runtime_error; presumably handled by a catch-all not visible here - confirm.
1370 int run_number = std::stoi(runNumber);
1371 PyObjectGuard pStateArgs(PyLong_FromLong(run_number));
1372 PyObjectGuard res(PyObject_CallMethodObjArgs(
1373 daqinterface_ptr_, pName.get(), pStateArgs.get(), NULL));
1375 0, captureStderrAndStdout_(
"do_start_running"),
"do_start_running");
1377 thread_progress_bar_.
step();
// NOTE(review): transitionStopping()/enteringError() test the result with
// checkPythonError() and pass a label to capturePyErr(); this path does neither.
1379 if(res.get() == NULL)
1381 std::string err = capturePyErr();
1382 __SS__ <<
"Error calling start transition: " << err << __E__;
1387 thread_progress_bar_.
step();
1389 __GEN_COUT__ <<
"Status after start: " << daqinterface_state_ << __E__;
1390 if(daqinterface_state_ !=
"running")
1392 __SS__ <<
"DAQInterface start transition failed!" << __E__;
1396 thread_progress_bar_.
step();
1399 set_thread_message_(
"Started");
1400 thread_progress_bar_.
step();
1402 __GEN_COUT_INFO__ <<
"Started." << __E__;
1406 catch(
const std::runtime_error& e)
1408 __SS__ <<
"Error was caught while Starting: " << e.what() << __E__;
1409 __COUT_ERR__ <<
"\n" << ss.str();
// Hand the failure to transitionStarting(), which polls thread_error_message_.
1410 std::lock_guard<std::mutex> lock(thread_mutex_);
1411 thread_error_message_ = ss.str();
// NOTE(review): the message below reads "Please checked the logs" (typo for "check").
1415 __SS__ <<
"Unknown error was caught while Starting. Please checked the logs."
1417 __COUT_ERR__ <<
"\n" << ss.str();
1419 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1421 std::lock_guard<std::mutex> lock(thread_mutex_);
1422 thread_error_message_ = ss.str();
// Handler for the run-control "Stopping" transition.
// Holding daqinterface_pythonMutex_, it calls the Python method "do_stop_running" on
// daqinterface_ptr_ (no arguments), logs the text returned by
// captureStderrAndStdout_(), and builds an error message from capturePyErr() when
// checkPythonError() reports a failure. The thread message is set to "Stopping" on
// entry and "Stopped" afterwards.
1426 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference )
1429 __SUP_COUT__ <<
"Stopping..." << __E__;
1430 set_thread_message_(
"Stopping");
1431 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1433 __SUP_COUT__ <<
"Status before stop: " << daqinterface_state_ << __E__;
1434 PyObjectGuard pName(PyUnicode_FromString(
"do_stop_running"));
1435 PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1436 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"do_stop_running"),
"do_stop_running");
1438 if(checkPythonError(res.get()))
1440 std::string err = capturePyErr(
"do_stop_running");
1441 __SS__ <<
"Error calling DAQ Interface stop transition: " << err << __E__;
1445 __SUP_COUT__ <<
"Status after stop: " << daqinterface_state_ << __E__;
1446 __SUP_COUT__ <<
"Stopped." << __E__;
1447 set_thread_message_(
"Stopped");
1449 catch(
const std::runtime_error& e)
1451 __SS__ <<
"Error was caught while Stopping: " << e.what() << __E__;
// NOTE(review): the message below reads "Please checked the logs" (typo for "check").
1456 __SS__ <<
"Unknown error was caught while Stopping. Please checked the logs."
1458 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// Called when the state machine enters its error state.
// Holding daqinterface_pythonMutex_, it calls the Python method "do_recover" on
// daqinterface_ptr_ and logs the captured Python output. A failed call is reported
// through __SUP_COUT_WARN__ only; no throw is visible on that path in this excerpt.
1463 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference )
1465 __SUP_COUT__ <<
"Entering error recovery state" << __E__;
1466 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1468 __SUP_COUT__ <<
"Status before error: " << daqinterface_state_ << __E__;
1470 PyObjectGuard pName(PyUnicode_FromString(
"do_recover"));
1471 PyObjectGuard res(PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1472 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"do_recover"),
"do_recover");
1474 if(checkPythonError(res.get()))
1476 std::string err = capturePyErr(
"do_recover");
1478 __SUP_COUT_WARN__ <<
"Error calling DAQ Interface recover transition: " << err
1484 __SUP_COUT__ <<
"Status after error: " << daqinterface_state_ << __E__;
1485 __SUP_COUT__ <<
"EnteringError DONE." << __E__;
// Builds one SupervisorInfo::SubappInfo per process reported by
// getAndParseProcessInfo_(). Entries are first stored in a std::map keyed by rank, so
// the output vector is filled in ascending rank order and a later process with the
// same rank replaces an earlier one.
// NOTE(review): the declaration of the loop-local `info` and the final return are on
// lines not visible in this excerpt; `output` is presumably what is returned.
1489 std::vector<SupervisorInfo::SubappInfo> ots::ARTDAQSupervisor::getSubappInfo(
void)
1491 auto apps = getAndParseProcessInfo_();
1493 std::map<int, SupervisorInfo::SubappInfo> subapp_infos;
1494 for(
auto& app : apps)
1498 info.
name = app.label;
1499 info.detail =
"Rank " + std::to_string(app.rank) +
", subsystem " +
1500 std::to_string(app.subsystem);
1501 info.lastStatusTime = time(0);
// Progress is always reported as 100 here, regardless of the process state.
1502 info.progress = 100;
1503 info.status = artdaqStateToOtsState(app.state);
1504 info.url =
"http://" + app.host +
":" + std::to_string(app.port) +
"/RPC2";
1505 info.class_name =
"ARTDAQ " + labelToProcType_(app.label);
1507 subapp_infos[app.rank] = info;
1510 std::vector<SupervisorInfo::SubappInfo> output;
1511 for(
auto& [rank, info] : subapp_infos)
1513 output.push_back(info);
// Tests whether a Python C-API call failed: the condition is true when `result` is
// NULL or when PyErr_Occurred() reports a pending Python exception.
// NOTE(review): the body of the branch and the return statements are not visible in
// this excerpt; callers treat a true return as "an error happened".
1522 bool ots::ARTDAQSupervisor::checkPythonError(PyObject* result)
1524 if(result == NULL || PyErr_Occurred())
// Converts the pending Python exception into a std::string.
// The exception is fetched and normalized, then formatted with Python's
// traceback.format_exception and joined into one string. If that leaves the default
// "Unknown Python Error" text and a value object exists, str(value) is used instead.
// The visible tail prefixes the message with `label` followed by ":\n".
// NOTE(review): PyErr_Fetch / PyErr_NormalizeException are deprecated since Python
// 3.12 - confirm the targeted Python version.
1535 std::string ots::ARTDAQSupervisor::capturePyErr(std::string label )
1537 std::string err_msg =
"Unknown Python Error";
1538 PyObject * pType, *pValue, *pTraceback;
// PyErr_Fetch hands ownership of the three references to this function and clears
// the interpreter's error indicator.
1539 PyErr_Fetch(&pType, &pValue, &pTraceback);
1540 PyErr_NormalizeException(&pType, &pValue, &pTraceback);
1545 PyObjectGuard traceback_module(PyImport_ImportModule(
"traceback"));
1546 if(traceback_module.get() != NULL)
1548 PyObjectGuard format_exception(
1549 PyObject_GetAttrString(traceback_module.get(),
"format_exception"));
1550 if(format_exception.get() != NULL)
1552 PyObjectGuard formatted(
1553 PyObject_CallFunctionObjArgs(format_exception.get(),
1555 pValue ? pValue : Py_None,
1556 pTraceback ? pTraceback : Py_None,
1558 if(formatted.get() != NULL)
// format_exception returns a list of strings; join them with an empty separator.
1561 PyObjectGuard empty_string(PyUnicode_FromString(
""));
1562 PyObjectGuard joined(
1563 PyUnicode_Join(empty_string.get(), formatted.get()));
1564 if(joined.get() != NULL)
1566 const char* traceback_cstr = PyUnicode_AsUTF8(joined.get());
1568 err_msg = traceback_cstr;
// Fallback when no traceback text could be produced.
1575 if(err_msg ==
"Unknown Python Error" && pValue != NULL)
1577 PyObjectGuard pStr(PyObject_Str(pValue));
1578 if(pStr.get() != NULL)
1580 const char* error_cstr = PyUnicode_AsUTF8(pStr.get());
1582 err_msg = error_cstr;
1589 Py_XDECREF(pTraceback);
1593 err_msg = label +
":\n" + err_msg;
// Returns the text currently held by the Python object stringIO_out_, obtained by
// calling its "getvalue" method. If the result cannot be converted to UTF-8 the
// visible return yields an empty string.
// NOTE(review): `label` is not used on any line visible in this excerpt, and the
// handling inside the PyErr_Occurred() / checkPythonError() branches is also not
// visible - confirm against the full file.
1599 std::string ots::ARTDAQSupervisor::captureStderrAndStdout_(std::string label )
1604 if(PyErr_Occurred())
1609 std::string outString =
"";
1610 PyObjectGuard out(PyObject_CallMethod(stringIO_out_,
"getvalue", NULL));
1612 if(checkPythonError(out.get()))
// The string returned by capturePyErr() is discarded on this visible line.
1615 capturePyErr(
"captureStderrAndStdout getvalue");
1619 const char* text = PyUnicode_AsUTF8(out.get());
1621 return text ? text :
"";
// Refreshes daqinterface_state_ by calling the Python method "state" on
// daqinterface_ptr_ with the argument "DAQInterface", while holding
// daqinterface_pythonMutex_.
// Visible outcomes: "" when daqinterface_ptr_ is NULL, "ERROR" on a Python failure,
// the str() of the result on success, and "UNKNOWN" as a fallback.
// NOTE(review): the "Attempt n " << tries message implies a retry loop, but the loop
// and the declaration of `tries` are on lines not visible here.
1624 void ots::ARTDAQSupervisor::getDAQState_()
1626 __SUP_COUTS__(50) <<
"Getting DAQInterface python lock" << __E__;
1627 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1628 __SUP_COUTS__(50) <<
"Have DAQInterface python lock" << __E__;
1630 if(daqinterface_ptr_ == NULL)
1632 daqinterface_state_ =
"";
1633 __SUP_COUT_WARN__ <<
"daqinterface_ptr_ is not initialized!" << __E__;
1638 PyObjectGuard pName(PyUnicode_FromString(
"state"));
1639 PyObjectGuard pArg(PyUnicode_FromString(
"DAQInterface"));
1650 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), pArg.get(), NULL));
1652 if(checkPythonError(res.get()))
1657 std::string err_msg = capturePyErr(
"state");
1659 std::ostringstream ss;
1660 ss <<
"Attempt n " << tries
1661 <<
". Error calling 'state'. Python Exception: " << err_msg;
1665 __COUT_ERR__ << ss.str() << __E__;
1666 daqinterface_state_ =
"ERROR";
1670 __COUT__ << ss.str() << __E__;
1679 PyObjectGuard strRes(PyObject_Str(res.get()));
// NOTE(review): PyUnicode_AsUTF8 can return NULL, and constructing std::string from a
// null pointer is undefined behavior; confirm a guard exists on the lines not shown.
1682 daqinterface_state_ = std::string(PyUnicode_AsUTF8(strRes.get()));
1687 daqinterface_state_ =
"UNKNOWN";
1690 __SUP_COUTS__(20) <<
"getDAQState_ state=" << daqinterface_state_ << __E__;
// Calls the Python method "artdaq_process_info" on daqinterface_ptr_ with the
// arguments ("DAQInterface", True) while holding daqinterface_pythonMutex_, stores the
// resulting text in daqinterface_status_ under daqinterface_statusMutex_, and returns
// it. A Python failure produces an error message built from capturePyErr().
// NOTE(review): the action taken when daqinterface_ptr_ is nullptr is on a line not
// visible in this excerpt.
1698 std::string ots::ARTDAQSupervisor::getProcessInfo_(
void)
1700 __SUP_COUTS__(50) <<
"Getting DAQInterface state lock" << __E__;
1701 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1702 __SUP_COUTS__(50) <<
"Have DAQInterface state lock" << __E__;
1704 if(daqinterface_ptr_ ==
nullptr)
1709 PyObjectGuard pName(PyUnicode_FromString(
"artdaq_process_info"));
1710 PyObjectGuard pArg(PyUnicode_FromString(
"DAQInterface"));
1711 PyObjectGuard pArg2(PyBool_FromLong(
true));
1712 PyObjectGuard res(PyObject_CallMethodObjArgs(
1713 daqinterface_ptr_, pName.get(), pArg.get(), pArg2.get(), NULL));
1715 if(checkPythonError(res.get()))
1717 std::string err = capturePyErr(
"artdaq_process_info");
1718 __SS__ <<
"Error calling artdaq_process_info function: " << err << __E__;
// The cached copy lets getAndParseProcessInfo_() read the last status without the
// Python lock.
1723 std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
// NOTE(review): PyUnicode_AsUTF8 returns NULL when `res` is not a str; std::string
// from a null pointer is undefined behavior - confirm the Python side returns str.
1724 daqinterface_status_ = std::string(PyUnicode_AsUTF8(res.get()));
1725 return daqinterface_status_;
1728 std::string ots::ARTDAQSupervisor::artdaqStateToOtsState(std::string state)
1730 if(state ==
"nonexistent" || state ==
"nonexistant")
1731 return RunControlStateMachine::INITIAL_STATE_NAME;
1732 if(state ==
"Ready")
1733 return "Configured";
1734 if(state ==
"Running")
1735 return RunControlStateMachine::RUNNING_STATE_NAME;
1736 if(state ==
"Paused")
1737 return RunControlStateMachine::PAUSED_STATE_NAME;
1738 if(state ==
"Stopped")
1739 return RunControlStateMachine::HALTED_STATE_NAME;
1741 TLOG(TLVL_WARNING) <<
"Unrecognized state name " << state;
1742 return RunControlStateMachine::FAILED_STATE_NAME;
// Looks up the process type recorded for `label` in label_to_proc_type_map_ and
// returns it when the label is present.
// NOTE(review): the value returned for an unknown label is on a line not visible in
// this excerpt.
1745 std::string ots::ARTDAQSupervisor::labelToProcType_(std::string label)
1747 if(label_to_proc_type_map_.count(label))
1749 return label_to_proc_type_map_[label];
// Produces a list of DAQInterfaceProcessInfo records parsed from the DAQInterface
// process-info text. When the Python lock is not held the cached daqinterface_status_
// is used (read under daqinterface_statusMutex_); otherwise getProcessInfo_() is
// called for fresh text. Each line of that text is matched against
//   "<label> at <host>:<port> (subsystem <n>, rank <n>): <state>"
// and lines that do not match the pattern add no record.
// NOTE(review): the second argument of the unique_lock constructor is not visible;
// the two log messages suggest a non-blocking (try-to-lock) acquisition - confirm.
1756 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo>
1757 ots::ARTDAQSupervisor::getAndParseProcessInfo_()
1759 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
1764 std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1768 __COUTS__(50) <<
"Do not have python lock." << __E__;
1769 std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1770 info = daqinterface_status_;
1774 __COUTS__(50) <<
"Have python lock!" << __E__;
1775 info = getProcessInfo_();
1777 __COUTVS__(20, info);
1779 auto procs = tokenize_(info);
// Capture groups: 1=label, 2=host, 3=port, 4=subsystem, 5=rank, 6=state.
1788 std::regex re(
"(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
1790 for(
auto& proc : procs)
1793 if(std::regex_match(proc, match, re))
// This record shares its name with the `info` text variable assigned above.
1795 DAQInterfaceProcessInfo info;
1797 info.label = match[1];
1798 info.host = match[2];
1799 info.port = std::stoi(match[3]);
1800 info.subsystem = std::stoi(match[4]);
1801 info.rank = std::stoi(match[5]);
1802 info.state = match[6];
1804 output.push_back(info);
// For every process reported by getAndParseProcessInfo_(), builds an artdaq commander
// plugin configured as "xmlrpc" with id = the process port and server_url = the
// process host, and appends the (process info, commander) pair to `output`.
// NOTE(review): the first line of the return type and the declaration/return of
// `output` are on lines not visible in this excerpt.
1812 std::unique_ptr<artdaq::CommanderInterface>>>
1813 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
1816 std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
1818 auto infos = getAndParseProcessInfo_();
1820 for(
auto& info : infos)
// NOTE(review): `cm` lives only for one loop iteration; if the plugin created below
// keeps a reference to it, that reference would dangle - confirm.
1822 artdaq::Commandable cm;
1823 fhicl::ParameterSet ps;
1825 ps.put<std::string>(
"commanderPluginType",
"xmlrpc");
1826 ps.put<
int>(
"id", info.port);
1827 ps.put<std::string>(
"server_url", info.host);
// `info` is moved from here, so it must not be read again in this iteration.
1829 output.emplace_back(std::make_pair<DAQInterfaceProcessInfo,
1830 std::unique_ptr<artdaq::CommanderInterface>>(
1831 std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
1838 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string
const& input)
1841 std::list<std::string> output;
1843 while(pos != std::string::npos && pos < input.size())
1845 auto newpos = input.find(
'\n', pos);
1846 if(newpos != std::string::npos)
1848 output.emplace_back(input, pos, newpos - pos);
1854 output.emplace_back(input, pos);
// Body of the background thread created by start_runner_().
// While runner_running_ is true and daqinterface_ptr_ is set, it takes
// daqinterface_pythonMutex_ and, when daqinterface_state_ is "running", "ready" or
// "booted", calls the Python method "check_proc_heartbeats". A failed call, a C++
// exception, or a change of daqinterface_state_ relative to state_before clears
// runner_running_ and builds an error message. The trailing statements store the
// error text in thread_error_message_, set it on theStateMachine_, and call
// sendAsyncExceptionToGateway.
// NOTE(review): runner_running_ is written here and in stop_runner_() from another
// thread; confirm it is declared as an atomic.
// NOTE(review): the statement that refreshes daqinterface_state_ between taking
// state_before and comparing against it is not visible in this excerpt.
1863 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1866 TLOG(TLVL_TRACE) <<
"Runner thread starting";
1867 runner_running_ =
true;
1868 while(runner_running_)
1870 if(daqinterface_ptr_ != NULL)
1872 std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1874 std::string state_before = daqinterface_state_;
1876 __SUP_COUTS__(2) <<
"Runner state_before=" << state_before
1877 <<
" state now=" << daqinterface_state_
1878 <<
" ?= running, ready, or booted" << __E__;
// Heartbeats are only checked in these three DAQInterface states.
1880 if(daqinterface_state_ ==
"running" || daqinterface_state_ ==
"ready" ||
1881 daqinterface_state_ ==
"booted")
1885 TLOG(TLVL_TRACE) <<
"Calling DAQInterface::check_proc_heartbeats";
1886 PyObjectGuard pName(PyUnicode_FromString(
"check_proc_heartbeats"));
1888 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName.get(), NULL));
1889 __COUT_MULTI_LBL__(1,
1890 captureStderrAndStdout_(
"check_proc_heartbeats"),
1891 "check_proc_heartbeats");
1893 <<
"Done with DAQInterface::check_proc_heartbeats call";
1895 if(res.get() == NULL)
1897 runner_running_ =
false;
1898 std::string err = capturePyErr(
"check_proc_heartbeats");
1899 __SS__ <<
"Error calling check_proc_heartbeats function: " << err
// Each handler below stops the loop and appends whatever Python error text is
// pending to its own message.
1905 catch(cet::exception& ex)
1907 runner_running_ =
false;
1908 std::string err = capturePyErr(
"check_proc_heartbeats");
1909 __SS__ <<
"An cet::exception occurred while calling "
1910 "check_proc_heartbeats function "
1911 << ex.explain_self() <<
": " << err << __E__;
1915 catch(std::exception& ex)
1917 runner_running_ =
false;
1918 std::string err = capturePyErr(
"check_proc_heartbeats");
1919 __SS__ <<
"An std::exception occurred while calling "
1920 "check_proc_heartbeats function: "
1921 << ex.what() <<
"\n\n"
1928 runner_running_ =
false;
1929 std::string err = capturePyErr(
"check_proc_heartbeats");
1930 __SS__ <<
"An unknown Error occurred while calling "
1931 "check_proc_heartbeats function: "
// A state change that this thread did not expect is treated as a failure.
1939 if(daqinterface_state_ != state_before)
1941 runner_running_ =
false;
1943 __SS__ <<
"DAQInterface state unexpectedly changed from "
1944 << state_before <<
" to " << daqinterface_state_
1945 <<
". Check supervisor log file for more info!" << __E__;
1953 __SUP_COUT__ <<
"daqinterface_ptr_ is null" << __E__;
1958 runner_running_ =
false;
1959 TLOG(TLVL_TRACE) <<
"Runner thread complete";
1963 __SS__ <<
"An error occurred in "
1964 "start_runner_/daqinterfaceRunner_ thread "
1970 catch(
const std::runtime_error& e)
1972 ss <<
"Here is the error: " << e.what() << __E__;
1976 ss <<
"Unexpected error!" << __E__;
1978 __COUT_ERR__ << ss.str();
// NOTE(review): the mutex argument is on a line not visible here; startingThread()
// guards thread_error_message_ with thread_mutex_, so presumably that one - confirm.
1981 std::lock_guard<std::mutex> lock(
1983 thread_error_message_ = ss.str();
1986 theStateMachine_.setErrorMessage(ss.str());
1988 sendAsyncExceptionToGateway(
1996 void ots::ARTDAQSupervisor::stop_runner_()
1998 runner_running_ =
false;
1999 if(runner_thread_ && runner_thread_->joinable())
2001 runner_thread_->join();
2002 runner_thread_.reset(
nullptr);
// Creates a new std::thread that runs daqinterfaceRunner_() on this object.
// NOTE(review): the lines before the make_unique call (presumably stopping any
// previous runner and the assignment target, likely runner_thread_) are not visible
// in this excerpt - confirm against the full file.
2007 void ots::ARTDAQSupervisor::start_runner_()
2011 std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_,
this);
virtual void transitionHalting(toolbox::Event::Reference event) override
virtual void transitionInitializing(toolbox::Event::Reference event) override
static const std::string ARTDAQ_FCL_PATH
Tree-path rule is, if the last link in the path is a group link with a specified group ID,...
static std::string getBootFileContentFromInfo(const ARTDAQInfo &info, const std::string &setupScript, int debugLevel)
ConfigurationTree getNode(const std::string &nodeString, bool doNotThrowOnBrokenUIDLinks=false) const
"root/parent/parent/"
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
const std::string & getValueAsString(bool returnLinkTableValue=false) const
void getValue(T &value) const
ITRACEController * theTRACEController_
only define for an app that receives a command
bool isComplete()
get functions
int read()
if stepsToComplete==0, then define any progress as 50%, thread safe
void complete()
declare complete, thread safe
defines used also by OtsConfigurationWizardSupervisor
void INIT_MF(const char *name)
std::string name
Also key in map.