3 #define TRACEMF_USE_VERBATIM 1
4 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
6 #include "artdaq-core/Utilities/configureMessageFacility.hh"
7 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
8 #include "artdaq/DAQdata/Globals.hh"
9 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
10 #include "cetlib_except/exception.h"
11 #include "fhiclcpp/make_ParameterSet.h"
12 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisorTRACEController.h"
14 #include "artdaq-core/Utilities/ExceptionHandler.hh"
16 #include <boost/exception/all.hpp>
17 #include <boost/filesystem.hpp>
// Character-count threshold used in this file when captured DAQInterface
// output is appended to an error message (compared against the output size).
22 #define OUT_ON_ERR_SIZE 2000
// Configuration name passed to DAQInterface's do_config call.
28 #define FAKE_CONFIG_NAME "ots_config"
// Port number for this partition's DAQInterface:
//   ARTDAQ_BASE_PORT + partition_ * ARTDAQ_PORTS_PER_PARTITION
// Both values are read through __ENV__ and parsed with std::atoi; a member or
// local named partition_ must be in scope wherever the macro is expanded.
// The whole expansion is wrapped in parentheses so the macro behaves as a
// single operand (e.g. DAQINTERFACE_PORT * 2 or DAQINTERFACE_PORT - 1).
// NOTE(review): std::atoi returns 0 for unparsable text — confirm the two
// environment variables are validated elsewhere.
29 #define DAQINTERFACE_PORT \
30 (std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + \
31 (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION"))))
// Previous signal dispositions, keyed by signal number. Filled in when this
// file installs its own handler and read back by signal_handler to restore
// the earlier action before re-raising the signal.
34 static std::unordered_map<int, struct sigaction> old_actions =
35 std::unordered_map<int, struct sigaction>();
// Set to true by the handler-installation code in this file.
// NOTE(review): the check that reads this flag is not visible here.
36 static bool sighandler_init =
false;
// Signal handler installed by this file for the signals it manages.
//
// @param signum  Number of the signal that was delivered.
//
// Logs the signal through TRACE (the macro form depends on TRACE_REVNUM),
// unblocks the currently blocked signals, then restores the previously saved
// disposition from old_actions and re-sends the signal to this process so the
// earlier handler runs.
// NOTE(review): TRACE streaming and unordered_map::operator[] are not
// async-signal-safe — confirm this is acceptable for the signals handled.
37 static void signal_handler(
int signum)
40 #if TRACE_REVNUM < 1459
41 TRACE_STREAMER(TLVL_ERROR, &(
"ARTDAQsupervisor")[0], 0, 0, 0)
43 TRACE_STREAMER(TLVL_ERROR, TLOG2(
"ARTDAQsupervisor", 0), 0)
45 <<
"A signal of type " << signum
46 <<
" was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
47 "then proceeding with default handlers!";
// With a NULL new-set argument the first call only fetches the current mask
// into `set`; the second call then unblocks every signal in that mask.
53 pthread_sigmask(SIG_UNBLOCK, NULL, &set);
54 pthread_sigmask(SIG_UNBLOCK, &set, NULL);
56 #if TRACE_REVNUM < 1459
57 TRACE_STREAMER(TLVL_ERROR, &(
"ARTDAQsupervisor")[0], 0, 0, 0)
59 TRACE_STREAMER(TLVL_ERROR, TLOG2(
"ARTDAQsupervisor", 0), 0)
61 <<
"Calling default signal handler";
// Re-install the saved action for this signal and deliver it again.
64 sigaction(signum, &old_actions[signum], NULL);
65 kill(getpid(), signum);
// Same restore-and-resend sequence, but for SIGINT.
// NOTE(review): the condition selecting this path is not visible here.
71 sigaction(SIGINT, &old_actions[SIGINT], NULL);
72 kill(getpid(), SIGINT);
// Body of the signal-handler installation routine (its header is not visible
// in this excerpt; presumably init_sighandler, which the constructor calls).
// A function-local mutex serializes concurrent callers.
78 static std::mutex sighandler_mutex;
79 std::unique_lock<std::mutex> lk(sighandler_mutex);
83 sighandler_init =
true;
// List of signals to manage (the entries are not visible in this excerpt).
85 std::vector<int> signals = {
96 for(
auto signal : signals)
// Query the current disposition without changing it.
98 struct sigaction old_action;
99 sigaction(signal, NULL, &old_action);
// Signals that are currently ignored are left untouched.
103 if(old_action.sa_handler != SIG_IGN)
105 struct sigaction action;
106 action.sa_handler = signal_handler;
107 sigemptyset(&action.sa_mask);
// Block every managed signal while the handler is running.
108 for(
auto sigblk : signals)
110 sigaddset(&action.sa_mask, sigblk);
// Install the handler and remember the previous action so that
// signal_handler can restore it later.
116 sigaction(signal, &action, NULL);
117 old_actions[signal] = old_action;
// Constructor.
//
// @param stub  XDAQ application stub for this supervisor.
//
// Visible work, in order:
//  - initializes daqinterface_ptr_ (NULL), partition_ (supervisor property
//    "partition", default 0), daqinterface_state_ ("notrunning") and
//    runner_thread_ (nullptr);
//  - installs the signal handlers via init_sighandler(this);
//  - truncates the file named by DAQINTERFACE_SETTINGS and writes one
//    "key: value" line per DAQInterface setting, each value taken from a
//    supervisor property with the default shown in the call;
//  - exports DAQINTERFACE_PARTITION_NUMBER and DAQINTERFACE_LOGFILE;
//  - destroys any existing TRACE controller and installs a new one that
//    points back at this supervisor.
124 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
126 , daqinterface_ptr_(NULL)
127 , partition_(getSupervisorProperty(
"partition", 0))
128 , daqinterface_state_(
"notrunning")
129 , runner_thread_(nullptr)
131 __SUP_COUT__ <<
"Constructor." << __E__;
134 init_sighandler(
this);
// Settings file consumed by DAQInterface; std::ios::trunc discards any
// previous content.
141 auto settings_file = __ENV__(
"DAQINTERFACE_SETTINGS");
142 std::ofstream o(settings_file, std::ios::trunc);
// Export the partition number and per-partition log file name (last
// argument 1 = overwrite an existing value).
144 setenv(
"DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
// NOTE(review): "DAQInteface" looks like a misspelling of "DAQInterface";
// it is part of the runtime path, so confirm against the directory that
// actually exists before changing it.
145 auto logfileName = std::string(__ENV__(
"OTSDAQ_LOG_DIR")) +
146 "/DAQInteface/DAQInterface_partition" +
147 std::to_string(partition_) +
".log";
148 setenv(
"DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
150 o <<
"log_directory: "
151 << getSupervisorProperty(
"log_directory", std::string(__ENV__(
"OTSDAQ_LOG_DIR")))
// Run-record directory: created with mode 0755; the mkdir return value is
// not checked on the visible lines.
155 const std::string record_directory = getSupervisorProperty(
156 "record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH +
"/run_records/");
157 mkdir(record_directory.c_str(), 0755);
158 o <<
"record_directory: " << record_directory << std::endl;
161 o <<
"package_hashes_to_save: "
162 << getSupervisorProperty(
"package_hashes_to_save",
"[artdaq]") << std::endl;
164 o <<
"spack_root_for_bash_scripts: "
165 << getSupervisorProperty(
"spack_root_for_bash_scripts",
166 std::string(__ENV__(
"SPACK_ROOT")))
// Per-process-type timeouts (default 30 for each).
168 o <<
"boardreader timeout: " << getSupervisorProperty(
"boardreader_timeout", 30)
170 o <<
"eventbuilder timeout: " << getSupervisorProperty(
"eventbuilder_timeout", 30)
172 o <<
"datalogger timeout: " << getSupervisorProperty(
"datalogger_timeout", 30)
174 o <<
"dispatcher timeout: " << getSupervisorProperty(
"dispatcher_timeout", 30)
// max_fragment_size_bytes is only written when advanced_memory_usage is off.
177 if(!getSupervisorProperty(
"advanced_memory_usage",
false))
179 o <<
"max_fragment_size_bytes: "
180 << getSupervisorProperty(
"max_fragment_size_bytes", 1048576) << std::endl;
182 o <<
"transfer_plugin_to_use: "
183 << getSupervisorProperty(
"transfer_plugin_to_use",
"TCPSocket") << std::endl;
// The optional transfer-plugin overrides and the data-directory override
// below are written only when the corresponding property is non-empty.
184 if(getSupervisorProperty(
"transfer_plugin_from_brs",
"") !=
"")
186 o <<
"transfer_plugin_from_brs: "
187 << getSupervisorProperty(
"transfer_plugin_from_brs",
"") << std::endl;
189 if(getSupervisorProperty(
"transfer_plugin_from_ebs",
"") !=
"")
191 o <<
"transfer_plugin_from_ebs: "
192 << getSupervisorProperty(
"transfer_plugin_from_ebs",
"") << std::endl;
194 if(getSupervisorProperty(
"transfer_plugin_from_dls",
"") !=
"")
196 o <<
"transfer_plugin_from_dls: "
197 << getSupervisorProperty(
"transfer_plugin_from_dls",
"") << std::endl;
199 o <<
"all_events_to_all_dispatchers: " << std::boolalpha
200 << getSupervisorProperty(
"all_events_to_all_dispatchers",
true) << std::endl;
201 if(getSupervisorProperty(
"data_directory_override",
"") !=
"")
203 o <<
"data_directory_override: "
204 << getSupervisorProperty(
"data_directory_override",
"") << std::endl;
206 o <<
"max_configurations_to_list: "
207 << getSupervisorProperty(
"max_configurations_to_list", 10) << std::endl;
208 o <<
"disable_unique_rootfile_labels: "
209 << getSupervisorProperty(
"disable_unique_rootfile_labels",
false) << std::endl;
210 o <<
"use_messageviewer: " << std::boolalpha
211 << getSupervisorProperty(
"use_messageviewer",
false) << std::endl;
212 o <<
"use_messagefacility: " << std::boolalpha
213 << getSupervisorProperty(
"use_messagefacility",
true) << std::endl;
214 o <<
"fake_messagefacility: " << std::boolalpha
215 << getSupervisorProperty(
"fake_messagefacility",
false) << std::endl;
216 o <<
"kill_existing_processes: " << std::boolalpha
217 << getSupervisorProperty(
"kill_existing_processes",
true) << std::endl;
218 o <<
"advanced_memory_usage: " << std::boolalpha
219 << getSupervisorProperty(
"advanced_memory_usage",
false) << std::endl;
220 o <<
"strict_fragment_id_mode: " << std::boolalpha
221 << getSupervisorProperty(
"strict_fragment_id_mode",
false) << std::endl;
222 o <<
"disable_private_network_bookkeeping: " << std::boolalpha
223 << getSupervisorProperty(
"disable_private_network_bookkeeping",
false) << std::endl;
224 o <<
"allowed_processors: " << getSupervisorProperty(
"allowed_processors",
"0-255")
// Replace any existing TRACE controller with one bound to this supervisor.
231 if(CorePropertySupervisorBase::theTRACEController_)
233 __SUP_COUT__ <<
"Destroying TRACE Controller..." << __E__;
234 delete CorePropertySupervisorBase::
236 CorePropertySupervisorBase::theTRACEController_ =
nullptr;
238 CorePropertySupervisorBase::theTRACEController_ =
241 ->setSupervisorPtr(
this);
243 __SUP_COUT__ <<
"Constructed." << __E__;
// Destructor. The visible lines only log entry and exit.
// NOTE(review): any cleanup call between the two log statements is not
// visible in this excerpt.
247 ARTDAQSupervisor::~ARTDAQSupervisor(
void)
249 __SUP_COUT__ <<
"Destructor." << __E__;
251 __SUP_COUT__ <<
"Destructed." << __E__;
// Tears down the embedded DAQInterface Python object.
//
// If a DAQInterface object exists, calls its do_recover method while holding
// daqinterface_pythonMutex_, loops until daqinterface_state_ reads "stopped",
// then drops the reference and clears the pointer. Afterwards restores
// Python's sys.stdout / sys.stderr, releases the StringIO capture objects,
// joins remaining Python threads, runs the Python garbage collector, and
// logs TRACE controller destruction.
// NOTE(review): pName (a new reference from PyUnicode_FromString) and the
// result of PyObject_CallMethodObjArgs have no visible Py_DECREF here —
// check the full source for reference leaks.
// NOTE(review): how daqinterface_state_ is refreshed inside the wait loop is
// not visible in this excerpt.
255 void ARTDAQSupervisor::destroy(
void)
257 __SUP_COUT__ <<
"Destroying..." << __E__;
259 if(daqinterface_ptr_ != NULL)
261 __SUP_COUT__ <<
"Calling recover transition" << __E__;
262 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
263 PyObject* pName = PyUnicode_FromString(
"do_recover");
264 PyObject_CallMethodObjArgs(
265 daqinterface_ptr_, pName, NULL);
267 __SUP_COUT__ <<
"Making sure that correct state has been reached" << __E__;
269 while(daqinterface_state_ !=
"stopped")
272 __SUP_COUT__ <<
"State is " << daqinterface_state_
273 <<
", waiting 1s and retrying..." << __E__;
278 Py_XDECREF(daqinterface_ptr_);
288 daqinterface_ptr_ = NULL;
291 __SUP_COUT__ <<
"Flusing printouts" << __E__;
294 PyRun_SimpleString(R
"(
296 sys.stdout = sys.__stdout__
297 sys.stderr = sys.__stderr__
299 Py_XDECREF(stringIO_out);
300 Py_XDECREF(stringIO_err);
302 __SUP_COUT__ << "Thread and garbage cleanup" << __E__;
305 "import threading; [t.join() for t in threading.enumerate() if t is not "
306 "threading.main_thread()]");
307 PyRun_SimpleString(
"import gc; gc.collect()");
313 __SUP_COUT__ <<
"Destroying TRACE Controller..." << __E__;
318 __SUP_COUT__ <<
"Destroyed." << __E__;
// Starts the embedded Python environment and constructs the DAQInterface
// object that the rest of this class drives.
//
// Visible steps, all under daqinterface_pythonMutex_:
//  - configure the artdaq message facility;
//  - require ARTDAQ_DAQINTERFACE_DIR (an error message is built if unset);
//  - run a Python snippet that tees sys.stdout / sys.stderr into a StringIO
//    buffer named tee_buffer;
//  - append the DAQInterface directory to sys.path and import
//    rc.control.daqinterface;
//  - look up the DAQInterface class in the module dictionary and call it
//    with keyword arguments, storing the result in daqinterface_ptr_.
// NOTE(review): several statements in this function are not visible in this
// excerpt (error branches, the keyword-argument values, the conditions
// around the stderr capture).
322 void ARTDAQSupervisor::init(
void)
326 __SUP_COUT__ <<
"Initializing..." << __E__;
328 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
331 artdaq::configureMessageFacility(
"ARTDAQSupervisor");
332 __SUP_COUT__ <<
"artdaq MF configured." << __E__;
// DAQInterface location; getenv returns NULL when the variable is unset.
335 char* daqinterface_dir = getenv(
"ARTDAQ_DAQINTERFACE_DIR");
336 if(daqinterface_dir == NULL)
338 __SS__ <<
"ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
339 "means that DAQInterface has not been setup!"
345 __SUP_COUT__ <<
"Initializing Python" << __E__;
351 "from io import StringIO\n"
354 " def __init__(self, real, buf):\n"
355 " self.real = real\n"
357 " def write(self, data):\n"
358 " self.real.write(data)\n"
359 " self.buf.write(data)\n"
360 " def flush(self):\n"
361 " self.real.flush()\n"
362 " self.buf.flush()\n"
364 "tee_buffer = StringIO()\n"
365 "sys.stdout = TeeOut(sys.stdout, tee_buffer)\n"
366 "sys.stderr = TeeOut(sys.stderr, tee_buffer)\n");
368 __SUP_COUT__ <<
"Adding DAQInterface directory to PYTHON_PATH" << __E__;
// PySys_GetObject returns a borrowed reference; programName is a new
// reference and is released right after being appended.
369 PyObject* sysPath = PySys_GetObject((
char*)
"path");
370 PyObject* programName = PyUnicode_FromString(daqinterface_dir);
371 PyList_Append(sysPath, programName);
372 Py_DECREF(programName);
374 __SUP_COUT__ <<
"Creating Module name" << __E__;
375 PyObject* pName = PyUnicode_FromString(
"rc.control.daqinterface");
378 __SUP_COUT__ <<
"Importing module" << __E__;
379 PyObject* pModule = PyImport_Import(pName);
385 __SS__ <<
"Failed to load rc.control.daqinterface" << __E__;
390 __SUP_COUT__ <<
"Loading python module dictionary" << __E__;
391 PyObject* pDict = PyModule_GetDict(pModule);
395 __SS__ <<
"Unable to load module dictionary" << __E__;
402 __SUP_COUT__ <<
"Getting DAQInterface object pointer" << __E__;
// PyDict_GetItemString returns a borrowed reference (see the Py_DECREF note
// further down).
403 PyObject* di_obj_ptr = PyDict_GetItemString(pDict,
"DAQInterface");
405 __SUP_COUT__ <<
"Filling out DAQInterface args struct" << __E__;
// Empty positional tuple; all constructor arguments go through kwargs.
406 PyObject* pArgs = PyTuple_New(0);
408 PyObject* kwargs = Py_BuildValue(
"{s:s, s:s, s:i, s:i, s:s, s:s}",
422 __SUP_COUT__ <<
"Calling DAQInterface Object Constructor" << __E__;
425 PyObject* sys = PyImport_ImportModule(
"sys");
426 PyObject* io = PyImport_ImportModule(
"io");
// Separate StringIO objects assigned to sys.stdout / sys.stderr.
// NOTE(review): how this interacts with the TeeOut redirection set up above
// is not clear from the visible lines.
433 stringIO_out = PyObject_CallMethod(io,
"StringIO", NULL);
434 stringIO_err = PyObject_CallMethod(io,
"StringIO", NULL);
441 PyObject_SetAttrString(sys,
"stdout", stringIO_out);
442 PyObject_SetAttrString(sys,
"stderr", stringIO_err);
448 PyImport_AddModule(
"__main__");
449 PyObject* globals = PyModule_GetDict(mainmod);
452 PyDict_GetItemString(globals,
"tee_buffer");
// Instantiate DAQInterface(**kwargs).
456 daqinterface_ptr_ = PyObject_Call(di_obj_ptr, pArgs, kwargs);
// NOTE(review): calls a method named "does_not_exist" on sys; presumably a
// deliberate failure used to exercise the error-capture path — confirm.
461 PyObject* bad = PyObject_CallMethod(sys,
"does_not_exist", NULL);
467 PyObject_CallMethod(stringIO_err,
"getvalue", NULL);
469 __COUT__ <<
"Captured stderr:\n"
470 << PyUnicode_AsUTF8(err_text) <<
"\n";
472 __COUT__ <<
"Capture of stderr failed.";
// NOTE(review): di_obj_ptr came from PyDict_GetItemString, which returns a
// borrowed reference; unless a Py_INCREF exists on a line not visible here,
// this Py_DECREF releases a reference this function never owned.
476 Py_DECREF(di_obj_ptr);
507 __SUP_COUT__ <<
"Initialized." << __E__;
// FSM "Configuring" transition, executed iteratively.
//
// First iteration (iteration index 0 and sub-iteration index 0):
//  - clears thread_error_message_, resets the progress bar and the
//    last-progress timestamp;
//  - reads the configuration table group name/key from the current SOAP
//    message and loads that group; a load failure is logged, stored as the
//    state-machine error message and rethrown as a toolbox FSM exception;
//  - launches configuringThread() detached and requests another iteration.
//
// Later iterations:
//  - copy thread_error_message_ under a lock and read the progress bar;
//  - if no error is reported but progress has not changed for more than
//    600 seconds, synthesize a timeout error;
//  - on any error, log it, store it on the state machine and throw;
//  - otherwise, while the thread is not finished, request another iteration
//    and refresh the last-progress bookkeeping when the value changed.
//
// Changes relative to the previous revision (diagnostic text only):
//  - the exception origin in the polling branch now reads
//    "ARTDAQSupervisor::transitionConfiguring", matching the other throws in
//    this function (it previously named CoreSupervisorBase);
//  - the table-group error messages now close the quote they open.
511 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference )
513 __SUP_COUT__ <<
"transitionConfiguring" << __E__;
// Only the very first pass through this transition does the setup work.
516 if(RunControlStateMachine::getIterationIndex() == 0 &&
517 RunControlStateMachine::getSubIterationIndex() == 0)
519 thread_error_message_ =
"";
520 thread_progress_bar_.resetProgressBar(0);
521 last_thread_progress_update_ = time(0);
// Table group (name, key) requested by the incoming SOAP message.
524 SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
526 .getValue(
"ConfigurationTableGroupName"),
527 TableGroupKey(SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
529 .getValue(
"ConfigurationTableGroupKey")));
531 __SUP_COUT__ <<
"Configuration table group name: " << theGroup.first
532 <<
" key: " << theGroup.second << __E__;
550 ConfigurationManager::LoadGroupType::ALL_TYPES,
553 catch(
const std::runtime_error& e)
555 __SS__ <<
"Error loading table group '" << theGroup.first <<
"("
556 << theGroup.second <<
")'! \n"
557 << e.what() << __E__;
558 __SUP_COUT_ERR__ << ss.str();
562 theStateMachine_.setErrorMessage(ss.str());
563 throw toolbox::fsm::exception::Exception(
566 "ARTDAQSupervisor::transitionConfiguring" ,
573 __SS__ <<
"Unknown error loading table group '" << theGroup.first <<
"("
574 << theGroup.second <<
")'!" << __E__;
575 __SUP_COUT_ERR__ << ss.str();
579 theStateMachine_.setErrorMessage(ss.str());
580 throw toolbox::fsm::exception::Exception(
583 "ARTDAQSupervisor::transitionConfiguring" ,
// The heavy lifting runs in a detached worker; this transition polls it.
590 std::thread(&ARTDAQSupervisor::configuringThread,
this).detach();
592 __SUP_COUT__ <<
"Configuring thread started." << __E__;
594 RunControlStateMachine::
595 indicateIterationWork();
// Polling path: snapshot the worker's error string under the lock.
599 std::string errorMessage;
601 std::lock_guard<std::mutex> lock(
603 errorMessage = thread_error_message_;
605 int progress = thread_progress_bar_.
read();
606 __SUP_COUTVS__(2, errorMessage);
607 __SUP_COUTVS__(2, progress);
608 __SUP_COUTVS__(2, thread_progress_bar_.
isComplete());
// Watchdog: more than 600 s without a progress change counts as a failure.
611 if(errorMessage ==
"" &&
612 time(0) - last_thread_progress_update_ > 600)
614 __SUP_SS__ <<
"There has been no update from the configuration thread for "
615 << (time(0) - last_thread_progress_update_)
616 <<
" seconds, assuming something is wrong and giving up! "
617 <<
"Last progress received was " << progress << __E__;
618 errorMessage = ss.str();
621 if(errorMessage !=
"")
623 __SUP_SS__ <<
"Error was caught in configuring thread: " << errorMessage
625 __SUP_COUT_ERR__ <<
"\n" << ss.str();
627 theStateMachine_.setErrorMessage(ss.str());
628 throw toolbox::fsm::exception::Exception(
631 "ARTDAQSupervisor::transitionConfiguring" ,
639 __SUP_COUT__ <<
"Not done yet..." << __E__;
643 RunControlStateMachine::
644 indicateIterationWork();
// Remember when progress last moved so the watchdog above can fire.
646 if(last_thread_progress_read_ != progress)
648 last_thread_progress_read_ = progress;
649 last_thread_progress_update_ = time(0);
656 __SUP_COUT_INFO__ <<
"Complete configuring transition!" << __E__;
657 __SUP_COUTV__(getProcessInfo_());
// Worker run on a detached thread by transitionConfiguring().
//
// Visible stages (each one steps thread_progress_bar_ and publishes a status
// string through set_thread_message_):
//  1. "ConfigGen": resolve this supervisor's UID and extract the ARTDAQ
//     process/subsystem description; at least one BoardReader and one
//     EventBuilder must be enabled.
//  2. "Writing boot.txt": write the setup script, debug level, subsystem
//     table and one host/label(/port) stanza per EventBuilder, DataLogger,
//     Dispatcher and RoutingManager to the output stream `o`, recording each
//     label's process type in label_to_proc_type_map_.
//  3. "Writing Fhicl Files": symlink each process's flattened FHiCL file
//     under "<label>.fcl".
//  4. Under daqinterface_pythonMutex_: call DAQInterface setdaqcomps,
//     do_boot (on failure: do_recover, then one retry) and do_config, and
//     check the resulting daqinterface_state_ after boot ("booted") and after
//     config ("ready").
// Failures end up in the catch handlers at the bottom, which store the
// message in thread_error_message_ under thread_mutex_ for the polling
// transition to pick up.
//
// Changes relative to the previous revision:
//  - the three boot-failure messages announce "last OUT_ON_ERR_SIZE
//    characters" but appended only the last 1000; they now append exactly
//    OUT_ON_ERR_SIZE characters (safe: guarded by size() > OUT_ON_ERR_SIZE);
//  - log text "first boost attempt" corrected to "first boot attempt".
// NOTE(review): several statements (branch conditions, throws, the output
// file being opened) are not visible in this excerpt.
665 void ARTDAQSupervisor::configuringThread()
668 std::string uid = theConfigurationManager_
669 ->
getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
670 "/" + CorePropertySupervisorBase::getSupervisorUID() +
671 "/" +
"LinkToSupervisorTable")
674 __COUT__ <<
"Supervisor uid is " << uid <<
", getting supervisor table node" << __E__;
676 const std::string mfSubject_ = supervisorClassNoNamespace_ +
"-" + uid;
680 thread_progress_bar_.
step();
682 set_thread_message_(
"ConfigGen");
// Defaults used when the supervisor properties are not set: 8888 bytes,
// 1999 ms routing timeout, 12 routing retries.
684 auto info = ARTDAQTableBase::extractARTDAQInfo(
688 getSupervisorProperty(
"max_fragment_size_bytes", 8888),
689 getSupervisorProperty(
"routing_timeout_ms", 1999),
690 getSupervisorProperty(
"routing_retry_count", 12),
691 &thread_progress_bar_);
// Minimal viable system: one BoardReader and one EventBuilder.
694 if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
696 __GEN_SS__ <<
"There must be at least one enabled BoardReader!" << __E__;
700 if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
702 __GEN_SS__ <<
"There must be at least one enabled EventBuilder!" << __E__;
707 thread_progress_bar_.
step();
708 set_thread_message_(
"Writing boot.txt");
710 __GEN_COUT__ <<
"Writing boot.txt" << __E__;
712 int debugLevel = theSupervisorNode.
getNode(
"DAQInterfaceDebugLevel").
getValue<
int>();
713 std::string setupScript = theSupervisorNode.
getNode(
"DAQSetupScript").
getValue();
716 o <<
"DAQ setup script: " << setupScript << std::endl;
717 o <<
"debug level: " << debugLevel << std::endl;
// Subsystem stanzas are only emitted when more than one subsystem exists.
720 if(info.subsystems.size() > 1)
722 for(
auto& ss : info.subsystems)
726 o <<
"Subsystem id: " << ss.first << std::endl;
727 if(ss.second.destination != 0)
729 o <<
"Subsystem destination: " << ss.second.destination << std::endl;
731 for(
auto& sss : ss.second.sources)
733 o <<
"Subsystem source: " << sss << std::endl;
735 if(ss.second.eventMode)
737 o <<
"Subsystem fragmentMode: False" << std::endl;
// Per-process stanzas. The subsystem line is written only for a subsystem
// other than 1, allowed_processors only when non-empty.
743 for(
auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
745 o <<
"EventBuilder host: " << builder.hostname << std::endl;
746 o <<
"EventBuilder label: " << builder.label << std::endl;
747 label_to_proc_type_map_[builder.label] =
"EventBuilder";
748 if(builder.subsystem != 1)
750 o <<
"EventBuilder subsystem: " << builder.subsystem << std::endl;
752 if(builder.allowed_processors !=
"")
754 o <<
"EventBuilder allowed_processors: " << builder.allowed_processors
759 for(
auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
761 o <<
"DataLogger host: " << logger.hostname << std::endl;
762 o <<
"DataLogger label: " << logger.label << std::endl;
763 label_to_proc_type_map_[logger.label] =
"DataLogger";
764 if(logger.subsystem != 1)
766 o <<
"DataLogger subsystem: " << logger.subsystem << std::endl;
768 if(logger.allowed_processors !=
"")
770 o <<
"DataLogger allowed_processors: " << logger.allowed_processors
775 for(
auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
777 o <<
"Dispatcher host: " << dispatcher.hostname << std::endl;
778 o <<
"Dispatcher label: " << dispatcher.label << std::endl;
779 o <<
"Dispatcher port: " << dispatcher.port << std::endl;
780 label_to_proc_type_map_[dispatcher.label] =
"Dispatcher";
781 if(dispatcher.subsystem != 1)
783 o <<
"Dispatcher subsystem: " << dispatcher.subsystem << std::endl;
785 if(dispatcher.allowed_processors !=
"")
787 o <<
"Dispatcher allowed_processors: " << dispatcher.allowed_processors
792 for(
auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
794 o <<
"RoutingManager host: " << rmanager.hostname << std::endl;
795 o <<
"RoutingManager label: " << rmanager.label << std::endl;
796 label_to_proc_type_map_[rmanager.label] =
"RoutingManager";
797 if(rmanager.subsystem != 1)
799 o <<
"RoutingManager subsystem: " << rmanager.subsystem << std::endl;
801 if(rmanager.allowed_processors !=
"")
803 o <<
"RoutingManager allowed_processors: " << rmanager.allowed_processors
810 thread_progress_bar_.
step();
811 set_thread_message_(
"Writing Fhicl Files");
813 __GEN_COUT__ <<
"Building configuration directory" << __E__;
815 boost::system::error_code ignored;
// Link each process's flattened FHiCL file as "<label>.fcl".
820 for(
auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
822 symlink(ARTDAQTableBase::getFlatFHICLFilename(
823 ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label)
826 reader.label +
".fcl")
829 for(
auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
831 symlink(ARTDAQTableBase::getFlatFHICLFilename(
832 ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label)
835 builder.label +
".fcl")
838 for(
auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
840 symlink(ARTDAQTableBase::getFlatFHICLFilename(
841 ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label)
844 logger.label +
".fcl")
847 for(
auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
849 symlink(ARTDAQTableBase::getFlatFHICLFilename(
850 ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label)
853 dispatcher.label +
".fcl")
856 for(
auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
858 symlink(ARTDAQTableBase::getFlatFHICLFilename(
859 ARTDAQTableBase::ARTDAQAppType::RoutingManager, rmanager.label)
862 rmanager.label +
".fcl")
866 thread_progress_bar_.
step();
// Everything below talks to the embedded Python DAQInterface object.
868 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
// Configuration is only allowed from "stopped" (or an empty state string).
870 if(daqinterface_state_ !=
"stopped" && daqinterface_state_ !=
"")
872 __GEN_SS__ <<
"Cannot configure DAQInterface because it is in the wrong state"
873 <<
" (" << daqinterface_state_ <<
" != stopped)!" << __E__;
877 set_thread_message_(
"Calling setdaqcomps");
878 __GEN_COUT__ <<
"Calling setdaqcomps" << __E__;
879 __GEN_COUT__ <<
"Status before setdaqcomps: " << daqinterface_state_ << __E__;
880 PyObject* pName1 = PyUnicode_FromString(
"setdaqcomps");
// Build { label : [host, "-1", subsystem(, allowed_processors)] } for the
// BoardReaders. PyList_SetItem takes ownership of each item it is given.
882 PyObject* readerDict = PyDict_New();
883 for(
auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
885 label_to_proc_type_map_[reader.label] =
"BoardReader";
886 PyObject* readerName = PyUnicode_FromString(reader.label.c_str());
888 int list_size = reader.allowed_processors !=
"" ? 4 : 3;
890 PyObject* readerData = PyList_New(list_size);
891 PyObject* readerHost = PyUnicode_FromString(reader.hostname.c_str());
892 PyObject* readerPort = PyUnicode_FromString(
"-1");
893 PyObject* readerSubsystem =
894 PyUnicode_FromString(std::to_string(reader.subsystem).c_str());
895 PyList_SetItem(readerData, 0, readerHost);
896 PyList_SetItem(readerData, 1, readerPort);
897 PyList_SetItem(readerData, 2, readerSubsystem);
898 if(reader.allowed_processors !=
"")
900 PyObject* readerAllowedProcessors =
901 PyUnicode_FromString(reader.allowed_processors.c_str());
902 PyList_SetItem(readerData, 3, readerAllowedProcessors);
904 PyDict_SetItem(readerDict, readerName, readerData);
907 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName1, readerDict, NULL);
908 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"setdaqcomps"),
"setdaqcomps");
910 Py_DECREF(readerDict);
914 std::string err = capturePyErr(
"setdaqcomps");
915 __GEN_SS__ <<
"Error calling setdaqcomps transition: " << err << __E__;
920 __GEN_COUT__ <<
"Status after setdaqcomps: " << daqinterface_state_ << __E__;
922 thread_progress_bar_.
step();
923 set_thread_message_(
"Calling do_boot");
924 __GEN_COUT_INFO__ <<
"Calling do_boot" << __E__;
925 __GEN_COUT__ <<
"Status before boot: " << daqinterface_state_ << __E__;
926 PyObject* pName2 = PyUnicode_FromString(
"do_boot");
927 PyObject* pStateArgs1 =
930 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
931 std::string doBootOutput = captureStderrAndStdout_(
"do_boot");
932 __COUT_MULTI_LBL__(0, doBootOutput,
"do_boot");
// First boot attempt failed: recover, then retry do_boot once.
936 std::string err = capturePyErr();
937 __GEN_COUT_INFO__ <<
"Error on first boot attempt, recovering and retrying: "
940 PyObject* pName = PyUnicode_FromString(
"do_recover");
941 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
942 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"do_recover"),
"do_recover");
946 std::string err = capturePyErr();
947 __GEN_SS__ <<
"Error calling recover transition!!!! " << err << __E__;
// Append exactly the number of trailing characters the message announces.
948 if(doBootOutput.size() > OUT_ON_ERR_SIZE)
949 ss <<
"... last " << OUT_ON_ERR_SIZE
950 <<
" characters: " << doBootOutput.substr(doBootOutput.size() - OUT_ON_ERR_SIZE);
956 thread_progress_bar_.
step();
957 set_thread_message_(
"Calling do_boot (retry)");
958 __GEN_COUT_INFO__ <<
"Calling do_boot again" << __E__;
959 __GEN_COUT__ <<
"Status before boot: " << daqinterface_state_ << __E__;
961 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
962 doBootOutput = captureStderrAndStdout_(
"do_boot (retry)");
963 __COUT_MULTI_LBL__(0, doBootOutput,
"do_boot (retry)");
967 std::string err = capturePyErr();
968 __GEN_SS__ <<
"Error calling boot transition (2nd try): " << err << __E__;
969 if(doBootOutput.size() > OUT_ON_ERR_SIZE)
970 ss <<
"... last " << OUT_ON_ERR_SIZE
971 <<
" characters: " << doBootOutput.substr(doBootOutput.size() - OUT_ON_ERR_SIZE);
// The call returning is not enough: the state must actually be "booted".
979 if(daqinterface_state_ !=
"booted")
981 std::cout <<
"Do boot output on error: \n" << doBootOutput << __E__;
982 __GEN_SS__ <<
"DAQInterface boot transition failed! "
983 <<
"Status after boot attempt: " << daqinterface_state_ << __E__;
985 if(doBootOutput.size() > OUT_ON_ERR_SIZE)
986 ss <<
"... last " << OUT_ON_ERR_SIZE
987 <<
" characters: " << doBootOutput.substr(doBootOutput.size() - OUT_ON_ERR_SIZE);
992 __GEN_COUT__ <<
"Status after boot: " << daqinterface_state_ << __E__;
994 thread_progress_bar_.
step();
995 set_thread_message_(
"Calling do_config");
996 __GEN_COUT_INFO__ <<
"Calling do_config" << __E__;
997 __GEN_COUT__ <<
"Status before config: " << daqinterface_state_ << __E__;
998 std::string doConfigOutput =
"";
// do_config receives a one-element list holding FAKE_CONFIG_NAME.
1000 PyObject* pName3 = PyUnicode_FromString(
"do_config");
1001 PyObject* pStateArgs2 = Py_BuildValue(
"[s]", FAKE_CONFIG_NAME);
1003 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName3, pStateArgs2, NULL);
1004 doConfigOutput = captureStderrAndStdout_(
"do_config");
1005 __COUT_MULTI_LBL__(0, doConfigOutput,
"do_config");
1008 std::string err = capturePyErr(
"do_config");
1009 __GEN_SS__ <<
"Error calling config transition: " << err << __E__;
// PyUnicode_AsUTF8 can return NULL, hence the fallback to "".
1012 const char* res_cstr = PyUnicode_AsUTF8(res3);
1013 __SUP_COUTT__ <<
"do_config result=" << (res_cstr ? res_cstr :
"") << __E__;
1017 if(daqinterface_state_ !=
"ready")
1019 __GEN_SS__ <<
"DAQInterface config transition failed!" << __E__
1020 <<
"Supervisor state: \"" << daqinterface_state_ <<
"\" != \"ready\" "
// Keep the error readable: when the output contains the marker
// "RECOVER transition underway", report text relative to that marker.
1022 auto doConfigOutput_recover_i =
1023 doConfigOutput.find(
"RECOVER transition underway");
1024 if(doConfigOutput_recover_i == std::string::npos)
1025 ss << doConfigOutput;
1026 else if(doConfigOutput_recover_i >
1028 ss <<
"... tail of " << OUT_ON_ERR_SIZE <<
" characters before recovery: "
1029 << doConfigOutput.substr(
1030 doConfigOutput_recover_i - OUT_ON_ERR_SIZE +
1031 std::string(
"RECOVER transition underway").size(),
1034 ss << doConfigOutput.substr(
1036 doConfigOutput_recover_i +
1037 std::string(
"RECOVER transition underway").size());
1040 __GEN_COUT__ <<
"Status after config: " << daqinterface_state_ << __E__;
1042 set_thread_message_(
"Configured");
1043 __GEN_COUT_INFO__ <<
"Configured." << __E__;
// Failure reporting: the polling transition reads thread_error_message_.
1046 catch(
const std::runtime_error& e)
1048 set_thread_message_(
"ERROR");
1049 __SS__ <<
"Error was caught while configuring: " << e.what() << __E__;
1050 __COUT_ERR__ <<
"\n" << ss.str();
1051 std::lock_guard<std::mutex> lock(thread_mutex_);
1052 thread_error_message_ = ss.str();
1056 set_thread_message_(
"ERROR");
1057 __SS__ <<
"Unknown error was caught while configuring. Please checked the logs."
1059 __COUT_ERR__ <<
"\n" << ss.str();
1061 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1063 std::lock_guard<std::mutex> lock(thread_mutex_);
1064 thread_error_message_ = ss.str();
// Body of the "Halting" transition (the function header is not visible in
// this excerpt; the name is inferred from transitionName below).
//
// Acquires daqinterface_pythonMutex_ (logging retries while it is not yet
// held), stops a running DAQInterface with do_stop_running, then issues
// do_command("Shutdown"). A Python-side failure produces an error message
// built from capturePyErr().
//
// Exceptions: when the state being left (provenance state) is FAILED or
// HALTED the error is only logged; otherwise it is stored on the state
// machine and rethrown as a toolbox FSM exception.
//
// Changes relative to the previous revision (diagnostic text only):
//  - the "ignoring" log lines now print the actual provenance state; they
//    previously always printed FAILED_STATE_NAME, even when the state that
//    triggered the branch was HALTED;
//  - the exception origin now names ARTDAQSupervisor, the class in this
//    file, instead of "ARTDAQSupervisorBase".
1071 set_thread_message_(
"Halting");
1072 __SUP_COUT__ <<
"Halting..." << __E__;
1077 std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
1081 __COUTS__(50) <<
"Do not have python lock for halt. tries=" << tries << __E__;
1085 __COUTS__(50) <<
"Have python lock!" << __E__;
1089 __SUP_COUT__ <<
"Status before halt: " << daqinterface_state_ << __E__;
// A running DAQ is stopped first; the Shutdown command follows.
1091 if(daqinterface_state_ ==
"running")
1094 PyObject* pName = PyUnicode_FromString(
"do_stop_running");
1095 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1097 0, captureStderrAndStdout_(
"do_stop_running"),
"do_stop_running");
1101 std::string err = capturePyErr();
1102 __SS__ <<
"Error calling DAQ Interface stop transition: " << err
1108 PyObject* pName = PyUnicode_FromString(
"do_command");
1109 PyObject* pArg = PyUnicode_FromString(
"Shutdown");
1110 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1112 0, captureStderrAndStdout_(
"do_command Shutdown"),
"do_command Shutdown");
1116 std::string err = capturePyErr();
1117 __SS__ <<
"Error calling DAQ Interface halt transition: " << err << __E__;
1122 __SUP_COUT__ <<
"Status after halt: " << daqinterface_state_ << __E__;
1126 __SUP_COUT__ <<
"Halted." << __E__;
1127 set_thread_message_(
"Halted");
1129 catch(
const std::runtime_error& e)
1131 const std::string transitionName =
"Halting";
// Coming from FAILED or HALTED, a halt error is logged but not propagated.
1133 if(theStateMachine_.getProvenanceStateName() ==
1134 RunControlStateMachine::FAILED_STATE_NAME ||
1135 theStateMachine_.getProvenanceStateName() ==
1136 RunControlStateMachine::HALTED_STATE_NAME)
1138 __SUP_COUT_INFO__ <<
"Error was caught while halting (but ignoring because "
1139 "previous state was '"
1140 << theStateMachine_.getProvenanceStateName()
1141 <<
"'): " << e.what() << __E__;
1145 __SUP_SS__ <<
"Error was caught while " << transitionName <<
": " << e.what()
1147 __SUP_COUT_ERR__ <<
"\n" << ss.str();
1148 theStateMachine_.setErrorMessage(ss.str());
1149 throw toolbox::fsm::exception::Exception(
1150 "Transition Error" ,
1152 "ARTDAQSupervisor::transition" + transitionName ,
1160 const std::string transitionName =
"Halting";
1162 if(theStateMachine_.getProvenanceStateName() ==
1163 RunControlStateMachine::FAILED_STATE_NAME ||
1164 theStateMachine_.getProvenanceStateName() ==
1165 RunControlStateMachine::HALTED_STATE_NAME)
1167 __SUP_COUT_INFO__ <<
"Unknown error was caught while halting (but ignoring "
1168 "because previous state was '"
1169 << theStateMachine_.getProvenanceStateName() <<
"')." << __E__;
1173 __SUP_SS__ <<
"Unknown error was caught while " << transitionName
1174 <<
". Please checked the logs." << __E__;
1175 __SUP_COUT_ERR__ <<
"\n" << ss.str();
1176 theStateMachine_.setErrorMessage(ss.str());
1178 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1180 throw toolbox::fsm::exception::Exception(
1181 "Transition Error" ,
1183 "ARTDAQSupervisor::transition" + transitionName ,
// Body of the "Initializing" transition (the function header is not visible
// in this excerpt; the name is inferred from the log text).
// Publishes the "Initializing" / "Initialized" status strings and logs both
// ends of the transition. The two handlers build an error message for
// std::runtime_error and for any other exception; the latter is also passed
// to artdaq::ExceptionHandler without rethrowing.
// NOTE(review): the work done between the two log lines, and what happens to
// the message built in each handler, is not visible here.
1194 set_thread_message_(
"Initializing");
1195 __SUP_COUT__ <<
"Initializing..." << __E__;
1197 __SUP_COUT__ <<
"Initialized." << __E__;
1198 set_thread_message_(
"Initialized");
1200 catch(
const std::runtime_error& e)
1202 __SS__ <<
"Error was caught while Initializing: " << e.what() << __E__;
1207 __SS__ <<
"Unknown error was caught while Initializing. Please checked the logs."
1209 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM "Pausing" transition.
//
// Holding daqinterface_pythonMutex_, calls DAQInterface.do_command("Pause"),
// logs the captured Python stdout/stderr, and logs daqinterface_state_ before
// and after. A Python-side failure produces an error message built from
// capturePyErr(). The handlers build a message for std::runtime_error and
// for any other exception; the latter is also passed to
// artdaq::ExceptionHandler without rethrowing.
// NOTE(review): the condition guarding the error branch, and what happens to
// each built message, is not visible in this excerpt.
1214 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference )
1217 set_thread_message_(
"Pausing");
1218 __SUP_COUT__ <<
"Pausing..." << __E__;
1219 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1222 __SUP_COUT__ <<
"Status before pause: " << daqinterface_state_ << __E__;
// Method name and argument are passed as Python string objects.
1224 PyObject* pName = PyUnicode_FromString(
"do_command");
1225 PyObject* pArg = PyUnicode_FromString(
"Pause");
1226 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1228 0, captureStderrAndStdout_(
"do_command Pause"),
"do_command Pause");
1232 std::string err = capturePyErr();
1233 __SS__ <<
"Error calling DAQ Interface Pause transition: " << err << __E__;
1238 __SUP_COUT__ <<
"Status after pause: " << daqinterface_state_ << __E__;
1240 __SUP_COUT__ <<
"Paused." << __E__;
1241 set_thread_message_(
"Paused");
1243 catch(
const std::runtime_error& e)
1245 __SS__ <<
"Error was caught while Pausing: " << e.what() << __E__;
1250 __SS__ <<
"Unknown error was caught while Pausing. Please checked the logs." << __E__;
1251 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM "Resuming" transition.
//
// Holding daqinterface_pythonMutex_, calls DAQInterface.do_command("Resume"),
// logs the captured Python stdout/stderr, and logs daqinterface_state_ before
// and after. A Python-side failure produces an error message built from
// capturePyErr(). The handlers build a message for std::runtime_error and
// for any other exception; the latter is also passed to
// artdaq::ExceptionHandler without rethrowing.
// NOTE(review): the condition guarding the error branch, and what happens to
// each built message, is not visible in this excerpt.
1256 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference )
1259 set_thread_message_(
"Resuming");
1260 __SUP_COUT__ <<
"Resuming..." << __E__;
1261 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1264 __SUP_COUT__ <<
"Status before resume: " << daqinterface_state_ << __E__;
// Method name and argument are passed as Python string objects.
1265 PyObject* pName = PyUnicode_FromString(
"do_command");
1266 PyObject* pArg = PyUnicode_FromString(
"Resume");
1267 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1269 0, captureStderrAndStdout_(
"do_command Resume"),
"do_command Resume");
1273 std::string err = capturePyErr();
1274 __SS__ <<
"Error calling DAQ Interface Resume transition: " << err << __E__;
1278 __SUP_COUT__ <<
"Status after resume: " << daqinterface_state_ << __E__;
1279 __SUP_COUT__ <<
"Resumed." << __E__;
1280 set_thread_message_(
"Resumed");
1282 catch(
const std::runtime_error& e)
1284 __SS__ <<
"Error was caught while Resuming: " << e.what() << __E__;
1289 __SS__ <<
"Unknown error was caught while Resuming. Please checked the logs."
1291 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// Handles the Start transition as an iterated state-machine step.
// On the first iteration it launches startingThread() on a detached thread;
// the visible remainder reads the thread's error message and progress, fails
// the transition on error or on a stalled thread, and otherwise requests
// another iteration until the work is reported complete.
// NOTE(review): braces, `try`/`else`, and some conditions are missing from
// this listing; comments describe only the visible statements.
1296 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference )
1299 __SUP_COUT__ <<
"transitionStarting" << __E__;
// First call of this transition: both iteration indices are zero.
1302 if(RunControlStateMachine::getIterationIndex() == 0 &&
1303 RunControlStateMachine::getSubIterationIndex() == 0)
// Reset the state shared with the worker thread before launching it.
1305 thread_error_message_ =
"";
1306 thread_progress_bar_.resetProgressBar(0);
1307 last_thread_progress_update_ = time(0);
// The worker is detached, so it is never joined; its outcome is observed only
// through thread_error_message_ and thread_progress_bar_.
1310 std::thread(&ARTDAQSupervisor::startingThread,
this).detach();
1312 __SUP_COUT_INFO__ <<
"Starting thread started." << __E__;
// Ask the state machine to call this transition again.
1314 RunControlStateMachine::
1315 indicateIterationWork();
// Later iterations: take a copy of the worker's error message under a
// std::mutex lock_guard (the mutex argument is on a line not shown here).
1319 std::string errorMessage;
1321 std::lock_guard<std::mutex> lock(
1323 errorMessage = thread_error_message_;
1325 int progress = thread_progress_bar_.
read();
1326 __SUP_COUTV__(errorMessage);
1327 __SUP_COUTV__(progress);
1328 __SUP_COUTV__(thread_progress_bar_.
isComplete());
// Watchdog: with no error reported, more than 600 seconds since the last
// observed progress change is itself turned into an error message.
1331 if(errorMessage ==
"" &&
1332 time(0) - last_thread_progress_update_ > 600)
1334 __SUP_SS__ <<
"There has been no update from the start thread for "
1335 << (time(0) - last_thread_progress_update_)
1336 <<
" seconds, assuming something is wrong and giving up! "
1337 <<
"Last progress received was " << progress << __E__;
1338 errorMessage = ss.str();
// Any error (from the worker or the watchdog) is logged, recorded on the
// state machine, and raised as a toolbox::fsm::exception::Exception.
1341 if(errorMessage !=
"")
1343 __SUP_SS__ <<
"Error was caught in starting thread: " << errorMessage
1345 __SUP_COUT_ERR__ <<
"\n" << ss.str();
1347 theStateMachine_.setErrorMessage(ss.str());
1348 throw toolbox::fsm::exception::Exception(
1349 "Transition Error" ,
1351 "CoreSupervisorBase::transitionStarting" ,
// Still working: request another iteration.
// NOTE(review): the condition guarding this branch is not visible —
// presumably a check of thread_progress_bar_.isComplete(); confirm.
1359 __SUP_COUT__ <<
"Not done yet..." << __E__;
1363 RunControlStateMachine::
1364 indicateIterationWork();
// Record when progress last changed; this timestamp feeds the watchdog above.
1366 if(last_thread_progress_read_ != progress)
1368 last_thread_progress_read_ = progress;
1369 last_thread_progress_update_ = time(0);
1376 __SUP_COUT_INFO__ <<
"Starting transition completed!" << __E__;
1377 __SUP_COUTV__(getProcessInfo_());
// Handler for std::runtime_error: wraps e.what() in a "while Starting" message.
1384 catch(
const std::runtime_error& e)
1386 __SS__ <<
"Error was caught while Starting: " << e.what() << __E__;
// Fallback message for other errors, handed to artdaq::ExceptionHandler with
// ExceptionHandlerRethrow::no.
1391 __SS__ <<
"Unknown error was caught while Starting. Please checked the logs."
1393 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// Worker body for the Start transition (launched on a detached thread by
// transitionStarting()).
// Visible steps: read the run number from the state machine's current SOAP
// message, call the Python method do_start_running(run_number) on
// daqinterface_ptr_, and require daqinterface_state_ to be "running"
// afterwards. Progress is reported via thread_progress_bar_.step(); failures
// are stored in thread_error_message_ under thread_mutex_.
// NOTE(review): braces, `try`, and error-path guards are missing from this
// listing; comments describe only the visible statements.
1398 void ARTDAQSupervisor::startingThread()
// Resolve this supervisor's UID through its configuration-tree link (the tail
// of this expression is on lines not shown here).
1401 std::string uid = theConfigurationManager_
1402 ->
getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
1403 "/" + CorePropertySupervisorBase::getSupervisorUID() +
1404 "/" +
"LinkToSupervisorTable")
1407 __COUT__ <<
"Supervisor uid is " << uid <<
", getting supervisor table node" << __E__;
// NOTE(review): presumably the local mfSubject_ supplies the subject used by
// the __GEN_COUT__ macros below — confirm against the macro definitions.
1408 const std::string mfSubject_ = supervisorClassNoNamespace_ +
"-" + uid;
1409 __GEN_COUT__ <<
"Starting..." << __E__;
1410 set_thread_message_(
"Starting");
1412 thread_progress_bar_.
step();
// Hold the DAQInterface Python mutex for the rest of the enclosing scope.
1415 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1417 __GEN_COUT__ <<
"Status before start: " << daqinterface_state_ << __E__;
// Extract the "RunNumber" parameter from the current state-machine message.
1418 auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
1420 .getValue(
"RunNumber");
1422 thread_progress_bar_.
step();
1424 __GEN_COUT_INFO__ <<
"Calling do_start_running" << __E__;
// Python-side equivalent: daqinterface.do_start_running(run_number).
1425 PyObject* pName = PyUnicode_FromString(
"do_start_running");
// std::stoi throws if runNumber is not a parseable integer.
1426 int run_number = std::stoi(runNumber);
1427 PyObject* pStateArgs = PyLong_FromLong(run_number);
1429 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pStateArgs, NULL);
// Captured Python stderr/stdout for the call, labelled "do_start_running"
// (the head of this statement is on a line not shown here).
1431 0, captureStderrAndStdout_(
"do_start_running"),
"do_start_running");
1433 thread_progress_bar_.
step();
// Error path: build a message from the captured Python error text.
// NOTE(review): the guard selecting this path is not visible; confirm.
1437 std::string err = capturePyErr();
1438 __SS__ <<
"Error calling start transition: " << err << __E__;
1443 thread_progress_bar_.
step();
1445 __GEN_COUT__ <<
"Status after start: " << daqinterface_state_ << __E__;
// The start counts as failed unless DAQInterface now reports "running".
1446 if(daqinterface_state_ !=
"running")
1448 __SS__ <<
"DAQInterface start transition failed!" << __E__;
1452 thread_progress_bar_.
step();
1455 set_thread_message_(
"Started");
1456 thread_progress_bar_.
step();
1458 __GEN_COUT_INFO__ <<
"Started." << __E__;
// Handler for std::runtime_error: log the message and store it in
// thread_error_message_ under thread_mutex_.
1462 catch(
const std::runtime_error& e)
1464 __SS__ <<
"Error was caught while Starting: " << e.what() << __E__;
1465 __COUT_ERR__ <<
"\n" << ss.str();
1466 std::lock_guard<std::mutex> lock(thread_mutex_);
1467 thread_error_message_ = ss.str();
// Fallback for other errors: log, hand to artdaq::ExceptionHandler with
// ExceptionHandlerRethrow::no, then store the message the same way.
1471 __SS__ <<
"Unknown error was caught while Starting. Please checked the logs."
1473 __COUT_ERR__ <<
"\n" << ss.str();
1475 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1477 std::lock_guard<std::mutex> lock(thread_mutex_);
1478 thread_error_message_ = ss.str();
// Handles the Stop transition by calling the Python method do_stop_running()
// on the DAQInterface object.
// NOTE(review): braces, `try`, and error-path guards are missing from this
// listing; comments describe only the visible statements.
1482 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference )
1485 __SUP_COUT__ <<
"Stopping..." << __E__;
1486 set_thread_message_(
"Stopping");
// Hold the DAQInterface Python mutex for the rest of the enclosing scope.
1487 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1489 __SUP_COUT__ <<
"Status before stop: " << daqinterface_state_ << __E__;
// Python-side equivalent: daqinterface.do_stop_running() (no arguments).
1490 PyObject* pName = PyUnicode_FromString(
"do_stop_running");
1491 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
// Log the captured Python stderr/stdout, labelled "do_stop_running".
1492 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"do_stop_running"),
"do_stop_running");
// Error path: build a message from the captured Python error text.
// NOTE(review): the guard selecting this path is not visible — presumably a
// check of `res`; confirm against the full file.
1496 std::string err = capturePyErr();
1497 __SS__ <<
"Error calling DAQ Interface stop transition: " << err << __E__;
1501 __SUP_COUT__ <<
"Status after stop: " << daqinterface_state_ << __E__;
1502 __SUP_COUT__ <<
"Stopped." << __E__;
1503 set_thread_message_(
"Stopped");
// Handler for std::runtime_error: wraps e.what() in a "while Stopping" message.
1505 catch(
const std::runtime_error& e)
1507 __SS__ <<
"Error was caught while Stopping: " << e.what() << __E__;
// Fallback message for other errors, handed to artdaq::ExceptionHandler with
// ExceptionHandlerRethrow::no.
1512 __SS__ <<
"Unknown error was caught while Stopping. Please checked the logs."
1514 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// Called on entry to the error state; asks the Python DAQInterface object to
// recover by calling its do_recover() method.
// Unlike the transition handlers, a failure of the Python call is reported
// with a warning-level log rather than an error message stream.
// NOTE(review): braces and the error-path guard are missing from this listing.
1519 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference )
1521 __SUP_COUT__ <<
"Entering error recovery state" << __E__;
// Hold the DAQInterface Python mutex for the rest of the enclosing scope.
1522 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1524 __SUP_COUT__ <<
"Status before error: " << daqinterface_state_ << __E__;
// Python-side equivalent: daqinterface.do_recover() (no arguments).
1526 PyObject* pName = PyUnicode_FromString(
"do_recover");
1527 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
// Log the captured Python stderr/stdout, labelled "do_recover".
1528 __COUT_MULTI_LBL__(0, captureStderrAndStdout_(
"do_recover"),
"do_recover");
// Error path: only a warning is logged with the captured Python error text.
1532 std::string err = capturePyErr();
1534 __SUP_COUT_WARN__ <<
"Error calling DAQ Interface recover transition: " << err
1539 __SUP_COUT__ <<
"Status after error: " << daqinterface_state_ << __E__;
1540 __SUP_COUT__ <<
"EnteringError DONE." << __E__;
// Builds one SupervisorInfo::SubappInfo entry per artdaq process reported by
// getAndParseProcessInfo_().
// NOTE(review): the declaration of `info` and the return statement are on
// lines not shown in this listing.
1544 std::vector<SupervisorInfo::SubappInfo> ots::ARTDAQSupervisor::getSubappInfo(
void)
1546 auto apps = getAndParseProcessInfo_();
1547 std::vector<SupervisorInfo::SubappInfo> output;
1548 for(
auto& app : apps)
1552 info.
name = app.label;
1553 info.detail =
"Rank " + std::to_string(app.rank) +
", subsystem " +
1554 std::to_string(app.subsystem);
// Status time is the moment of this query; progress is always reported as 100.
1555 info.lastStatusTime = time(0);
1556 info.progress = 100;
// Translate the artdaq state string into the ots state name.
1557 info.status = artdaqStateToOtsState(app.state);
// URL is assembled as http://<host>:<port>/RPC2.
1558 info.url =
"http://" + app.host +
":" + std::to_string(app.port) +
"/RPC2";
1559 info.class_name =
"ARTDAQ " + labelToProcType_(app.label);
1561 output.push_back(info);
// Returns text describing the most recent captured Python error output.
// `label` is inserted into the returned text to identify the caller.
// NOTE(review): the first visible statement returns the result of
// captureStderrAndStdout_(...); the statements after it read and reset
// stringIO_err directly. Lines are missing from this listing, so whether that
// later code is still reachable cannot be determined here — confirm against
// the full file.
1567 std::string ots::ARTDAQSupervisor::capturePyErr(std::string label )
1569 return captureStderrAndStdout_(
// Python-side equivalent: stringIO_err.getvalue().
1576 PyObject* err_text = PyObject_CallMethod(stringIO_err,
"getvalue", NULL);
1577 std::string err =
"";
// NOTE(review): `label` is concatenated directly with "PyErr" with no
// separator, e.g. label "getDAQState_" yields "Capture of getDAQState_PyErr".
1579 err =
"Capture of " + label +
"PyErr failed.";
1581 err =
"Capture of " + label +
"PyErr: " + std::string(PyUnicode_AsUTF8(err_text));
// Rewind and truncate the buffer: stringIO_err.seek(0); stringIO_err.truncate().
1585 PyObject* r1 = PyObject_CallMethod(stringIO_err,
"seek",
"i", 0);
1587 PyObject* r2 = PyObject_CallMethod(stringIO_err,
"truncate", NULL);
// Collects the text accumulated in the Python StringIO objects stringIO_out
// and stringIO_err, rewinds and truncates both, and returns the stderr text
// followed by the stdout text. `label` is inserted into the section headers.
// NOTE(review): braces, `try`/`else`, and several guards are missing from this
// listing; comments describe only the visible statements.
1594 std::string ots::ARTDAQSupervisor::captureStderrAndStdout_(std::string label )
1601 std::string outString =
"";
// Python-side equivalent: stringIO_out.getvalue().
1602 PyObject* out = PyObject_CallMethod(stringIO_out,
"getvalue", NULL);
1606 const char* text = PyUnicode_AsUTF8(out);
// NOTE(review): getvalue() is called on stringIO_out a second time here; the
// use of the first result (`out`/`text`) is on lines not shown.
1615 PyObject* out_text = PyObject_CallMethod(stringIO_out,
"getvalue", NULL);
1620 const char* out_cstr = PyUnicode_AsUTF8(out_text);
// Non-empty stdout gets a header plus the text; the null check inside the
// std::string construction repeats the one in the `if`.
// NOTE(review): `label` is concatenated directly with "stdout" with no
// separator, e.g. "Captured do_stop_runningstdout:".
1621 if(out_cstr && strlen(out_cstr))
1622 outString =
"Captured " + label +
"stdout:\n" +
1623 std::string(out_cstr ? out_cstr :
"") +
"\n";
1625 outString =
"Captured " + label +
"stdout empty.\n";
// Same treatment for stderr.
1628 std::string errString =
"";
1629 PyObject* err_text = PyObject_CallMethod(stringIO_err,
"getvalue", NULL);
1631 __SUP_COUT__ <<
"Capture of " << label <<
"stderr failed.";
1634 const char* err_cstr = PyUnicode_AsUTF8(err_text);
1635 if(err_cstr && strlen(err_cstr))
1636 errString =
"Captured " + label +
"stderr:\n" +
1637 std::string(err_cstr ? err_cstr :
"") +
"\n";
1639 errString =
"Captured " + label +
"stderr empty.\n";
// Rewind and truncate stringIO_out: seek(0) then truncate().
1644 PyObject* r1 = PyObject_CallMethod(stringIO_out,
"seek",
"i", 0);
1646 PyObject* r2 = PyObject_CallMethod(stringIO_out,
"truncate", NULL);
// Rewind and truncate stringIO_err the same way.
1650 PyObject* r1 = PyObject_CallMethod(stringIO_err,
"seek",
"i", 0);
1652 PyObject* r2 = PyObject_CallMethod(stringIO_err,
"truncate", NULL);
// stderr text comes first in the returned string.
1656 return errString + outString;
// Logged from an exception handler whose `catch` line is not shown here.
1660 __COUT_ERR__ <<
"Exception caught while capturing python output!" << __E__;
// Refreshes daqinterface_state_ by calling the Python method
// state("DAQInterface") on daqinterface_ptr_.
// daqinterface_state_ is set to "" when daqinterface_ptr_ is null and on the
// visible error path.
// NOTE(review): braces, the retry structure around `tries`, and several
// guards are missing from this listing; comments cover visible statements only.
1667 void ots::ARTDAQSupervisor::getDAQState_()
1669 __SUP_COUTS__(50) <<
"Getting DAQInterface python lock" << __E__;
// Hold the DAQInterface Python mutex for the rest of the enclosing scope.
1670 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1671 __SUP_COUTS__(50) <<
"Have DAQInterface python lock" << __E__;
// No Python object to query: clear the cached state and report it.
1673 if(daqinterface_ptr_ ==
nullptr)
1675 daqinterface_state_ =
"";
1677 <<
"daqinterface_ptr_ is not initialized! Check logs for errors." << __E__;
// Python-side equivalent: daqinterface.state("DAQInterface").
1684 PyObject* pName = PyUnicode_FromString(
"state");
1685 PyObject* pArg = PyUnicode_FromString(
"DAQInterface");
1686 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
// Error path: the message includes an attempt counter (`tries`, declared on a
// line not shown) and the captured Python error text.
1691 std::string err = capturePyErr(
"getDAQState_");
1693 <<
"Attempt n " << tries
1694 <<
". Error calling 'state' function from getDAQState_() - here was the "
1700 daqinterface_state_ =
"";
// The same message is written at two log levels; the conditions choosing
// between them are not visible here.
1701 __COUT__ << ss.str();
1703 __COUT_ERR__ << ss.str();
1705 __SUP_COUTS__(2) <<
"no getDAQState_ state=" << daqinterface_state_ << __E__;
// Success path: the returned Python string becomes the cached state.
1708 daqinterface_state_ = std::string(PyUnicode_AsUTF8(res));
1709 __SUP_COUTS__(20) <<
"getDAQState_ state=" << daqinterface_state_ << __E__;
// Queries the Python DAQInterface object for its artdaq process listing by
// calling artdaq_process_info("DAQInterface", True), caches the result in
// daqinterface_status_ (under daqinterface_statusMutex_), and returns it.
// NOTE(review): the body of the null-pointer branch, the declaration of `res`,
// and the error-path guard are on lines not shown in this listing.
1716 std::string ots::ARTDAQSupervisor::getProcessInfo_(
void)
1718 __SUP_COUTS__(50) <<
"Getting DAQInterface state lock" << __E__;
// Hold the DAQInterface Python mutex for the rest of the enclosing scope.
1719 std::lock_guard<std::recursive_mutex> lk(daqinterface_pythonMutex_);
1720 __SUP_COUTS__(50) <<
"Have DAQInterface state lock" << __E__;
1722 if(daqinterface_ptr_ ==
nullptr)
// Python-side equivalent: daqinterface.artdaq_process_info("DAQInterface", True).
1727 PyObject* pName = PyUnicode_FromString(
"artdaq_process_info");
1728 PyObject* pArg = PyUnicode_FromString(
"DAQInterface");
1729 PyObject* pArg2 = PyBool_FromLong(
true);
1731 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, pArg2, NULL);
// Error path: build a message from the captured Python error text.
1735 std::string err = capturePyErr();
1736 __SS__ <<
"Error calling artdaq_process_info function: " << err << __E__;
// Update the cached status under its own mutex, then return it.
1741 std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1742 daqinterface_status_ = std::string(PyUnicode_AsUTF8(res));
1743 return daqinterface_status_;
// Maps an artdaq process state string to an ots state-machine state name.
//   "nonexistant" -> RunControlStateMachine::INITIAL_STATE_NAME
//   "Ready"       -> "Configured"
//   "Running"     -> RunControlStateMachine::RUNNING_STATE_NAME
//   "Paused"      -> RunControlStateMachine::PAUSED_STATE_NAME
//   "Stopped"     -> RunControlStateMachine::HALTED_STATE_NAME
// Any other value logs a warning and yields FAILED_STATE_NAME.
// Comparisons are exact and case-sensitive; "nonexistant" is matched with
// that spelling.
1746 std::string ots::ARTDAQSupervisor::artdaqStateToOtsState(std::string state)
1748 if(state ==
"nonexistant")
1749 return RunControlStateMachine::INITIAL_STATE_NAME;
1750 if(state ==
"Ready")
1751 return "Configured";
1752 if(state ==
"Running")
1753 return RunControlStateMachine::RUNNING_STATE_NAME;
1754 if(state ==
"Paused")
1755 return RunControlStateMachine::PAUSED_STATE_NAME;
1756 if(state ==
"Stopped")
1757 return RunControlStateMachine::HALTED_STATE_NAME;
// Unrecognized input: warn and report the failed state.
1759 TLOG(TLVL_WARNING) <<
"Unrecognized state name " << state;
1760 return RunControlStateMachine::FAILED_STATE_NAME;
// Looks up the process type recorded for `label` in label_to_proc_type_map_.
// NOTE(review): the value returned when the label is absent is on a line not
// shown in this listing.
1763 std::string ots::ARTDAQSupervisor::labelToProcType_(std::string label)
// count() guards the operator[] lookup below, so the lookup does not insert.
1765 if(label_to_proc_type_map_.count(label))
1767 return label_to_proc_type_map_[label];
// Obtains the DAQInterface process listing as text and parses each line into
// a DAQInterfaceProcessInfo.
// The text comes either from the cached daqinterface_status_ (branch logged as
// "Do not have python lock.") or from a fresh getProcessInfo_() call (branch
// logged as "Have python lock!").
// NOTE(review): the second unique_lock argument, the branch condition, the
// declarations of `info` and `match`, and the return statement are on lines
// not shown in this listing.
1774 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo>
1775 ots::ARTDAQSupervisor::getAndParseProcessInfo_()
1777 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
1782 std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_,
// Without the Python lock: use the cached copy under daqinterface_statusMutex_.
1786 __COUTS__(50) <<
"Do not have python lock." << __E__;
1787 std::lock_guard<std::mutex> lock(daqinterface_statusMutex_);
1788 info = daqinterface_status_;
// With the Python lock: query DAQInterface directly.
1792 __COUTS__(50) <<
"Have python lock!" << __E__;
1793 info = getProcessInfo_();
1795 __COUTVS__(20, info);
// Split the text into individual lines.
1797 auto procs = tokenize_(info);
// Expected line shape:
//   <label> at <host>:<port> (subsystem <n>, rank <n>): <state>
1806 std::regex re(
"(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
// regex_match requires the whole line to match; the visible code handles
// only matching lines.
1808 for(
auto& proc : procs)
1811 if(std::regex_match(proc, match, re))
// This inner `info` is a DAQInterfaceProcessInfo, distinct from the text
// variable of the same name used above.
1813 DAQInterfaceProcessInfo info;
1815 info.label = match[1];
1816 info.host = match[2];
1817 info.port = std::stoi(match[3]);
1818 info.subsystem = std::stoi(match[4]);
1819 info.rank = std::stoi(match[5]);
1820 info.state = match[6];
1822 output.push_back(info);
// Creates one artdaq commander plugin per process reported by
// getAndParseProcessInfo_() and pairs it with that process's info.
// NOTE(review): the start of the return type, the declaration of `output`,
// and the return statement are on lines not shown in this listing.
1830 std::unique_ptr<artdaq::CommanderInterface>>>
1831 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
1834 std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
1836 auto infos = getAndParseProcessInfo_();
1838 for(
auto& info : infos)
// NOTE(review): `cm` is local to this loop iteration and is passed to
// MakeCommanderPlugin below — confirm the plugin does not keep a reference to
// it after the iteration ends.
1840 artdaq::Commandable cm;
1841 fhicl::ParameterSet ps;
// Plugin configuration: xmlrpc commander, with "id" set to the process port
// and "server_url" set to the process host.
1843 ps.put<std::string>(
"commanderPluginType",
"xmlrpc");
1844 ps.put<
int>(
"id", info.port);
1845 ps.put<std::string>(
"server_url", info.host);
// `info` is moved into the stored pair and must not be used after this line.
1847 output.emplace_back(std::make_pair<DAQInterfaceProcessInfo,
1848 std::unique_ptr<artdaq::CommanderInterface>>(
1849 std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
// Splits `input` into pieces at '\n' characters and returns them as a list.
// NOTE(review): the declaration and updates of `pos`, the `else`, and the
// return statement are on lines not shown in this listing.
1856 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string
const& input)
1859 std::list<std::string> output;
// Scan while `pos` is a valid offset inside `input`.
1861 while(pos != std::string::npos && pos < input.size())
1863 auto newpos = input.find(
'\n', pos);
1864 if(newpos != std::string::npos)
// Newline found: take the characters from pos up to, not including, it.
1866 output.emplace_back(input, pos, newpos - pos);
// Otherwise take everything from pos to the end of the string.
1872 output.emplace_back(input, pos);
// Body of the runner thread: loops while runner_running_ is true and, when
// DAQInterface is in the "running", "ready" or "booted" state, calls its
// Python method check_proc_heartbeats().
// The loop flag is cleared on a failed call, on a caught exception, and when
// daqinterface_state_ differs from the value sampled at the top of the
// iteration.
// NOTE(review): braces, `try`/`else`, the state refresh between the two state
// reads, and any sleep between iterations are on lines not shown in this
// listing; comments describe only the visible statements.
1881 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1883 TLOG(TLVL_TRACE) <<
"Runner thread starting";
1884 runner_running_ =
true;
1885 while(runner_running_)
1887 if(daqinterface_ptr_ != NULL)
// Hold the DAQInterface Python mutex while this lock object is in scope.
1889 std::unique_lock<std::recursive_mutex> lk(daqinterface_pythonMutex_);
// Sample the state for comparison at the end of the iteration.
1891 std::string state_before = daqinterface_state_;
1893 __SUP_COUTS__(2) <<
"Runner state_before=" << state_before
1894 <<
" state now=" << daqinterface_state_
1895 <<
" ?= running, ready, or booted" << __E__;
// Heartbeats are checked only in these three states.
1897 if(daqinterface_state_ ==
"running" || daqinterface_state_ ==
"ready" ||
1898 daqinterface_state_ ==
"booted")
1902 TLOG(TLVL_TRACE) <<
"Calling DAQInterface::check_proc_heartbeats";
// Python-side equivalent: daqinterface.check_proc_heartbeats().
1903 PyObject* pName = PyUnicode_FromString(
"check_proc_heartbeats");
1905 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
// Log the captured Python stderr/stdout, labelled "check_proc_heartbeats".
1906 __COUT_MULTI_LBL__(1,
1907 captureStderrAndStdout_(
"check_proc_heartbeats"),
1908 "check_proc_heartbeats");
1910 <<
"Done with DAQInterface::check_proc_heartbeats call";
// Failed call (guard not visible): stop the loop and build an error message.
1914 runner_running_ =
false;
1915 std::string err = capturePyErr(
"check_proc_heartbeats");
1916 __SS__ <<
"Error calling check_proc_heartbeats function: " << err
// cet::exception: stop the loop and report explain_self() with the Python
// error text.
1922 catch(cet::exception& ex)
1924 runner_running_ =
false;
1925 std::string err = capturePyErr(
"check_proc_heartbeats");
1926 __SS__ <<
"An cet::exception occurred while calling "
1927 "check_proc_heartbeats function "
1928 << ex.explain_self() <<
": " << err << __E__;
// std::exception: stop the loop and report what().
1932 catch(std::exception& ex)
1934 runner_running_ =
false;
1935 std::string err = capturePyErr(
"check_proc_heartbeats");
1936 __SS__ <<
"An std::exception occurred while calling "
1937 "check_proc_heartbeats function: "
1938 << ex.what() <<
"\n\n"
// Remaining error case (its `catch` line is not shown): stop the loop.
1945 runner_running_ =
false;
1946 std::string err = capturePyErr(
"check_proc_heartbeats");
1947 __SS__ <<
"An unknown Error occurred while calling "
1948 "check_proc_heartbeats function: "
// A state different from the one sampled above is treated as unexpected and
// stops the loop.
1956 if(daqinterface_state_ != state_before)
1958 runner_running_ =
false;
1960 __SS__ <<
"DAQInterface state unexpectedly changed from "
1961 << state_before <<
" to " << daqinterface_state_
1962 <<
". Check supervisor log file for more info!" << __E__;
// No Python object available: log only.
1970 __SUP_COUT__ <<
"daqinterface_ptr_ is null" << __E__;
// Loop finished: leave the flag cleared.
1975 runner_running_ =
false;
1976 TLOG(TLVL_TRACE) <<
"Runner thread complete";
// Stops the runner thread: clears runner_running_ (the loop condition of
// daqinterfaceRunner_()), then joins and releases runner_thread_ if it holds a
// joinable thread.
1980 void ots::ARTDAQSupervisor::stop_runner_()
1982 runner_running_ =
false;
// join() blocks until the runner thread has finished.
1983 if(runner_thread_ && runner_thread_->joinable())
1985 runner_thread_->join();
1986 runner_thread_.reset(
nullptr);
// Starts the runner thread by constructing a std::thread that executes
// daqinterfaceRunner_() on this object.
// NOTE(review): the statements before the make_unique call and the target the
// new thread is assigned to (presumably runner_thread_) are on lines not shown
// in this listing — confirm against the full file.
1991 void ots::ARTDAQSupervisor::start_runner_()
1995 std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_,
this);
virtual void transitionHalting(toolbox::Event::Reference event) override
virtual void transitionInitializing(toolbox::Event::Reference event) override
static const std::string ARTDAQ_FCL_PATH
Tree-path rule is, if the last link in the path is a group link with a specified group ID,...
void loadTableGroup(const std::string &tableGroupName, const TableGroupKey &tableGroupKey, bool doActivate=false, std::map< std::string, TableVersion > *groupMembers=0, ProgressBar *progressBar=0, std::string *accumulateWarnings=0, std::string *groupComment=0, std::string *groupAuthor=0, std::string *groupCreateTime=0, bool doNotLoadMember=false, std::string *groupTypeString=0, std::map< std::string, std::string > *groupAliases=0, ConfigurationManager::LoadGroupType groupTypeToLoad=ConfigurationManager::LoadGroupType::ALL_TYPES, bool ignoreVersionTracking=false)
ConfigurationTree getNode(const std::string &nodeString, bool doNotThrowOnBrokenUIDLinks=false) const
"root/parent/parent/"
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
const std::string & getValueAsString(bool returnLinkTableValue=false) const
void getValue(T &value) const
ITRACEController * theTRACEController_
only define for an app that receives a command
bool isComplete()
get functions
int read()
if stepsToComplete==0, then define any progress as 50%, thread safe
void complete()
declare complete, thread safe
void INIT_MF(const char *name)
static std::string stackTrace(void)
std::string name
Also key in map.