1 #include "otsdaq/CoreSupervisors/FESupervisor.h"
2 #include "otsdaq/ConfigurationInterface/ConfigurationManager.h"
3 #include "otsdaq/FECore/FEVInterfacesManager.h"
4 #include "otsdaq/TablePlugins/ARTDAQTableBase/ARTDAQTableBase.h"
6 #include "artdaq/DAQdata/Globals.hh"
12 #include "artdaq-core/Utilities/ExceptionHandler.hh"
// FESupervisor constructor.
//
// NOTE(review): this listing keeps only a subset of the original lines (the
// leading number on each line is the original line number), so braces, call
// heads and some statements are not shown. The comments below describe only
// what the visible lines establish; anything else is marked as an assumption.
//
// Visible steps, in order:
//   1. dp_socket_ is initialised from dp_context_ with zmq::socket_type::pub.
//   2. macroMakerSupervisorRequest, workLoopStatusRequest and
//      frontEndCommunicationRequest are referenced together with message names
//      and XDAQ_NS_URI - presumably SOAP handler bindings; the binding calls
//      themselves are not visible here (TODO confirm).
//   3. The data publishing endpoint is looked up and, if non-empty, passed to
//      initDataPublishing().
//   4. An object built from the context tree node and the supervisor
//      configuration path is pushed onto theStateMachineImplementation_.
//   5. extractFEInterfacesManager() is called.
//   6. In Macro Maker mode, every state machine implementation is configured
//      immediately.
41 FESupervisor::FESupervisor(xdaq::ApplicationStub* stub)
44 , dp_socket_{dp_context_, zmq::socket_type::pub}
46 __SUP_COUT__ <<
"Constructing..." << __E__;
49 &FESupervisor::macroMakerSupervisorRequest,
50 "MacroMakerSupervisorRequest",
54 this, &FESupervisor::workLoopStatusRequest,
"WorkLoopStatusRequest", XDAQ_NS_URI);
57 &FESupervisor::frontEndCommunicationRequest,
// Endpoint lookup: the supervisor property "data_publishing_endpoint" and the
// environment variable DATA_PUBLISHING_ENDPOINT are both read into the same
// string. Each lookup has a std::runtime_error handler that only logs that
// publishing defaults to disabled.
// NOTE(review): whether the environment lookup runs only when the property
// lookup fails cannot be determined from the visible lines - confirm.
61 std::string dataPublishingEndpoint;
64 dataPublishingEndpoint = getSupervisorProperty(
65 "data_publishing_endpoint");
67 catch(
const std::runtime_error& e)
69 __SUP_COUT__ <<
"Data publishing property 'data_publishing_endpoint' not set. "
70 "Defaulting to data publishing disabled."
75 dataPublishingEndpoint = __ENV__(
76 "DATA_PUBLISHING_ENDPOINT");
78 catch(
const std::runtime_error& e)
81 <<
"Data publishing environment variable 'DATA_PUBLISHING_ENDPOINT' "
82 "not set. Defaulting to data publishing disabled."
86 __SUP_COUT_INFO__ <<
"dataPublishingEndpoint = " << dataPublishingEndpoint << __E__;
// An empty endpoint string means data publishing is never initialised.
88 if(dataPublishingEndpoint.size())
90 __SUP_COUT__ <<
"Initializing data publishing to endpoint '"
91 << dataPublishingEndpoint <<
"'..." << __E__;
// The topic comes from the "data_publishing_topic" property, with "" passed as
// the second argument (presumably the default value - TODO confirm).
93 initDataPublishing(dataPublishingEndpoint,
94 getSupervisorProperty(
"data_publishing_topic",
""));
// The type of the object pushed here is on a line that is not visible;
// presumably the FE interfaces manager, given the warning text below.
99 CoreSupervisorBase::theStateMachineImplementation_.push_back(
101 CorePropertySupervisorBase::getContextTreeNode(),
102 CorePropertySupervisorBase::getSupervisorConfigurationPath()));
// In Macro Maker mode a failure is reported as a warning that points at the
// input fhicl; the std::runtime_error handler below also only warns.
106 if(CorePropertySupervisorBase::allSupervisorInfo_.isMacroMakerMode())
108 __SUP_COUT_WARN__ <<
"Error caught constructing FE Interface Manager. In "
109 "Macro Maker mode, the input fhicl defines the "
110 "configuration tree, make sure you specified a valid "
117 catch(
const std::runtime_error& e)
119 __SUP_COUT_WARN__ << e.what() << __E__;
126 extractFEInterfacesManager();
128 __SUP_COUT__ <<
"Constructed." << __E__;
// Macro Maker mode: configure at startup instead of waiting for a transition.
130 if(CorePropertySupervisorBase::allSupervisorInfo_.isMacroMakerMode())
132 __SUP_COUT_INFO__ <<
"Macro Maker mode, so configuring at startup!" << __E__;
133 if(!theFEInterfacesManager_)
135 __SUP_SS__ <<
"Missing FE Interface manager!" << __E__;
146 __SUP_COUT__ <<
"Configuring all state machine implementations..." << __E__;
147 __SUP_COUTV__(stateMachinesIterationDone_.size());
148 preStateMachineExecutionLoop();
149 __SUP_COUTV__(stateMachinesIterationDone_.size());
// Per-implementation configure loop, bracketed by the pre/post execution hooks.
150 for(
unsigned int i = 0; i < theStateMachineImplementation_.size(); ++i)
152 __SUP_COUT__ <<
"Configuring state machine i " << i << __E__;
// (unsigned int)-1 is compared against as the "no sub-iteration index" value.
// The bodies of the next two conditions are not visible; presumably they skip
// this index (TODO confirm).
155 if(subIterationWorkStateMachineIndex_ != (
unsigned int)-1 &&
156 i != subIterationWorkStateMachineIndex_)
159 if(stateMachinesIterationDone_[i])
162 preStateMachineExecution(i);
163 theStateMachineImplementation_[i]->parentSupervisor_ =
166 theStateMachineImplementation_[i]->configure();
169 postStateMachineExecution(i);
171 postStateMachineExecutionLoop();
174 CorePropertySupervisorBase::indicateOtsAlive(0);
// Error handling for the startup configure: a std::runtime_error message is
// logged with e.what(); another path logs an "unknown error" message and
// appends e.what() when a std::exception is available.
176 catch(
const std::runtime_error& e)
178 __SUP_SS__ <<
"Error was caught while configuring: " << e.what() << __E__;
179 __SUP_COUT_ERR__ <<
"\n" << ss.str();
185 <<
"Unknown error was caught while configuring. Please checked the logs."
191 catch(
const std::exception& e)
193 ss <<
"Exception message: " << e.what();
198 __SUP_COUT_ERR__ <<
"\n" << ss.str();
// FESupervisor destructor.
// The visible lines log entry and exit and call
// artdaq::Globals::CleanUpGlobals() in between.
// NOTE(review): original lines 208-220 are not present in this listing, so
// additional teardown may happen before the CleanUpGlobals() call.
205 FESupervisor::~FESupervisor(
void)
207 __SUP_COUT__ <<
"Destroying..." << __E__;
221 artdaq::Globals::CleanUpGlobals();
223 __SUP_COUT__ <<
"Destructed." << __E__;
// SOAP handler for front-end communication requests.
//
// Reads the "type" parameter from the incoming message and dispatches on it.
// Request types visible in this listing:
//   - a first branch whose condition line is not shown: stores a received
//     "value" keyed by target interface ID and requester, replies "Received";
//   - "feMacro": replies "feMacrosResponse" carrying outputArgs;
//   - "feMacroMultiDimensionalStart" / "macroMultiDimensionalStart":
//     replies with <type> + "Done";
//   - "feMacroMultiDimensionalCheck" / "macroMultiDimensionalCheck":
//     replies with <type> + "Done" and a "Done" parameter of "1" or "0";
//   - anything else: an "Unrecognized FE Communication type" error is built.
// The handlers at the end return a
// <supervisorClassNoNamespace_>"FailFECommunicationRequest" message with an
// "Error" parameter instead of the normal reply.
//
// NOTE(review): this listing omits many original lines (braces, call heads,
// throw statements), so the comments describe only the visible statements.
227 xoap::MessageReference FESupervisor::frontEndCommunicationRequest(
228 xoap::MessageReference message)
// No FE interfaces manager: an error message is built (the statement that
// raises it is not visible here).
233 if(!theFEInterfacesManager_)
235 __SUP_SS__ <<
"No FE Interface Manager!" << __E__;
// Extract the request type that selects the branch below.
239 typeParameter.addParameter(
"type");
240 SOAPUtilities::receive(message, typeParameter);
242 std::string type = typeParameter.getValue(
"type");
// --- First branch (condition line not visible): receive a value sent from
// one front-end ("requester") to another ("targetInterfaceID").
252 rxParameters.addParameter(
"requester");
253 rxParameters.addParameter(
"targetInterfaceID");
259 rxParameters.addParameter(
"value");
260 SOAPUtilities::receive(message, rxParameters);
262 std::string requester = rxParameters.getValue(
"requester");
263 std::string targetInterfaceID = rxParameters.getValue(
"targetInterfaceID");
264 std::string value = rxParameters.getValue(
"value");
266 __SUP_COUTV__(requester);
267 __SUP_COUTV__(targetInterfaceID);
268 __SUP_COUTV__(value);
// The receive buffer is accessed while a std::lock_guard is held; which mutex
// is locked is on a line that is not visible.
275 std::lock_guard<std::mutex> lock(
278 theFEInterfacesManager_
279 ->frontEndCommunicationReceiveBuffer_[targetInterfaceID][requester]
282 __SUP_COUT__ <<
"Number of target interface ID '" << targetInterfaceID
284 << theFEInterfacesManager_
285 ->frontEndCommunicationReceiveBuffer_[targetInterfaceID]
289 <<
"Number of source interface ID '" << requester <<
"' values received: "
290 << theFEInterfacesManager_
291 ->frontEndCommunicationReceiveBuffer_[targetInterfaceID][requester]
295 return SOAPUtilities::makeSOAPMessageReference(
"Received");
// --- "feMacro": run a named FE macro on the target interface and send its
// output arguments back to the requester.
297 else if(type ==
"feMacro")
301 rxParameters.addParameter(
"feMacroName");
302 rxParameters.addParameter(
"inputArgs");
304 SOAPUtilities::receive(message, rxParameters);
306 std::string requester = rxParameters.getValue(
"requester");
307 std::string targetInterfaceID = rxParameters.getValue(
"targetInterfaceID");
308 std::string feMacroName = rxParameters.getValue(
"feMacroName");
309 std::string inputArgs = rxParameters.getValue(
"inputArgs");
311 __SUP_COUTV__(requester);
312 __SUP_COUTV__(targetInterfaceID);
313 __SUP_COUTV__(feMacroName);
314 __SUP_COUTV__(inputArgs);
// outputArgs is passed as the last argument of the call below (callee name not
// visible) and is sent back in the reply.
316 std::string outputArgs;
320 requester, targetInterfaceID, feMacroName, inputArgs, outputArgs);
// Failures are rewritten into messages naming the supervisor LID, the macro
// and the target FE.
322 catch(std::runtime_error& e)
324 __SUP_SS__ <<
"In Supervisor with LID="
325 << getApplicationDescriptor()->getLocalId()
326 <<
" the FE Macro named '" << feMacroName <<
"' with target FE '"
327 << targetInterfaceID <<
"' failed. Here is the error:\n\n"
328 << e.what() << __E__;
333 __SUP_SS__ <<
"In Supervisor with LID="
334 << getApplicationDescriptor()->getLocalId()
335 <<
" the FE Macro named '" << feMacroName <<
"' with target FE '"
336 << targetInterfaceID <<
"' failed due to an unknown error."
342 catch(
const std::exception& e)
344 ss <<
"Exception message: " << e.what();
352 __SUP_COUTV__(outputArgs);
// Reply carries requester, target, macro name and the macro's output args.
354 xoap::MessageReference replyMessage =
355 SOAPUtilities::makeSOAPMessageReference(
"feMacrosResponse");
357 txParameters.addParameter(
"requester", requester);
358 txParameters.addParameter(
"targetInterfaceID", targetInterfaceID);
359 txParameters.addParameter(
"feMacroName", feMacroName);
360 txParameters.addParameter(
"outputArgs", outputArgs);
361 SOAPUtilities::addParameters(replyMessage, txParameters);
363 __SUP_COUT__ <<
"Sending FE macro result: "
364 << SOAPUtilities::translate(replyMessage) << __E__;
// --- Multi-dimensional launch start, for either an FE macro or a MacroMaker
// macro.
368 else if(type ==
"feMacroMultiDimensionalStart" ||
369 type ==
"macroMultiDimensionalStart")
375 rxParameters.addParameter(
"macroString");
376 rxParameters.addParameter(
"macroName");
379 rxParameters.addParameter(
"feMacroName");
381 rxParameters.addParameter(
"enableSavingOutput");
382 rxParameters.addParameter(
"outputFilePath");
383 rxParameters.addParameter(
"outputFileRadix");
384 rxParameters.addParameter(
"inputArgs");
386 SOAPUtilities::receive(message, rxParameters);
388 std::string requester = rxParameters.getValue(
"requester");
389 std::string targetInterfaceID = rxParameters.getValue(
"targetInterfaceID");
// macroName is filled from "macroName" (together with macroString) or from
// "feMacroName"; the condition choosing between the two is not visible -
// presumably it depends on the request type (TODO confirm).
390 std::string macroName, macroString;
393 macroName = rxParameters.getValue(
"macroName");
394 macroString = rxParameters.getValue(
"macroString");
395 __SUP_COUTV__(macroString);
398 macroName = rxParameters.getValue(
"feMacroName");
// The saving flag is true only when the parameter text is exactly "1".
399 bool enableSavingOutput = rxParameters.getValue(
"enableSavingOutput") ==
"1";
400 std::string outputFilePath = rxParameters.getValue(
"outputFilePath");
401 std::string outputFileRadix = rxParameters.getValue(
"outputFileRadix");
402 std::string inputArgs = rxParameters.getValue(
"inputArgs");
404 __SUP_COUTV__(requester);
405 __SUP_COUTV__(targetInterfaceID);
406 __SUP_COUTV__(macroName);
407 __SUP_COUTV__(enableSavingOutput);
408 __SUP_COUTV__(outputFilePath);
409 __SUP_COUTV__(outputFileRadix);
410 __SUP_COUTV__(inputArgs);
// Error handling for the first launch call (messages say "the Macro named");
// the launch call itself is not visible.
425 catch(std::runtime_error& e)
427 __SUP_SS__ <<
"In Supervisor with LID="
428 << getApplicationDescriptor()->getLocalId()
429 <<
" the Macro named '" << macroName <<
"' with target FE '"
431 <<
"' failed to start multi-dimensional launch. "
432 <<
"Here is the error:\n\n"
433 << e.what() << __E__;
438 __SUP_SS__ <<
"In Supervisor with LID="
439 << getApplicationDescriptor()->getLocalId()
440 <<
" the Macro named '" << macroName <<
"' with target FE '"
442 <<
"' failed to start multi-dimensional launch "
443 <<
"due to an unknown error." << __E__;
448 catch(
const std::exception& e)
450 ss <<
"Exception message: " << e.what();
// Error handling for the second launch call (messages say "the FE Macro
// named"); the launch call itself is not visible.
470 catch(std::runtime_error& e)
472 __SUP_SS__ <<
"In Supervisor with LID="
473 << getApplicationDescriptor()->getLocalId()
474 <<
" the FE Macro named '" << macroName <<
"' with target FE '"
476 <<
"' failed to start multi-dimensional launch. "
477 <<
"Here is the error:\n\n"
478 << e.what() << __E__;
483 __SUP_SS__ <<
"In Supervisor with LID="
484 << getApplicationDescriptor()->getLocalId()
485 <<
" the FE Macro named '" << macroName <<
"' with target FE '"
487 <<
"' failed to start multi-dimensional launch "
488 <<
"due to an unknown error." << __E__;
493 catch(
const std::exception& e)
495 ss <<
"Exception message: " << e.what();
// Reply name is the request type with "Done" appended.
504 xoap::MessageReference replyMessage =
505 SOAPUtilities::makeSOAPMessageReference(type +
"Done");
508 SOAPUtilities::addParameters(replyMessage, txParameters);
510 __SUP_COUT__ <<
"Sending FE macro result: "
511 << SOAPUtilities::translate(replyMessage) << __E__;
// --- Multi-dimensional launch status check.
515 else if(type ==
"feMacroMultiDimensionalCheck" ||
516 type ==
"macroMultiDimensionalCheck")
520 rxParameters.addParameter(
"macroName");
522 rxParameters.addParameter(
"feMacroName");
523 rxParameters.addParameter(
"targetInterfaceID");
525 SOAPUtilities::receive(message, rxParameters);
527 std::string targetInterfaceID = rxParameters.getValue(
"targetInterfaceID");
// As in the start branch, the name comes from "macroName" or "feMacroName";
// the selecting condition is not visible.
528 std::string macroName;
530 macroName = rxParameters.getValue(
"macroName");
532 macroName = rxParameters.getValue(
"feMacroName");
543 catch(std::runtime_error& e)
545 __SUP_SS__ <<
"In Supervisor with LID="
546 << getApplicationDescriptor()->getLocalId()
547 <<
" the FE Macro named '" << macroName <<
"' with target FE '"
549 <<
"' failed to check multi-dimensional launch. "
550 <<
"Here is the error:\n\n"
551 << e.what() << __E__;
556 __SUP_SS__ <<
"In Supervisor with LID="
557 << getApplicationDescriptor()->getLocalId()
558 <<
" the FE Macro named '" << macroName <<
"' with target FE '"
560 <<
"' failed to check multi-dimensional launch "
561 <<
"due to an unknown error." << __E__;
566 catch(
const std::exception& e)
568 ss <<
"Exception message: " << e.what();
// The completion flag `done` (set on a line not visible here) is reported to
// the caller as the text "1" or "0".
576 xoap::MessageReference replyMessage =
577 SOAPUtilities::makeSOAPMessageReference(type +
"Done");
579 txParameters.addParameter(
"Done", done ?
"1" :
"0");
580 SOAPUtilities::addParameters(replyMessage, txParameters);
// --- Any other request type is reported as unrecognized.
588 __SUP_SS__ <<
"Unrecognized FE Communication type: " << type << __E__;
// --- Handler-level error paths: the error text is logged and returned to the
// sender in the "Error" parameter of a "...FailFECommunicationRequest" reply.
592 catch(
const std::runtime_error& e)
594 __SUP_SS__ <<
"Error encountered processing FE communication request: " << e.what()
596 __SUP_COUT_ERR__ << ss.str();
599 parameters.addParameter(
"Error", ss.str());
600 return SOAPUtilities::makeSOAPMessageReference(
601 supervisorClassNoNamespace_ +
"FailFECommunicationRequest", parameters);
605 __SUP_SS__ <<
"Unknown error encountered processing FE communication request."
611 catch(
const std::exception& e)
613 ss <<
"Exception message: " << e.what();
618 __SUP_COUT_ERR__ << ss.str();
621 parameters.addParameter(
"Error", ss.str());
622 return SOAPUtilities::makeSOAPMessageReference(
623 supervisorClassNoNamespace_ +
"FailFECommunicationRequest", parameters);
634 xoap::MessageReference message)
636 __SUP_COUT__ <<
"$$$$$$$$$$$$$$$$$" << __E__;
640 parameters.addParameter(
"Request");
642 __SUP_COUT__ <<
"Received Macro Maker message: " << SOAPUtilities::translate(message)
645 SOAPUtilities::receive(message, parameters);
646 std::string
request = parameters.getValue(
"Request");
648 __SUP_COUT__ <<
"request: " <<
request << __E__;
664 if(theFEInterfacesManager_)
665 retParameters.addParameter(
668 std::to_string(getApplicationDescriptor()->getLocalId())));
670 retParameters.addParameter(
"FEList",
"");
673 if(theStateMachine_.getErrorMessage() !=
"")
674 retParameters.addParameter(
"frontEndError",
675 theStateMachine_.getErrorMessage());
677 return SOAPUtilities::makeSOAPMessageReference(
678 supervisorClassNoNamespace_ +
"Response", retParameters);
680 else if(
request ==
"UniversalWrite")
682 if(!theFEInterfacesManager_)
684 __SUP_SS__ <<
"No FE Interface Manager! Are you configured?" << __E__;
689 requestParameters.addParameter(
"InterfaceID");
690 requestParameters.addParameter(
"Address");
691 requestParameters.addParameter(
"Data");
692 SOAPUtilities::receive(message, requestParameters);
693 std::string interfaceID = requestParameters.getValue(
"InterfaceID");
694 std::string addressStr = requestParameters.getValue(
"Address");
695 std::string dataStr = requestParameters.getValue(
"Data");
697 __SUP_COUT__ <<
"Address: " << addressStr <<
" Data: " << dataStr
698 <<
" InterfaceID: " << interfaceID << __E__;
705 <<
"theFEInterfacesManager_->getInterfaceUniversalAddressSize(index) "
709 <<
"theFEInterfacesManager_->getInterfaceUniversalDataSize(index) "
719 __SUP_COUT__ <<
"Translating address: ";
721 std::string addressTmp;
724 char* address = &addressTmp[0];
726 if(addressStr.size() % 2)
727 addressStr =
"0" + addressStr;
729 for(; i < addressStr.size() &&
734 tmpHex[0] = addressStr[addressStr.size() - 1 - i - 1];
735 tmpHex[1] = addressStr[addressStr.size() - 1 - i];
736 sscanf(tmpHex,
"%hhX", (
unsigned char*)&address[i / 2]);
737 printf(
"%2.2X", (
unsigned char)address[i / 2]);
745 printf(
"%2.2X", (
unsigned char)address[i / 2]);
750 __SUP_COUT__ <<
"Translating data: ";
755 char* data = &dataTmp[0];
757 if(dataStr.size() % 2)
758 dataStr =
"0" + dataStr;
761 for(; i < dataStr.size() &&
766 tmpHex[0] = dataStr[dataStr.size() - 1 - i - 1];
767 tmpHex[1] = dataStr[dataStr.size() - 1 - i];
768 sscanf(tmpHex,
"%hhX", (
unsigned char*)&data[i / 2]);
769 printf(
"%2.2X", (
unsigned char)data[i / 2]);
777 printf(
"%2.2X", (
unsigned char)data[i / 2]);
789 theFEInterfacesManager_->
universalWrite(interfaceID, address, data);
794 return SOAPUtilities::makeSOAPMessageReference(
795 supervisorClassNoNamespace_ +
"DataWritten", retParameters);
797 else if(
request ==
"UniversalRead")
799 if(!theFEInterfacesManager_)
801 __SUP_SS__ <<
"No FE Interface Manager! Are you configured?" << __E__;
807 requestParameters.addParameter(
"InterfaceID");
808 requestParameters.addParameter(
"Address");
809 SOAPUtilities::receive(message, requestParameters);
810 std::string interfaceID = requestParameters.getValue(
"InterfaceID");
811 std::string addressStr = requestParameters.getValue(
"Address");
813 __SUP_COUT__ <<
"Address: " << addressStr <<
" InterfaceID: " << interfaceID
822 <<
"theFEInterfacesManager_->getInterfaceUniversalAddressSize(index) "
826 <<
"theFEInterfacesManager_->getInterfaceUniversalDataSize(index) "
833 __SUP_COUT__ <<
"Translating address: ";
835 std::string addressTmp;
838 char* address = &addressTmp[0];
840 if(addressStr.size() % 2)
841 addressStr =
"0" + addressStr;
844 for(; i < addressStr.size() &&
849 tmpHex[0] = addressStr[addressStr.size() - 1 - i - 1];
850 tmpHex[1] = addressStr[addressStr.size() - 1 - i];
851 sscanf(tmpHex,
"%hhX", (
unsigned char*)&address[i / 2]);
852 printf(
"%2.2X", (
unsigned char)address[i / 2]);
860 printf(
"%2.2X", (
unsigned char)address[i / 2]);
865 unsigned int dataSz =
868 dataStr.resize(dataSz);
869 char* data = &dataStr[0];
877 theFEInterfacesManager_->
universalRead(interfaceID, address, data);
879 catch(
const std::runtime_error& e)
883 __SUP_COUT_ERR__ <<
"Exception caught during read: " << e.what() << __E__;
884 retParameters.addParameter(
"dataResult",
"Time Out Error");
885 return SOAPUtilities::makeSOAPMessageReference(
886 supervisorClassNoNamespace_ +
"aa", retParameters);
892 __SUP_COUT_ERR__ <<
"Exception caught during read." << __E__;
893 retParameters.addParameter(
"dataResult",
"Time Out Error");
894 return SOAPUtilities::makeSOAPMessageReference(
895 supervisorClassNoNamespace_ +
"aa", retParameters);
901 std::string str8(data);
903 __SUP_COUT__ <<
"decResult[" << dataSz
904 <<
" bytes]: " << *((
unsigned long long*)(&str8[0]))
908 std::string hexResultStr;
909 hexResultStr.reserve(dataSz * 2 + 1);
910 char* hexResult = &hexResultStr[0];
914 for(
unsigned int i = 0; i < dataSz; ++i)
916 sprintf(&hexResult[i * 2],
"%2.2X", (
unsigned char)data[dataSz - 1 - i]);
919 __SUP_COUT__ <<
"hexResult[" << strlen(hexResult)
920 <<
" nibbles]: " << std::string(hexResult) << __E__;
922 retParameters.addParameter(
"dataResult", hexResult);
923 return SOAPUtilities::makeSOAPMessageReference(
924 supervisorClassNoNamespace_ +
"aa", retParameters);
926 else if(
request ==
"GetInterfaceMacros")
928 if(theFEInterfacesManager_)
930 __SUP_COUT__ <<
"Getting FE Macros from FE Interface Manager..." << __E__;
931 retParameters.addParameter(
934 CorePropertySupervisorBase::getSupervisorUID(),
935 std::to_string(CoreSupervisorBase::getSupervisorLID())));
939 __SUP_COUT__ <<
"No FE Macros because there is no FE Interface Manager..."
941 retParameters.addParameter(
"FEMacros",
"");
944 return SOAPUtilities::makeSOAPMessageReference(
945 supervisorClassNoNamespace_ +
"Response", retParameters);
947 else if(
request ==
"RunInterfaceMacro")
949 if(!theFEInterfacesManager_)
951 __SUP_SS__ <<
"Missing FE Interface Manager! Are you configured?"
958 requestParameters.addParameter(
"feMacroName");
959 requestParameters.addParameter(
"inputArgs");
960 requestParameters.addParameter(
"outputArgs");
961 requestParameters.addParameter(
"InterfaceID");
962 requestParameters.addParameter(
"userPermissions");
963 requestParameters.addParameter(
"AsyncSupported");
964 SOAPUtilities::receive(message, requestParameters);
965 std::string interfaceID = requestParameters.getValue(
"InterfaceID");
966 std::string feMacroName = requestParameters.getValue(
"feMacroName");
967 std::string inputArgs = requestParameters.getValue(
"inputArgs");
968 std::string outputArgs = requestParameters.getValue(
"outputArgs");
969 std::string userPermissions = requestParameters.getValue(
"userPermissions");
970 bool asyncSupported = requestParameters.getValue(
"AsyncSupported") ==
"1";
975 __COUTV__(userPermissions);
976 std::map<std::string, WebUsers::permissionLevel_t> userPermissionLevelsMap;
978 userPermissions, userPermissionLevelsMap);
988 auto FEMacroIt = fe->getMapOfFEMacroFunctions().find(feMacroName);
989 if(FEMacroIt == fe->getMapOfFEMacroFunctions().end())
991 __SUP_SS__ <<
"FE Macro '" << feMacroName <<
"' of interfaceID '"
992 << interfaceID <<
"' was not found!" << __E__;
997 __COUTV__(FEMacro.requiredUserPermissions_);
998 std::map<std::string, WebUsers::permissionLevel_t>
999 FERequiredUserPermissionsMap;
1001 FEMacro.requiredUserPermissions_, FERequiredUserPermissionsMap);
1005 userPermissionLevelsMap, FERequiredUserPermissionsMap))
1007 __SUP_SS__ <<
"Invalid user permission for FE Macro '" << feMacroName
1008 <<
"' of interfaceID '" << interfaceID <<
"'!\n\n"
1009 <<
"Must have access level of at least '"
1011 <<
".' Users permissions level is only '" << userPermissions
1019 auto task = std::make_shared<AsyncMacroTask>();
1021 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1022 task->taskID = ++asyncMacroTaskIDCounter_;
1023 if(asyncMacroTaskIDCounter_ == 0)
1024 task->taskID = ++asyncMacroTaskIDCounter_;
1025 task->interfaceID = interfaceID;
1026 task->startTime = time(0);
1027 asyncMacroTasks_.push_back(task);
1036 outputArgs]()
mutable {
1038 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1039 task->threadID = std::this_thread::get_id();
1045 ->clearFEMacroPercentDone(std::this_thread::get_id());
1054 interfaceID, localFEMacro, inputArgs, outputArgs);
1058 ->clearFEMacroPercentDone(std::this_thread::get_id());
1064 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1065 task->outputArgs = outputArgs;
1066 task->doneTime = time(0);
1070 catch(
const std::exception& e)
1075 ->clearFEMacroPercentDone(std::this_thread::get_id());
1080 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1082 "In Supervisor with LID=" +
1083 std::to_string(getApplicationDescriptor()->getLocalId()) +
1084 " the FE Macro named '" + interfaceID +
1085 "' failed. Here is the error:\n\n" + e.what();
1086 task->doneTime = time(0);
1094 ->clearFEMacroPercentDone(std::this_thread::get_id());
1099 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1101 "In Supervisor with LID=" +
1102 std::to_string(getApplicationDescriptor()->getLocalId()) +
1103 " the FE Macro named '" + interfaceID +
1104 "' failed due to an unknown error.";
1105 task->doneTime = time(0);
1110 __SUP_COUT__ <<
"Launched async FE Macro '" << feMacroName
1111 <<
"' for interfaceID '" << interfaceID
1112 <<
"' with taskID=" << task->taskID << __E__;
1116 size_t sleepUs = 100;
1117 for(
int i = 0; i < 10; ++i)
1121 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1124 if(!task->error.empty())
1126 std::string errorCopy = task->error;
1127 for(
size_t j = 0; j < asyncMacroTasks_.size(); ++j)
1128 if(asyncMacroTasks_[j]->taskID == task->taskID)
1130 asyncMacroTasks_.erase(
1131 asyncMacroTasks_.begin() + j);
1134 __SUP_SS__ << errorCopy;
1137 retParameters.addParameter(
"outputArgs",
1139 for(
size_t j = 0; j < asyncMacroTasks_.size(); ++j)
1140 if(asyncMacroTasks_[j]->taskID == task->taskID)
1142 asyncMacroTasks_.erase(asyncMacroTasks_.begin() +
1146 return SOAPUtilities::makeSOAPMessageReference(
1147 supervisorClassNoNamespace_ +
"Response",
1152 if(sleepUs > 1000 * 1000)
1153 sleepUs = 1000 * 1000;
1157 retParameters.addParameter(
"NotDoneTaskID", std::to_string(task->taskID));
1158 return SOAPUtilities::makeSOAPMessageReference(
1159 supervisorClassNoNamespace_ +
"Response", retParameters);
1167 interfaceID, FEMacro, inputArgs, outputArgs);
1169 catch(std::runtime_error& e)
1171 __SUP_SS__ <<
"In Supervisor with LID="
1172 << getApplicationDescriptor()->getLocalId()
1173 <<
" the FE Macro named '" << feMacroName
1174 <<
"' with target FE '" << interfaceID
1175 <<
"' failed. Here is the error:\n\n"
1176 << e.what() << __E__;
1179 catch(std::exception& e)
1181 __SUP_SS__ <<
"In Supervisor with LID="
1182 << getApplicationDescriptor()->getLocalId()
1183 <<
" the FE Macro named '" << feMacroName
1184 <<
"' with target FE '" << interfaceID
1185 <<
"' failed. Here is the error:\n\n"
1186 << e.what() << __E__;
1191 __SUP_SS__ <<
"In Supervisor with LID="
1192 << getApplicationDescriptor()->getLocalId()
1193 <<
" the FE Macro named '" << feMacroName
1194 <<
"' with target FE '" << interfaceID
1195 <<
"' failed due to an unknown error." << __E__;
1200 catch(
const std::exception& e)
1202 ss <<
"Exception message: " << e.what();
1210 retParameters.addParameter(
"outputArgs", outputArgs);
1211 return SOAPUtilities::makeSOAPMessageReference(
1212 supervisorClassNoNamespace_ +
"Response", retParameters);
1215 else if(
request ==
"RunMacroMakerMacro")
1217 if(!theFEInterfacesManager_)
1219 __SUP_SS__ <<
"Missing FE Interface Manager! Are you configured?"
1226 requestParameters.addParameter(
"macroName");
1227 requestParameters.addParameter(
"macroString");
1228 requestParameters.addParameter(
"inputArgs");
1229 requestParameters.addParameter(
"outputArgs");
1230 requestParameters.addParameter(
"InterfaceID");
1231 requestParameters.addParameter(
"AsyncSupported");
1232 SOAPUtilities::receive(message, requestParameters);
1233 std::string interfaceID = requestParameters.getValue(
"InterfaceID");
1234 std::string macroName = requestParameters.getValue(
"macroName");
1235 std::string macroString = requestParameters.getValue(
"macroString");
1236 std::string inputArgs = requestParameters.getValue(
"inputArgs");
1237 std::string outputArgs = requestParameters.getValue(
"outputArgs");
1238 bool asyncSupported = requestParameters.getValue(
"AsyncSupported") ==
"1";
1243 auto task = std::make_shared<AsyncMacroTask>();
1245 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1246 task->taskID = ++asyncMacroTaskIDCounter_;
1247 if(asyncMacroTaskIDCounter_ == 0)
1248 task->taskID = ++asyncMacroTaskIDCounter_;
1249 task->interfaceID = interfaceID;
1250 task->startTime = time(0);
1251 asyncMacroTasks_.push_back(task);
1260 outputArgs]()
mutable {
1262 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1263 task->threadID = std::this_thread::get_id();
1268 ->clearFEMacroPercentDone(std::this_thread::get_id());
1277 interfaceID, macroString, inputArgs, outputArgs);
1281 ->clearFEMacroPercentDone(std::this_thread::get_id());
1287 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1288 task->outputArgs = outputArgs;
1289 task->doneTime = time(0);
1293 catch(
const std::exception& e)
1298 ->clearFEMacroPercentDone(std::this_thread::get_id());
1303 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1305 "In Supervisor with LID=" +
1306 std::to_string(getApplicationDescriptor()->getLocalId()) +
1307 " the MacroMaker Macro named '" + macroName +
1308 "' with target FE '" + interfaceID +
1309 "' failed. Here is the error:\n\n" + e.what();
1310 task->doneTime = time(0);
1318 ->clearFEMacroPercentDone(std::this_thread::get_id());
1323 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1325 "In Supervisor with LID=" +
1326 std::to_string(getApplicationDescriptor()->getLocalId()) +
1327 " the MacroMaker Macro named '" + macroName +
1328 "' with target FE '" + interfaceID +
1329 "' failed due to an unknown error.";
1330 task->doneTime = time(0);
1335 __SUP_COUT__ <<
"Launched async MacroMaker Macro '" << macroName
1336 <<
"' for interfaceID '" << interfaceID
1337 <<
"' with taskID=" << task->taskID << __E__;
1341 size_t sleepUs = 100;
1342 for(
int i = 0; i < 10; ++i)
1346 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1349 if(!task->error.empty())
1351 std::string errorCopy = task->error;
1352 for(
size_t j = 0; j < asyncMacroTasks_.size(); ++j)
1353 if(asyncMacroTasks_[j]->taskID == task->taskID)
1355 asyncMacroTasks_.erase(
1356 asyncMacroTasks_.begin() + j);
1359 __SUP_SS__ << errorCopy;
1362 retParameters.addParameter(
"outputArgs",
1364 for(
size_t j = 0; j < asyncMacroTasks_.size(); ++j)
1365 if(asyncMacroTasks_[j]->taskID == task->taskID)
1367 asyncMacroTasks_.erase(asyncMacroTasks_.begin() +
1371 return SOAPUtilities::makeSOAPMessageReference(
1372 supervisorClassNoNamespace_ +
"Response",
1377 if(sleepUs > 1000 * 1000)
1378 sleepUs = 1000 * 1000;
1382 retParameters.addParameter(
"NotDoneTaskID", std::to_string(task->taskID));
1383 return SOAPUtilities::makeSOAPMessageReference(
1384 supervisorClassNoNamespace_ +
"Response", retParameters);
1392 interfaceID, macroString, inputArgs, outputArgs);
1394 catch(std::runtime_error& e)
1396 __SUP_SS__ <<
"In Supervisor with LID="
1397 << getApplicationDescriptor()->getLocalId()
1398 <<
" the MacroMaker Macro named '" << macroName
1399 <<
"' with target FE '" << interfaceID
1400 <<
"' failed. Here is the error:\n\n"
1401 << e.what() << __E__;
1406 __SUP_SS__ <<
"In Supervisor with LID="
1407 << getApplicationDescriptor()->getLocalId()
1408 <<
" the MacroMaker Macro named '" << macroName
1409 <<
"' with target FE '" << interfaceID
1410 <<
"' failed due to an unknown error." << __E__;
1415 catch(
const std::exception& e)
1417 ss <<
"Exception message: " << e.what();
1425 retParameters.addParameter(
"outputArgs", outputArgs);
1426 return SOAPUtilities::makeSOAPMessageReference(
1427 supervisorClassNoNamespace_ +
"Response", retParameters);
1430 else if(
request ==
"CheckMacro")
1433 requestParameters.addParameter(
"TaskID");
1434 SOAPUtilities::receive(message, requestParameters);
1435 uint64_t taskID = std::stoull(requestParameters.getValue(
"TaskID"));
1437 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1440 time_t now = time(0);
1441 for(
size_t i = 0; i < asyncMacroTasks_.size();)
1443 auto& t = asyncMacroTasks_[i];
1444 if(t->done && t->doneTime > 0 && (now - t->doneTime) > 5 * 60)
1445 asyncMacroTasks_.erase(asyncMacroTasks_.begin() + i);
1451 std::shared_ptr<AsyncMacroTask> foundTask;
1452 for(
auto& t : asyncMacroTasks_)
1454 if(t->taskID == taskID)
1463 __SUP_SS__ <<
"Async macro task with ID=" << taskID
1464 <<
" was not found. Perhaps it completed more than 5 "
1472 if(!foundTask->error.empty())
1474 std::string errorCopy = foundTask->error;
1476 for(
size_t i = 0; i < asyncMacroTasks_.size(); ++i)
1478 if(asyncMacroTasks_[i]->taskID == taskID)
1480 asyncMacroTasks_.erase(asyncMacroTasks_.begin() + i);
1484 __SUP_SS__ << errorCopy;
1488 retParameters.addParameter(
"outputArgs", foundTask->outputArgs);
1491 for(
size_t i = 0; i < asyncMacroTasks_.size(); ++i)
1493 if(asyncMacroTasks_[i]->taskID == taskID)
1495 asyncMacroTasks_.erase(asyncMacroTasks_.begin() + i);
1500 return SOAPUtilities::makeSOAPMessageReference(
1501 supervisorClassNoNamespace_ +
"Response", retParameters);
1506 if(theFEInterfacesManager_ && foundTask->threadID != std::thread::id())
1510 int progress = theFEInterfacesManager_
1512 ->getFEMacroPercentDone(foundTask->threadID);
1514 retParameters.addParameter(
"Progress",
1515 std::to_string(progress));
1521 retParameters.addParameter(
"NotDoneTaskID", std::to_string(taskID));
1522 return SOAPUtilities::makeSOAPMessageReference(
1523 supervisorClassNoNamespace_ +
"Response", retParameters);
1528 __SUP_SS__ <<
"Unrecognized request received! '" <<
request <<
"'" << __E__;
1532 catch(
const std::runtime_error& e)
1534 __SUP_SS__ <<
"Error occurred handling request: " << e.what() << __E__;
1535 __SUP_COUT_ERR__ << ss.str();
1536 retParameters.addParameter(
"Error", ss.str());
1540 __SUP_SS__ <<
"Error occurred handling request." << __E__;
1545 catch(
const std::exception& e)
1547 ss <<
"Exception message: " << e.what();
1552 __SUP_COUT_ERR__ << ss.str();
1553 retParameters.addParameter(
"Error", ss.str());
1556 return SOAPUtilities::makeSOAPMessageReference(
1557 supervisorClassNoNamespace_ +
"FailRequest", retParameters);
// SOAP handler reporting the front-end work loop status.
// The incoming message parameter is unnamed and therefore unused.
// Without an FE interfaces manager an "Invalid request" error message is
// built (the statement that raises it is not visible in this listing).
// Otherwise the reply is a SOAP message named either
// CoreSupervisorBase::WORK_LOOP_DONE or CoreSupervisorBase::WORK_LOOP_WORKING.
// NOTE(review): the condition that selects between the two is on a line not
// shown here.
1562 xoap::MessageReference FESupervisor::workLoopStatusRequest(
1563 xoap::MessageReference )
1565 if(!theFEInterfacesManager_)
1567 __SUP_SS__ <<
"Invalid request for front-end workloop status from Supervisor "
1568 "without a FEVInterfacesManager."
1573 return SOAPUtilities::makeSOAPMessageReference(
1575 ? CoreSupervisorBase::WORK_LOOP_DONE
1576 : CoreSupervisorBase::WORK_LOOP_WORKING));
1589 theFEInterfacesManager_ = 0;
1591 for(
unsigned int i = 0; i < theStateMachineImplementation_.size(); ++i)
1595 theFEInterfacesManager_ =
1597 if(!theFEInterfacesManager_)
1600 __SUP_SS__ <<
"Dynamic cast failure!" << __E__;
1603 __SUP_COUT__ <<
"State Machine " << i <<
" WAS of type FEVInterfacesManager"
1610 __SUP_COUT__ <<
"State Machine " << i
1611 <<
" was NOT of type FEVInterfacesManager" << __E__;
1615 __SUP_COUT__ <<
"theFEInterfacesManager pointer = " << theFEInterfacesManager_
1618 return theFEInterfacesManager_;
// Configuring transition for the FE supervisor.
// The event reference parameter is unnamed and therefore unused.
//
// Visible steps, in order:
//   1. CoreSupervisorBase::configureInit().
//   2. Build a metric configuration string from the supervisor table node and
//      initialise the artdaq metric manager (metricMan) from it.
//   3. CoreSupervisorBase::transitionConfiguringFSMs().
// If loading metrics fails, the error text is stored with
// theStateMachine_.setErrorMessage() and a
// toolbox::fsm::exception::Exception is thrown.
//
// NOTE(review): this listing omits many original lines (braces, try blocks,
// call heads), so the comments describe only the visible statements.
1622 void FESupervisor::transitionConfiguring(toolbox::Event::Reference )
1624 __SUP_COUT__ <<
"transitionConfiguring" << __E__;
1625 CoreSupervisorBase::configureInit();
1630 __COUTV__(CorePropertySupervisorBase::getSupervisorConfigurationPath());
1633 CorePropertySupervisorBase::getSupervisorTableNode();
// metricStringSetup starts out true and is set to false further down; the
// condition under which that happens is not visible - presumably when
// building the metric string fails (TODO confirm).
1635 std::string metric_string =
"";
1636 bool metricStringSetup =
true;
// The metric text is written into oss by a call whose name is on a line not
// shown here, then copied into metric_string.
1639 std::ostringstream oss;
1640 std::string tabString =
"";
1641 std::string commentsString =
"";
1643 oss, tabString, commentsString,
"" , feSupervisorNode);
1644 metric_string = oss.str();
1648 metricStringSetup =
false;
// A metric manager is created here when the (not visible) check finds that
// none is instantiated.
1654 __SUP_COUT__ <<
"Metric manager is not instantiated! Attempting to fix."
1656 metricMan = std::make_unique<artdaq::MetricManager>();
// The channel-name preamble is read from the supervisor table node and passed
// to the metric manager as the second argument of initialize().
1658 std::string metricNamePreamble =
1659 feSupervisorNode.
getNode(
"/SlowControlsMetricManagerChannelNamePreamble")
1661 __SUP_COUTV__(metricNamePreamble);
1664 fhicl::ParameterSet metric_pset = fhicl::ParameterSet::make(metric_string);
1666 __SUP_COUTV__(metricNamePreamble);
// Only the "metrics" sub-table of the parsed parameter set is handed over.
1669 metricMan->initialize(metric_pset.get<fhicl::ParameterSet>(
"metrics"),
1670 metricNamePreamble);
// NOTE(review): the log text below says the initialize error is ignored
// because the metric string is "not setup"; how that relates to the
// metricStringSetup check just above depends on lines not shown - confirm.
1674 if(metricStringSetup)
1677 __SUP_COUT__ <<
"Ignore metric manager initialize error because metric "
1678 "string is not setup."
1681 __SUP_COUT__ <<
"transitionConfiguring metric manager(" << metricMan
1682 <<
") initialized = " << metricMan->Initialized() << __E__;
// Metric loading failures are logged, stored on the state machine and thrown
// as an FSM "Transition Error" exception.
1684 catch(
const std::runtime_error& e)
1686 __SS__ <<
"Error loading metrics in FESupervisor::transitionConfiguring(): "
1687 << e.what() << __E__;
1688 __SUP_COUT_ERR__ << ss.str();
1692 theStateMachine_.setErrorMessage(ss.str());
1693 throw toolbox::fsm::exception::Exception(
1694 "Transition Error" ,
1696 "FESupervisor::transitionConfiguring" ,
// Second error path: same reporting, with e.what() appended when a
// std::exception is available.
1703 __SS__ <<
"Error loading metrics in FESupervisor::transitionConfiguring()"
1709 catch(
const std::exception& e)
1711 ss <<
"Exception message: " << e.what();
1716 __SUP_COUT_ERR__ << ss.str();
1720 theStateMachine_.setErrorMessage(ss.str());
1721 throw toolbox::fsm::exception::Exception(
1722 "Transition Error" ,
1724 "FESupervisor::transitionConfiguring" ,
// Hand over to the base class to configure the state machine implementations.
1730 CoreSupervisorBase::transitionConfiguringFSMs();
1732 __SUP_COUT__ <<
"transitionConfiguring done." << __E__;
// NOTE(review): the function header, braces and several statement lines are
// missing from this listing; the comments describe only the visible statements.
//
// Halt transition as visible here: wait a bounded time for asynchronous macro
// tasks, clear the task list, shut down and release metricMan, and report
// failures through theStateMachine_ plus a toolbox FSM exception.
1738 __SUP_COUT__ <<
"transitionHalting" << __E__;
1739 TLOG_DEBUG(7) <<
"transitionHalting";
// Wait at most 30 seconds, measured with time(0).
1743 const int timeoutSeconds = 30;
1744 const time_t deadline = time(0) + timeoutSeconds;
1745 bool hasOutstanding =
true;
// Re-check until nothing is outstanding or the deadline has passed.
1746 while(hasOutstanding && time(0) < deadline)
// asyncMacroTasks_ is inspected while holding asyncMacroMutex_.
1749 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1750 hasOutstanding =
false;
// The per-task condition that marks a task as outstanding is on a missing line.
1751 for(
const auto& t : asyncMacroTasks_)
1755 hasOutstanding =
true;
1762 __SUP_COUT__ <<
"Waiting for outstanding async macro tasks to complete "
// Timeout path: warn and continue with the halt.
1769 __SUP_COUT_WARN__ <<
"Timed out waiting for async macro tasks; "
1770 "proceeding with halt."
// Clear the task list under the same mutex. Whether this runs only after a
// timeout cannot be determined from the visible lines.
1772 std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1773 asyncMacroTasks_.clear();
// Shut the metric manager down only when it exists and is initialized; the
// shutdown call itself is on a missing line.
1781 if(metricMan && metricMan->Initialized())
1783 TLOG_DEBUG(7) <<
"Metric manager(" << metricMan <<
") shutting down..."
1787 TLOG_DEBUG(7) <<
"Metric manager shutdown." << __E__;
// Presumably the else branch of the check above -- TODO confirm.
1790 __SUP_COUT__ <<
"Metric manager(" << metricMan <<
") already shutdown."
// Release the metric manager object.
1793 metricMan.reset(
nullptr);
// Error path (its handler opener is missing): start from a generic message and
// append e.what() when a std::exception is caught.
1797 __SS__ <<
"Error shutting down metrics in FESupervisor::transitionHalting()"
1803 catch(
const std::exception& e)
1805 ss <<
"Exception message: " << e.what();
// Log the message, store it on the state machine, and raise the FSM
// transition exception.
1810 __SUP_COUT_ERR__ << ss.str();
1814 theStateMachine_.setErrorMessage(ss.str());
1815 throw toolbox::fsm::exception::Exception(
1816 "Transition Error" ,
1818 "FESupervisor::transitionHalting" ,
1824 __SUP_COUT__ <<
"transitionHalting done." << __E__;
// initDataPublishing: binds the ZeroMQ publishing socket to an endpoint.
// NOTE(review): the first line of the signature, the braces and some statement
// lines are missing from this listing; the line below is the tail of the
// parameter list.
1829 const std::string& topic)
// Already-initialized case; the action taken here is on a missing line.
1831 if(dp_isInitialized_)
// Remember the endpoint so it can be used for the bind and in error text.
1838 dp_endpoint_ = endpoint;
1844 dp_socket_.bind(dp_endpoint_);
// Bind failure: build a message naming the endpoint and the ZeroMQ error text.
// What is done with the message afterwards is not visible here.
1846 catch(
const zmq::error_t& e)
1848 __SUP_SS__ <<
"initDataPublishing() - bind to zmq '" + dp_endpoint_ +
1849 "' failed: " + e.what()
// Mark publishing as ready.
1854 dp_isInitialized_ =
true;
// closeDataPublishing: tears down the ZeroMQ publishing socket and, when
// requested, its context.
// NOTE(review): the function header, braces and some statement lines are
// missing from this listing; the extent of the guarded region is not visible.
1860 if(dp_isInitialized_)
// Set the socket linger period to 0 (see the ZeroMQ ZMQ_LINGER option for the
// effect on pending messages at close).
1864 dp_socket_.set(zmq::sockopt::linger, 0);
// The zmq::error_t is caught without a name; the handler body is not visible.
1867 catch(
const zmq::error_t&)
// Close the context only when the caller asked for it.
1873 if(alsoCloseContext)
1874 dp_context_.close();
1876 catch(
const zmq::error_t&)
// Mark publishing as no longer initialized.
1880 dp_isInitialized_ =
false;
// publishData: sends the topic followed by the payload on the publishing
// socket.
// NOTE(review): the function header, braces, the failure conditions and the
// statements that consume each __SUP_SS__ message are missing from this
// listing.
// Publishing requires a prior successful initialization.
1894 if(!dp_isInitialized_)
1896 __SUP_SS__ <<
"publishData() called before zmq init()" << __E__;
// First part: the topic, flagged sndmore so that another part follows.
1902 auto rc_topic = dp_socket_.send(zmq::buffer(dp_topic_), zmq::send_flags::sndmore);
1905 __SUP_SS__ <<
"publishData() - failed to send zmq topic" << __E__;
// A null data pointer is only an error when a non-zero size was given.
1911 if(dataPtr ==
nullptr && dataSize > 0)
1913 __SUP_SS__ <<
"FESupervisor::publishData() - dataPtr is nullptr but dataSize is "
1914 << dataSize << __E__;
// Second part: the payload bytes, sent with no flags. Any capture of the
// return value is on a missing line.
1918 dp_socket_.send(zmq::buffer(dataPtr, dataSize), zmq::send_flags::none);
1921 __SUP_SS__ <<
"publishData() - failed to send zmq payload" << __E__;
static void insertMetricsBlock(std::ostream &out, std::string &tabStr, std::string &commentStr, const std::string &parentPath, ConfigurationTree daqNode)
insertMetricsBlock
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
T getValueWithDefault(const T &defaultValue) const
static void extractPermissionsMapFromString(const std::string &permissionsString, std::map< std::string, WebUsers::permissionLevel_t > &permissionsMap)
static bool doPermissionsGrantAccess(std::map< std::string, WebUsers::permissionLevel_t > &permissionLevelsMap, std::map< std::string, WebUsers::permissionLevel_t > &permissionThresholdsMap)
virtual void transitionHalting(toolbox::Event::Reference event)
virtual void request(const std::string &requestType, cgicc::Cgicc &cgiIn, HttpXmlDocument &xmlOut, const WebUsers::RequestUserInfo &userInfo)
void publishData(const char *dataPtr, size_t dataSize)
virtual void transitionHalting(toolbox::Event::Reference event) override
void initDataPublishing(const std::string &endpoint, const std::string &topic="test")
void closeDataPublishing(bool alsoCloseContext=true)
xoap::MessageReference macroMakerSupervisorRequest(xoap::MessageReference message)
const FEVInterface & getFEInterface(const std::string &interfaceID) const
getFEInterface
std::string getFEMacrosString(const std::string &supervisorName, const std::string &supervisorLid)
used by MacroMaker
FEVInterface * getFEInterfaceP(const std::string &interfaceID)
getFEInterfaceP
void runFEMacro(const std::string &interfaceID, const FEVInterface::frontEndMacroStruct_t &feMacro, const std::string &inputArgs, std::string &outputArgs)
used by MacroMaker and FE calling indirectly
unsigned int getInterfaceUniversalAddressSize(const std::string &interfaceID)
used by MacroMaker
void startFEMacroMultiDimensional(const std::string &requester, const std::string &interfaceID, const std::string &feMacroName, const bool enableSavingOutput, const std::string &outputFilePath, const std::string &outputFileRadix, const std::string &inputArgs)
used by iterator calling (i.e. FESupervisor)
void universalWrite(const std::string &interfaceID, char *address, char *writeValue)
used by MacroMaker
bool allFEWorkloopsAreDone(void)
used by Iterator, e.g.
void runFEMacroByFE(const std::string &callingInterfaceID, const std::string &interfaceID, const std::string &feMacroName, const std::string &inputArgs, std::string &outputArgs)
used by FE calling (i.e. FESupervisor)
unsigned int getInterfaceUniversalDataSize(const std::string &interfaceID)
used by MacroMaker
void universalRead(const std::string &interfaceID, char *address, char *returnValue)
used by MacroMaker
void startMacroMultiDimensional(const std::string &requester, const std::string &interfaceID, const std::string &macroName, const std::string &macroString, const bool enableSavingOutput, const std::string &outputFilePath, const std::string &outputFileRadix, const std::string &inputArgs)
used by iterator calling (i.e. FESupervisor)
bool checkMacroMultiDimensional(const std::string &interfaceID, const std::string &macroName)
used by iterator calling (i.e. FESupervisor)
std::mutex frontEndCommunicationReceiveMutex_
FE communication helpers.
void runMacro(const std::string &interfaceID, const std::string &macroObjectString, const std::string &inputArgs, std::string &outputArgs)
used by MacroMaker
std::string getFEListString(const std::string &supervisorLid)
used by MacroMaker
defines used also by OtsConfigurationWizardSupervisor
members fully define a front-end macro function
static std::string mapToString(const std::map< std::string, T > &mapToReturn, const std::string &primaryDelimeter=", ", const std::string &secondaryDelimeter=": ")