otsdaq  3.09.00
FESupervisor.cc
1 #include "otsdaq/CoreSupervisors/FESupervisor.h"
2 #include "otsdaq/ConfigurationInterface/ConfigurationManager.h"
3 #include "otsdaq/FECore/FEVInterfacesManager.h"
4 #include "otsdaq/TablePlugins/ARTDAQTableBase/ARTDAQTableBase.h"
5 
6 #include "artdaq/DAQdata/Globals.hh" // instantiates artdaq::Globals::metricMan_
7 
8 #include <cstring>
9 #include <iostream>
10 #include <string>
11 
12 #include "artdaq-core/Utilities/ExceptionHandler.hh" /*for artdaq::ExceptionHandler*/
13 
36 using namespace ots;
37 
38 XDAQ_INSTANTIATOR_IMPL(FESupervisor)
39 
40 //==============================================================================
// Constructs the FE supervisor on top of CoreSupervisorBase.
//
// Member initialization creates a zmq context with one I/O thread (dp_context_) and a
// zmq publisher socket on that context (dp_socket_). The body then, in order:
//   1. binds the SOAP handlers "MacroMakerSupervisorRequest", "WorkLoopStatusRequest"
//      and "FECommunication" under XDAQ_NS_URI;
//   2. looks up the data publishing endpoint, first from the supervisor property
//      'data_publishing_endpoint', then from the environment variable
//      DATA_PUBLISHING_ENDPOINT; a non-empty endpoint is passed to initDataPublishing()
//      together with the 'data_publishing_topic' property (default "");
//   3. appends an entry to theStateMachineImplementation_ built from the context tree
//      node and the supervisor configuration path;
//   4. calls extractFEInterfacesManager();
//   5. in Macro Maker mode only, configures every state machine implementation right
//      away (logic copied from CoreSupervisorBase::transitionConfiguring()) and then
//      calls indicateOtsAlive(0).
//
// @param stub  application stub, forwarded unchanged to CoreSupervisorBase.
// @throws  rethrows whatever step 3 or step 5 throws; in Macro Maker mode it also throws
//          via __SUP_SS_THROW__ when theFEInterfacesManager_ is null after step 4.
41 FESupervisor::FESupervisor(xdaq::ApplicationStub* stub)
42  : CoreSupervisorBase(stub)
43  , dp_context_{1} // 1 I/O thread – sufficient for most tests
44  , dp_socket_{dp_context_, zmq::socket_type::pub}
45 {
46  __SUP_COUT__ << "Constructing..." << __E__;
47 
48  xoap::bind(this,
49  &FESupervisor::macroMakerSupervisorRequest,
50  "MacroMakerSupervisorRequest",
51  XDAQ_NS_URI);
52 
53  xoap::bind(
54  this, &FESupervisor::workLoopStatusRequest, "WorkLoopStatusRequest", XDAQ_NS_URI);
55 
56  xoap::bind(this,
57  &FESupervisor::frontEndCommunicationRequest,
58  "FECommunication",
59  XDAQ_NS_URI);
60 
    // Stays empty when neither the supervisor property nor the environment variable
    // yields a value; an empty endpoint skips initDataPublishing() below.
61  std::string dataPublishingEndpoint;
62  try
63  {
64  dataPublishingEndpoint = getSupervisorProperty(
65  "data_publishing_endpoint"); //like "tcp://0.0.0.0:$((OTS_MAIN_PORT-2))"
66  }
67  catch(const std::runtime_error& e)
68  {
    // Despite the wording of the log message below, the environment variable is still
    // tried before publishing is left disabled.
69  __SUP_COUT__ << "Data publishing property 'data_publishing_endpoint' not set. "
70  "Defaulting to data publishing disabled."
71  << __E__;
72 
73  try
74  {
75  dataPublishingEndpoint = __ENV__(
76  "DATA_PUBLISHING_ENDPOINT"); //like "tcp://0.0.0.0:$((OTS_MAIN_PORT-2))"
77  }
78  catch(const std::runtime_error& e)
79  {
80  __SUP_COUT__
81  << "Data publishing environment variable 'DATA_PUBLISHING_ENDPOINT' "
82  "not set. Defaulting to data publishing disabled."
83  << __E__;
84  }
85  }
86  __SUP_COUT_INFO__ << "dataPublishingEndpoint = " << dataPublishingEndpoint << __E__;
87 
88  if(dataPublishingEndpoint.size())
89  {
90  __SUP_COUT__ << "Initializing data publishing to endpoint '"
91  << dataPublishingEndpoint << "'..." << __E__;
92 
93  initDataPublishing(dataPublishingEndpoint,
94  getSupervisorProperty("data_publishing_topic", ""));
95  }
96 
    // NOTE(review): listing line 100 is absent from this view, so the expression passed to
    // push_back() below is incomplete here — confirm against the original source.
97  try
98  {
99  CoreSupervisorBase::theStateMachineImplementation_.push_back(
101  CorePropertySupervisorBase::getContextTreeNode(),
102  CorePropertySupervisorBase::getSupervisorConfigurationPath()));
103  }
104  catch(...)
105  {
    // Every failure is rethrown. Macro Maker mode only adds a warning first, plus the
    // exception text when the exception is a std::runtime_error.
106  if(CorePropertySupervisorBase::allSupervisorInfo_.isMacroMakerMode())
107  {
108  __SUP_COUT_WARN__ << "Error caught constructing FE Interface Manager. In "
109  "Macro Maker mode, the input fhicl defines the "
110  "configuration tree, make sure you specified a valid "
111  "fcl file path."
112  << __E__;
113  try
114  {
115  throw;
116  }
117  catch(const std::runtime_error& e)
118  {
119  __SUP_COUT_WARN__ << e.what() << __E__;
120  throw;
121  }
122  }
123  throw;
124  }
125 
126  extractFEInterfacesManager();
127 
128  __SUP_COUT__ << "Constructed." << __E__;
129 
130  if(CorePropertySupervisorBase::allSupervisorInfo_.isMacroMakerMode())
131  {
132  __SUP_COUT_INFO__ << "Macro Maker mode, so configuring at startup!" << __E__;
133  if(!theFEInterfacesManager_)
134  {
135  __SUP_SS__ << "Missing FE Interface manager!" << __E__;
136  __SUP_SS_THROW__;
137  }
138 
139  // copied from CoreSupervisorBase::transitionConfiguring()
140 
141  // Now that the configuration manager has all the necessary configurations,
142  // create all objects that depend on the configuration (the first iteration)
143 
144  try
145  {
146  __SUP_COUT__ << "Configuring all state machine implementations..." << __E__;
147  __SUP_COUTV__(stateMachinesIterationDone_.size());
148  preStateMachineExecutionLoop();
149  __SUP_COUTV__(stateMachinesIterationDone_.size());
150  for(unsigned int i = 0; i < theStateMachineImplementation_.size(); ++i)
151  {
152  __SUP_COUT__ << "Configuring state machine i " << i << __E__;
153 
154  // if one state machine is doing a sub-iteration, then target that one
155  if(subIterationWorkStateMachineIndex_ != (unsigned int)-1 &&
156  i != subIterationWorkStateMachineIndex_)
157  continue; // skip those not in the sub-iteration
158 
159  if(stateMachinesIterationDone_[i])
160  continue; // skip state machines already done
161 
162  preStateMachineExecution(i);
163  theStateMachineImplementation_[i]->parentSupervisor_ =
164  this; // for backwards compatibility, kept out of configure
165  // parameters
166  theStateMachineImplementation_[i]->configure(); // e.g. for FESupervisor,
167  // this is configure of
168  // FEVInterfacesManager
169  postStateMachineExecution(i);
170  }
171  postStateMachineExecutionLoop();
172 
173  // only indicate alive after configure
174  CorePropertySupervisorBase::indicateOtsAlive(0);
175  }
176  catch(const std::runtime_error& e)
177  {
    // Log the failure with context, then let the original exception propagate.
178  __SUP_SS__ << "Error was caught while configuring: " << e.what() << __E__;
179  __SUP_COUT_ERR__ << "\n" << ss.str();
180  throw;
181  }
182  catch(...)
183  {
184  __SUP_SS__
185  << "Unknown error was caught while configuring. Please checked the logs."
186  << __E__;
187  try
188  {
189  throw;
190  } //one more try to printout extra info
191  catch(const std::exception& e)
192  {
193  ss << "Exception message: " << e.what();
194  }
195  catch(...)
196  {
197  }
198  __SUP_COUT_ERR__ << "\n" << ss.str();
199  throw;
200  }
201  } // end Macro Maker mode initial configure
202 } // end constructor
203 
204 //==============================================================================
// Destructor: logs entry and exit, runs a swallow-all try/catch meant for best-effort
// cleanup, then calls artdaq::Globals::CleanUpGlobals().
// NOTE(review): the try body is empty in this view (listing line 214 is absent) — confirm
// against the original source which cleanup call belongs there.
205 FESupervisor::~FESupervisor(void)
206 {
207  __SUP_COUT__ << "Destroying..." << __E__;
208  // theStateMachineImplementation_ is reset and the object it points to deleted in
209  // ~CoreSupervisorBase()
210 
211  // Destructor must never throw, so we swallow any zmq_error.
212  try
213  {
215  }
216  catch(...)
217  {
218  // nothing – best‑effort cleanup.
219  }
220 
221  artdaq::Globals::CleanUpGlobals(); // destruct metricManager (among other things)
222 
223  __SUP_COUT__ << "Destructed." << __E__;
224 } // end destructor
225 
226 //==============================================================================
// SOAP handler for front-end communication requests (bound in the constructor under the
// name "FECommunication").
//
// The request's "type" parameter selects the action. "requester" and "targetInterfaceID"
// are registered for every type; the remaining parameters depend on the type:
//   feSend
//       extra parameter "value". Calls getFEInterface(targetInterfaceID), then, holding
//       frontEndCommunicationReceiveMutex_, emplaces the value into
//       frontEndCommunicationReceiveBuffer_[targetInterfaceID][requester].
//       Reply: "Received".
//   feMacro
//       extra parameters "feMacroName", "inputArgs". Calls runFEMacroByFE().
//       Reply: "feMacrosResponse" carrying requester, targetInterfaceID, feMacroName and
//       outputArgs.
//   feMacroMultiDimensionalStart / macroMultiDimensionalStart
//       extra parameters "enableSavingOutput", "outputFilePath", "outputFileRadix",
//       "inputArgs", plus "feMacroName" (fe variant) or "macroName" and "macroString"
//       (macro variant). Calls startFEMacroMultiDimensional() or
//       startMacroMultiDimensional(). Reply: <type>"Done" with no parameters.
//   feMacroMultiDimensionalCheck / macroMultiDimensionalCheck
//       extra parameter "feMacroName" or "macroName". Calls checkMacroMultiDimensional().
//       Reply: <type>"Done" with parameter "Done" set to "1" or "0".
// Any other type, or a null theFEInterfacesManager_, throws via __SUP_SS_THROW__.
//
// @param message  incoming SOAP request.
// @return  the reply described above. Exceptions thrown in the body are caught by the
//          function-level handlers at the end and turned into a
//          <supervisorClassNoNamespace_>"FailFECommunicationRequest" message whose "Error"
//          parameter holds the error text.
227 xoap::MessageReference FESupervisor::frontEndCommunicationRequest(
228  xoap::MessageReference message)
229 try
230 {
231  // LORE__SUP_COUT__ << "FE Request received: " << SOAPUtilities::translate(message) << __E__;
232 
233  if(!theFEInterfacesManager_)
234  {
235  __SUP_SS__ << "No FE Interface Manager!" << __E__;
236  __SUP_SS_THROW__;
237  }
238  SOAPParameters typeParameter, rxParameters; // params for xoap to recv
239  typeParameter.addParameter("type");
240  SOAPUtilities::receive(message, typeParameter);
241 
242  std::string type = typeParameter.getValue("type");
243 
244  // types
245  // feSend
246  // feMacro
247  // feMacroMultiDimensionalStart
248  // macroMultiDimensionalStart
249  // feMacroMultiDimensionalCheck
250  // macroMultiDimensionalCheck
251 
    // parameters common to every request type
252  rxParameters.addParameter("requester");
253  rxParameters.addParameter("targetInterfaceID");
254 
255  if(type == "feSend")
256  {
257  __SUP_COUTV__(type);
258 
259  rxParameters.addParameter("value");
260  SOAPUtilities::receive(message, rxParameters);
261 
262  std::string requester = rxParameters.getValue("requester");
263  std::string targetInterfaceID = rxParameters.getValue("targetInterfaceID");
264  std::string value = rxParameters.getValue("value");
265 
266  __SUP_COUTV__(requester);
267  __SUP_COUTV__(targetInterfaceID);
268  __SUP_COUTV__(value);
269 
270  // test that the interface exists (result is discarded; presumably throws for an unknown ID — confirm)
271  theFEInterfacesManager_->getFEInterface(targetInterfaceID);
272 
273  // mutex scope
274  {
275  std::lock_guard<std::mutex> lock(
276  theFEInterfacesManager_->frontEndCommunicationReceiveMutex_);
277 
278  theFEInterfacesManager_
279  ->frontEndCommunicationReceiveBuffer_[targetInterfaceID][requester]
280  .emplace(value);
281 
282  __SUP_COUT__ << "Number of target interface ID '" << targetInterfaceID
283  << "' buffers: "
284  << theFEInterfacesManager_
285  ->frontEndCommunicationReceiveBuffer_[targetInterfaceID]
286  .size()
287  << __E__;
288  __SUP_COUT__
289  << "Number of source interface ID '" << requester << "' values received: "
290  << theFEInterfacesManager_
291  ->frontEndCommunicationReceiveBuffer_[targetInterfaceID][requester]
292  .size()
293  << __E__;
294  }
295  return SOAPUtilities::makeSOAPMessageReference("Received");
296  } // end type feSend
297  else if(type == "feMacro")
298  {
299  __SUP_COUTV__(type);
300 
301  rxParameters.addParameter("feMacroName");
302  rxParameters.addParameter("inputArgs");
303 
304  SOAPUtilities::receive(message, rxParameters);
305 
306  std::string requester = rxParameters.getValue("requester");
307  std::string targetInterfaceID = rxParameters.getValue("targetInterfaceID");
308  std::string feMacroName = rxParameters.getValue("feMacroName");
309  std::string inputArgs = rxParameters.getValue("inputArgs");
310 
311  __SUP_COUTV__(requester);
312  __SUP_COUTV__(targetInterfaceID);
313  __SUP_COUTV__(feMacroName);
314  __SUP_COUTV__(inputArgs);
315 
    // Failures are rethrown with the supervisor LID, macro name and target FE added; the
    // function-level handlers then turn them into the failure reply.
316  std::string outputArgs;
317  try
318  {
319  theFEInterfacesManager_->runFEMacroByFE(
320  requester, targetInterfaceID, feMacroName, inputArgs, outputArgs);
321  }
322  catch(std::runtime_error& e)
323  {
324  __SUP_SS__ << "In Supervisor with LID="
325  << getApplicationDescriptor()->getLocalId()
326  << " the FE Macro named '" << feMacroName << "' with target FE '"
327  << targetInterfaceID << "' failed. Here is the error:\n\n"
328  << e.what() << __E__;
329  __SUP_SS_THROW__;
330  }
331  catch(...)
332  {
333  __SUP_SS__ << "In Supervisor with LID="
334  << getApplicationDescriptor()->getLocalId()
335  << " the FE Macro named '" << feMacroName << "' with target FE '"
336  << targetInterfaceID << "' failed due to an unknown error."
337  << __E__;
338  try
339  {
340  throw;
341  } //one more try to printout extra info
342  catch(const std::exception& e)
343  {
344  ss << "Exception message: " << e.what();
345  }
346  catch(...)
347  {
348  }
349  __SUP_SS_THROW__;
350  }
351 
352  __SUP_COUTV__(outputArgs);
353 
354  xoap::MessageReference replyMessage =
355  SOAPUtilities::makeSOAPMessageReference("feMacrosResponse");
356  SOAPParameters txParameters;
357  txParameters.addParameter("requester", requester);
358  txParameters.addParameter("targetInterfaceID", targetInterfaceID);
359  txParameters.addParameter("feMacroName", feMacroName);
360  txParameters.addParameter("outputArgs", outputArgs);
361  SOAPUtilities::addParameters(replyMessage, txParameters);
362 
363  __SUP_COUT__ << "Sending FE macro result: "
364  << SOAPUtilities::translate(replyMessage) << __E__;
365 
366  return replyMessage;
367  } // end type feMacro
368  else if(type == "feMacroMultiDimensionalStart" || // from iterator
369  type == "macroMultiDimensionalStart") // from iterator
370  {
371  __SUP_COUTV__(type);
372 
    // Only two types reach this branch, so type[0] == 'm' identifies
    // "macroMultiDimensionalStart" and anything else is "feMacroMultiDimensionalStart".
373  if(type[0] == 'm')
374  {
375  rxParameters.addParameter("macroString");
376  rxParameters.addParameter("macroName");
377  }
378  else
379  rxParameters.addParameter("feMacroName");
380 
381  rxParameters.addParameter("enableSavingOutput");
382  rxParameters.addParameter("outputFilePath");
383  rxParameters.addParameter("outputFileRadix");
384  rxParameters.addParameter("inputArgs");
385 
386  SOAPUtilities::receive(message, rxParameters);
387 
388  std::string requester = rxParameters.getValue("requester");
389  std::string targetInterfaceID = rxParameters.getValue("targetInterfaceID");
390  std::string macroName, macroString;
391  if(type[0] == 'm')
392  {
393  macroName = rxParameters.getValue("macroName");
394  macroString = rxParameters.getValue("macroString");
395  __SUP_COUTV__(macroString);
396  }
397  else
398  macroName = rxParameters.getValue("feMacroName");
    // saving is enabled only when the parameter is exactly "1"
399  bool enableSavingOutput = rxParameters.getValue("enableSavingOutput") == "1";
400  std::string outputFilePath = rxParameters.getValue("outputFilePath");
401  std::string outputFileRadix = rxParameters.getValue("outputFileRadix");
402  std::string inputArgs = rxParameters.getValue("inputArgs");
403 
404  __SUP_COUTV__(requester);
405  __SUP_COUTV__(targetInterfaceID);
406  __SUP_COUTV__(macroName);
407  __SUP_COUTV__(enableSavingOutput);
408  __SUP_COUTV__(outputFilePath);
409  __SUP_COUTV__(outputFileRadix);
410  __SUP_COUTV__(inputArgs);
411 
412  if(type[0] == 'm') // start Macro
413  {
414  try
415  {
416  theFEInterfacesManager_->startMacroMultiDimensional(requester,
417  targetInterfaceID,
418  macroName,
419  macroString,
420  enableSavingOutput,
421  outputFilePath,
422  outputFileRadix,
423  inputArgs);
424  }
425  catch(std::runtime_error& e)
426  {
427  __SUP_SS__ << "In Supervisor with LID="
428  << getApplicationDescriptor()->getLocalId()
429  << " the Macro named '" << macroName << "' with target FE '"
430  << targetInterfaceID
431  << "' failed to start multi-dimensional launch. "
432  << "Here is the error:\n\n"
433  << e.what() << __E__;
434  __SUP_SS_THROW__;
435  }
436  catch(...)
437  {
438  __SUP_SS__ << "In Supervisor with LID="
439  << getApplicationDescriptor()->getLocalId()
440  << " the Macro named '" << macroName << "' with target FE '"
441  << targetInterfaceID
442  << "' failed to start multi-dimensional launch "
443  << "due to an unknown error." << __E__;
444  try
445  {
446  throw;
447  } //one more try to printout extra info
448  catch(const std::exception& e)
449  {
450  ss << "Exception message: " << e.what();
451  }
452  catch(...)
453  {
454  }
455  __SUP_SS_THROW__;
456  }
457  }
458  else // start FE Macro
459  {
460  try
461  {
462  theFEInterfacesManager_->startFEMacroMultiDimensional(requester,
463  targetInterfaceID,
464  macroName,
465  enableSavingOutput,
466  outputFilePath,
467  outputFileRadix,
468  inputArgs);
469  }
470  catch(std::runtime_error& e)
471  {
472  __SUP_SS__ << "In Supervisor with LID="
473  << getApplicationDescriptor()->getLocalId()
474  << " the FE Macro named '" << macroName << "' with target FE '"
475  << targetInterfaceID
476  << "' failed to start multi-dimensional launch. "
477  << "Here is the error:\n\n"
478  << e.what() << __E__;
479  __SUP_SS_THROW__;
480  }
481  catch(...)
482  {
483  __SUP_SS__ << "In Supervisor with LID="
484  << getApplicationDescriptor()->getLocalId()
485  << " the FE Macro named '" << macroName << "' with target FE '"
486  << targetInterfaceID
487  << "' failed to start multi-dimensional launch "
488  << "due to an unknown error." << __E__;
489  try
490  {
491  throw;
492  } //one more try to printout extra info
493  catch(const std::exception& e)
494  {
495  ss << "Exception message: " << e.what();
496  }
497  catch(...)
498  {
499  }
500  __SUP_SS_THROW__;
501  }
502  }
503 
    // The reply only acknowledges that the launch was started: it is named <type>"Done"
    // and carries no parameters (txParameters stays empty).
504  xoap::MessageReference replyMessage =
505  SOAPUtilities::makeSOAPMessageReference(type + "Done");
506  SOAPParameters txParameters;
507  // txParameters.addParameter("started", "1");
508  SOAPUtilities::addParameters(replyMessage, txParameters);
509 
510  __SUP_COUT__ << "Sending FE macro result: "
511  << SOAPUtilities::translate(replyMessage) << __E__;
512 
513  return replyMessage;
514  } // end type (fe)MacroMultiDimensionalStart
515  else if(type == "feMacroMultiDimensionalCheck" || // from iterator
516  type == "macroMultiDimensionalCheck")
517  {
518  // LORE__SUP_COUTV__(type);
519  if(type[0] == 'm')
520  rxParameters.addParameter("macroName");
521  else
522  rxParameters.addParameter("feMacroName");
    // NOTE(review): "targetInterfaceID" was already registered above for every type, so
    // this second addParameter looks redundant — confirm SOAPParameters tolerates it.
    // "requester" is registered too but is not read in this branch.
523  rxParameters.addParameter("targetInterfaceID");
524 
525  SOAPUtilities::receive(message, rxParameters);
526 
527  std::string targetInterfaceID = rxParameters.getValue("targetInterfaceID");
528  std::string macroName;
529  if(type[0] == 'm')
530  macroName = rxParameters.getValue("macroName");
531  else
532  macroName = rxParameters.getValue("feMacroName");
533 
534  // LORE__SUP_COUTV__(targetInterfaceID);
535  // LORE__SUP_COUTV__(macroName);
536 
    // Both check variants use the same manager call; the error texts below say
    // "FE Macro" for either variant.
537  bool done = false;
538  try
539  {
540  done = theFEInterfacesManager_->checkMacroMultiDimensional(targetInterfaceID,
541  macroName);
542  }
543  catch(std::runtime_error& e)
544  {
545  __SUP_SS__ << "In Supervisor with LID="
546  << getApplicationDescriptor()->getLocalId()
547  << " the FE Macro named '" << macroName << "' with target FE '"
548  << targetInterfaceID
549  << "' failed to check multi-dimensional launch. "
550  << "Here is the error:\n\n"
551  << e.what() << __E__;
552  __SUP_SS_THROW__;
553  }
554  catch(...)
555  {
556  __SUP_SS__ << "In Supervisor with LID="
557  << getApplicationDescriptor()->getLocalId()
558  << " the FE Macro named '" << macroName << "' with target FE '"
559  << targetInterfaceID
560  << "' failed to check multi-dimensional launch "
561  << "due to an unknown error." << __E__;
562  try
563  {
564  throw;
565  } //one more try to printout extra info
566  catch(const std::exception& e)
567  {
568  ss << "Exception message: " << e.what();
569  }
570  catch(...)
571  {
572  }
573  __SUP_SS_THROW__;
574  }
575 
576  xoap::MessageReference replyMessage =
577  SOAPUtilities::makeSOAPMessageReference(type + "Done");
578  SOAPParameters txParameters;
579  txParameters.addParameter("Done", done ? "1" : "0");
580  SOAPUtilities::addParameters(replyMessage, txParameters);
581 
582  // LORE__SUP_COUT__ << "Sending FE macro result: " << SOAPUtilities::translate(replyMessage) << __E__;
583 
584  return replyMessage;
585  } // end type (fe)MacroMultiDimensionalCheck
586  else
587  {
588  __SUP_SS__ << "Unrecognized FE Communication type: " << type << __E__;
589  __SUP_SS_THROW__;
590  }
591 }
// Function-level handlers: errors raised anywhere in the body above are reported to the
// requester as a failure reply instead of propagating out of the handler.
592 catch(const std::runtime_error& e)
593 {
594  __SUP_SS__ << "Error encountered processing FE communication request: " << e.what()
595  << __E__;
596  __SUP_COUT_ERR__ << ss.str();
597 
598  SOAPParameters parameters;
599  parameters.addParameter("Error", ss.str());
600  return SOAPUtilities::makeSOAPMessageReference(
601  supervisorClassNoNamespace_ + "FailFECommunicationRequest", parameters);
602 }
603 catch(...)
604 {
605  __SUP_SS__ << "Unknown error encountered processing FE communication request."
606  << __E__;
607  try
608  {
609  throw;
610  } //one more try to printout extra info
611  catch(const std::exception& e)
612  {
613  ss << "Exception message: " << e.what();
614  }
615  catch(...)
616  {
617  }
618  __SUP_COUT_ERR__ << ss.str();
619 
620  SOAPParameters parameters;
621  parameters.addParameter("Error", ss.str());
622  return SOAPUtilities::makeSOAPMessageReference(
623  supervisorClassNoNamespace_ + "FailFECommunicationRequest", parameters);
624 } // end frontEndCommunicationRequest()
625 
626 //==============================================================================
634  xoap::MessageReference message)
635 {
636  __SUP_COUT__ << "$$$$$$$$$$$$$$$$$" << __E__;
637 
638  // receive request parameters
639  SOAPParameters parameters;
640  parameters.addParameter("Request");
641 
642  __SUP_COUT__ << "Received Macro Maker message: " << SOAPUtilities::translate(message)
643  << __E__;
644 
645  SOAPUtilities::receive(message, parameters);
646  std::string request = parameters.getValue("Request");
647 
648  __SUP_COUT__ << "request: " << request << __E__;
649 
650  // request types:
651  // GetInterfaces
652  // UniversalWrite
653  // UniversalRead
654  // GetInterfaceMacros
655  // RunInterfaceMacro
656  // RunMacroMakerMacro
657 
658  SOAPParameters retParameters;
659 
660  try
661  {
662  if(request == "GetInterfaces")
663  {
664  if(theFEInterfacesManager_)
665  retParameters.addParameter(
666  "FEList",
667  theFEInterfacesManager_->getFEListString(
668  std::to_string(getApplicationDescriptor()->getLocalId())));
669  else // if no FE interfaces, return empty string
670  retParameters.addParameter("FEList", "");
671 
672  // if errors in state machine, send also
673  if(theStateMachine_.getErrorMessage() != "")
674  retParameters.addParameter("frontEndError",
675  theStateMachine_.getErrorMessage());
676 
677  return SOAPUtilities::makeSOAPMessageReference(
678  supervisorClassNoNamespace_ + "Response", retParameters);
679  }
680  else if(request == "UniversalWrite")
681  {
682  if(!theFEInterfacesManager_)
683  {
684  __SUP_SS__ << "No FE Interface Manager! Are you configured?" << __E__;
685  __SUP_SS_THROW__;
686  }
687  // params for running macros
688  SOAPParameters requestParameters;
689  requestParameters.addParameter("InterfaceID");
690  requestParameters.addParameter("Address");
691  requestParameters.addParameter("Data");
692  SOAPUtilities::receive(message, requestParameters);
693  std::string interfaceID = requestParameters.getValue("InterfaceID");
694  std::string addressStr = requestParameters.getValue("Address");
695  std::string dataStr = requestParameters.getValue("Data");
696 
697  __SUP_COUT__ << "Address: " << addressStr << " Data: " << dataStr
698  << " InterfaceID: " << interfaceID << __E__;
699 
700  // parameters interface index!
701  // unsigned int index = stoi(indexStr); // As long as the supervisor has only
702  // one interface, this index will remain 0?
703 
704  __SUP_COUT__
705  << "theFEInterfacesManager_->getInterfaceUniversalAddressSize(index) "
706  << theFEInterfacesManager_->getInterfaceUniversalAddressSize(interfaceID)
707  << __E__;
708  __SUP_COUT__
709  << "theFEInterfacesManager_->getInterfaceUniversalDataSize(index) "
710  << theFEInterfacesManager_->getInterfaceUniversalDataSize(interfaceID)
711  << __E__;
712 
713  // Converting std::string to char*
714  // char address
715 
716  char tmpHex[3]; // for use converting hex to binary
717  tmpHex[2] = '\0';
718 
719  __SUP_COUT__ << "Translating address: ";
720 
721  std::string addressTmp;
722  addressTmp.reserve(
723  theFEInterfacesManager_->getInterfaceUniversalAddressSize(interfaceID));
724  char* address = &addressTmp[0];
725 
726  if(addressStr.size() % 2) // if odd, make even
727  addressStr = "0" + addressStr;
728  unsigned int i = 0;
729  for(; i < addressStr.size() &&
730  i / 2 < theFEInterfacesManager_->getInterfaceUniversalAddressSize(
731  interfaceID);
732  i += 2)
733  {
734  tmpHex[0] = addressStr[addressStr.size() - 1 - i - 1];
735  tmpHex[1] = addressStr[addressStr.size() - 1 - i];
736  sscanf(tmpHex, "%hhX", (unsigned char*)&address[i / 2]);
737  printf("%2.2X", (unsigned char)address[i / 2]);
738  }
739  // finish and fill with 0s
740  for(; i / 2 <
741  theFEInterfacesManager_->getInterfaceUniversalAddressSize(interfaceID);
742  i += 2)
743  {
744  address[i / 2] = 0;
745  printf("%2.2X", (unsigned char)address[i / 2]);
746  }
747 
748  std::cout << __E__;
749 
750  __SUP_COUT__ << "Translating data: ";
751 
752  std::string dataTmp;
753  dataTmp.reserve(
754  theFEInterfacesManager_->getInterfaceUniversalDataSize(interfaceID));
755  char* data = &dataTmp[0];
756 
757  if(dataStr.size() % 2) // if odd, make even
758  dataStr = "0" + dataStr;
759 
760  i = 0;
761  for(; i < dataStr.size() &&
762  i / 2 <
763  theFEInterfacesManager_->getInterfaceUniversalDataSize(interfaceID);
764  i += 2)
765  {
766  tmpHex[0] = dataStr[dataStr.size() - 1 - i - 1];
767  tmpHex[1] = dataStr[dataStr.size() - 1 - i];
768  sscanf(tmpHex, "%hhX", (unsigned char*)&data[i / 2]);
769  printf("%2.2X", (unsigned char)data[i / 2]);
770  }
771  // finish and fill with 0s
772  for(; i / 2 <
773  theFEInterfacesManager_->getInterfaceUniversalDataSize(interfaceID);
774  i += 2)
775  {
776  data[i / 2] = 0;
777  printf("%2.2X", (unsigned char)data[i / 2]);
778  }
779 
780  std::cout << __E__;
781 
782  // char* address = new char[addressStr.size() + 1];
783  // std::copy(addressStr.begin(), addressStr.end(), address);
784  // address[addressStr.size()] = '\0';
785  // char* data = new char[dataStr.size() + 1];
786  // std::copy(dataStr.begin(), dataStr.end(), data);
787  // data[dataStr.size()] = '\0';
788 
789  theFEInterfacesManager_->universalWrite(interfaceID, address, data);
790 
791  // delete[] address;
792  // delete[] data;
793 
794  return SOAPUtilities::makeSOAPMessageReference(
795  supervisorClassNoNamespace_ + "DataWritten", retParameters);
796  }
797  else if(request == "UniversalRead")
798  {
799  if(!theFEInterfacesManager_)
800  {
801  __SUP_SS__ << "No FE Interface Manager! Are you configured?" << __E__;
802  __SUP_SS_THROW__;
803  }
804 
805  // params for running macros
806  SOAPParameters requestParameters;
807  requestParameters.addParameter("InterfaceID");
808  requestParameters.addParameter("Address");
809  SOAPUtilities::receive(message, requestParameters);
810  std::string interfaceID = requestParameters.getValue("InterfaceID");
811  std::string addressStr = requestParameters.getValue("Address");
812 
813  __SUP_COUT__ << "Address: " << addressStr << " InterfaceID: " << interfaceID
814  << __E__;
815 
816  // parameters interface index!
817  // parameter address and data
818  // unsigned int index = stoi(indexStr); // As long as the supervisor has only
819  // one interface, this index will remain 0?
820 
821  __SUP_COUT__
822  << "theFEInterfacesManager_->getInterfaceUniversalAddressSize(index) "
823  << theFEInterfacesManager_->getInterfaceUniversalAddressSize(interfaceID)
824  << __E__;
825  __SUP_COUT__
826  << "theFEInterfacesManager_->getInterfaceUniversalDataSize(index) "
827  << theFEInterfacesManager_->getInterfaceUniversalDataSize(interfaceID)
828  << __E__;
829 
830  char tmpHex[3]; // for use converting hex to binary
831  tmpHex[2] = '\0';
832 
833  __SUP_COUT__ << "Translating address: ";
834 
835  std::string addressTmp;
836  addressTmp.reserve(
837  theFEInterfacesManager_->getInterfaceUniversalAddressSize(interfaceID));
838  char* address = &addressTmp[0];
839 
840  if(addressStr.size() % 2) // if odd, make even
841  addressStr = "0" + addressStr;
842 
843  unsigned int i = 0;
844  for(; i < addressStr.size() &&
845  i / 2 < theFEInterfacesManager_->getInterfaceUniversalAddressSize(
846  interfaceID);
847  i += 2)
848  {
849  tmpHex[0] = addressStr[addressStr.size() - 1 - i - 1];
850  tmpHex[1] = addressStr[addressStr.size() - 1 - i];
851  sscanf(tmpHex, "%hhX", (unsigned char*)&address[i / 2]);
852  printf("%2.2X", (unsigned char)address[i / 2]);
853  }
854  // finish and fill with 0s
855  for(; i / 2 <
856  theFEInterfacesManager_->getInterfaceUniversalAddressSize(interfaceID);
857  i += 2)
858  {
859  address[i / 2] = 0;
860  printf("%2.2X", (unsigned char)address[i / 2]);
861  }
862 
863  std::cout << __E__;
864 
865  unsigned int dataSz =
866  theFEInterfacesManager_->getInterfaceUniversalDataSize(interfaceID);
867  std::string dataStr;
868  dataStr.resize(dataSz);
869  char* data = &dataStr[0];
870 
871  // std::string result =
872  // theFEInterfacesManager_->universalRead(index,address,data);
873  // __SUP_COUT__<< result << __E__ << __E__;
874 
875  try
876  {
877  theFEInterfacesManager_->universalRead(interfaceID, address, data);
878  }
879  catch(const std::runtime_error& e)
880  {
881  // do not allow read exception to crash everything when a macromaker
882  // command
883  __SUP_COUT_ERR__ << "Exception caught during read: " << e.what() << __E__;
884  retParameters.addParameter("dataResult", "Time Out Error");
885  return SOAPUtilities::makeSOAPMessageReference(
886  supervisorClassNoNamespace_ + "aa", retParameters);
887  }
888  catch(...)
889  {
890  // do not allow read exception to crash everything when a macromaker
891  // command
892  __SUP_COUT_ERR__ << "Exception caught during read." << __E__;
893  retParameters.addParameter("dataResult", "Time Out Error");
894  return SOAPUtilities::makeSOAPMessageReference(
895  supervisorClassNoNamespace_ + "aa", retParameters);
896  }
897 
898  // if dataSz is less than 8 show what the unsigned number would be
899  if(dataSz <= 8)
900  {
901  std::string str8(data);
902  str8.resize(8);
903  __SUP_COUT__ << "decResult[" << dataSz
904  << " bytes]: " << *((unsigned long long*)(&str8[0]))
905  << __E__;
906  }
907 
908  std::string hexResultStr;
909  hexResultStr.reserve(dataSz * 2 + 1);
910  char* hexResult = &hexResultStr[0];
911  // go through each byte and convert it to hex value (i.e. 2 0-F chars)
912  // go backwards through source data since should be provided in host order
913  // (i.e. a cast to unsigned long long should acquire expected value)
914  for(unsigned int i = 0; i < dataSz; ++i)
915  {
916  sprintf(&hexResult[i * 2], "%2.2X", (unsigned char)data[dataSz - 1 - i]);
917  }
918 
919  __SUP_COUT__ << "hexResult[" << strlen(hexResult)
920  << " nibbles]: " << std::string(hexResult) << __E__;
921 
922  retParameters.addParameter("dataResult", hexResult);
923  return SOAPUtilities::makeSOAPMessageReference(
924  supervisorClassNoNamespace_ + "aa", retParameters);
925  }
926  else if(request == "GetInterfaceMacros")
927  {
928  if(theFEInterfacesManager_)
929  {
930  __SUP_COUT__ << "Getting FE Macros from FE Interface Manager..." << __E__;
931  retParameters.addParameter(
932  "FEMacros",
933  theFEInterfacesManager_->getFEMacrosString(
934  CorePropertySupervisorBase::getSupervisorUID(),
935  std::to_string(CoreSupervisorBase::getSupervisorLID())));
936  }
937  else
938  {
939  __SUP_COUT__ << "No FE Macros because there is no FE Interface Manager..."
940  << __E__;
941  retParameters.addParameter("FEMacros", "");
942  }
943 
944  return SOAPUtilities::makeSOAPMessageReference(
945  supervisorClassNoNamespace_ + "Response", retParameters);
946  }
947  else if(request == "RunInterfaceMacro")
948  {
949  if(!theFEInterfacesManager_)
950  {
951  __SUP_SS__ << "Missing FE Interface Manager! Are you configured?"
952  << __E__;
953  __SUP_SS_THROW__;
954  }
955 
956  // params for running macros
957  SOAPParameters requestParameters;
958  requestParameters.addParameter("feMacroName");
959  requestParameters.addParameter("inputArgs");
960  requestParameters.addParameter("outputArgs");
961  requestParameters.addParameter("InterfaceID");
962  requestParameters.addParameter("userPermissions");
963  requestParameters.addParameter("AsyncSupported");
964  SOAPUtilities::receive(message, requestParameters);
965  std::string interfaceID = requestParameters.getValue("InterfaceID");
966  std::string feMacroName = requestParameters.getValue("feMacroName");
967  std::string inputArgs = requestParameters.getValue("inputArgs");
968  std::string outputArgs = requestParameters.getValue("outputArgs");
969  std::string userPermissions = requestParameters.getValue("userPermissions");
970  bool asyncSupported = requestParameters.getValue("AsyncSupported") == "1";
971 
972  // check user permission
973  // userPermissions = "allUsers:1";
974  // userPermissions = "allUsers:1 & TDAQ:255";
975  __COUTV__(userPermissions);
976  std::map<std::string, WebUsers::permissionLevel_t> userPermissionLevelsMap;
978  userPermissions, userPermissionLevelsMap);
979  __COUTV__(StringMacros::mapToString(userPermissionLevelsMap));
980 
981  // outputArgs must be filled with the proper argument names
982  // and then the response output values will be returned in the string.
983 
984  // check for interfaceID
985  FEVInterface* fe = theFEInterfacesManager_->getFEInterfaceP(interfaceID);
986 
987  // have pointer to virtual FEInterface, find Macro structure
988  auto FEMacroIt = fe->getMapOfFEMacroFunctions().find(feMacroName);
989  if(FEMacroIt == fe->getMapOfFEMacroFunctions().end())
990  {
991  __SUP_SS__ << "FE Macro '" << feMacroName << "' of interfaceID '"
992  << interfaceID << "' was not found!" << __E__;
993  __SUP_SS_THROW__;
994  }
995  const FEVInterface::frontEndMacroStruct_t& FEMacro = FEMacroIt->second;
996 
997  __COUTV__(FEMacro.requiredUserPermissions_);
998  std::map<std::string, WebUsers::permissionLevel_t>
999  FERequiredUserPermissionsMap;
1001  FEMacro.requiredUserPermissions_, FERequiredUserPermissionsMap);
1002  __COUTV__(StringMacros::mapToString(FERequiredUserPermissionsMap));
1003 
1005  userPermissionLevelsMap, FERequiredUserPermissionsMap))
1006  {
1007  __SUP_SS__ << "Invalid user permission for FE Macro '" << feMacroName
1008  << "' of interfaceID '" << interfaceID << "'!\n\n"
1009  << "Must have access level of at least '"
1010  << StringMacros::mapToString(FERequiredUserPermissionsMap)
1011  << ".' Users permissions level is only '" << userPermissions
1012  << ".'" << __E__;
1013  __SUP_SS_THROW__;
1014  }; // skip icon if no access
1015 
1016  if(asyncSupported)
1017  {
1018  // Async path: launch macro in a thread, return NotDoneTaskID immediately
1019  auto task = std::make_shared<AsyncMacroTask>();
1020  {
1021  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1022  task->taskID = ++asyncMacroTaskIDCounter_;
1023  if(asyncMacroTaskIDCounter_ == 0)
1024  task->taskID = ++asyncMacroTaskIDCounter_;
1025  task->interfaceID = interfaceID;
1026  task->startTime = time(0);
1027  asyncMacroTasks_.push_back(task);
1028  }
1029 
1030  FEVInterface::frontEndMacroStruct_t localFEMacro = FEMacro;
1031  std::thread([this,
1032  task,
1033  interfaceID,
1034  localFEMacro,
1035  inputArgs,
1036  outputArgs]() mutable {
1037  {
1038  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1039  task->threadID = std::this_thread::get_id();
1040  }
1041  // Clear stale progress in case thread ID was reused
1042  try
1043  {
1044  theFEInterfacesManager_->getFEInterfaceP(interfaceID)
1045  ->clearFEMacroPercentDone(std::this_thread::get_id());
1046  }
1047  catch(...)
1048  {
1049  }
1050 
1051  try
1052  {
1053  theFEInterfacesManager_->runFEMacro(
1054  interfaceID, localFEMacro, inputArgs, outputArgs);
1055  try
1056  {
1057  theFEInterfacesManager_->getFEInterfaceP(interfaceID)
1058  ->clearFEMacroPercentDone(std::this_thread::get_id());
1059  }
1060  catch(...)
1061  {
1062  }
1063  {
1064  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1065  task->outputArgs = outputArgs;
1066  task->doneTime = time(0);
1067  task->done = true;
1068  }
1069  }
1070  catch(const std::exception& e)
1071  {
1072  try
1073  {
1074  theFEInterfacesManager_->getFEInterfaceP(interfaceID)
1075  ->clearFEMacroPercentDone(std::this_thread::get_id());
1076  }
1077  catch(...)
1078  {
1079  }
1080  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1081  task->error =
1082  "In Supervisor with LID=" +
1083  std::to_string(getApplicationDescriptor()->getLocalId()) +
1084  " the FE Macro named '" + interfaceID +
1085  "' failed. Here is the error:\n\n" + e.what();
1086  task->doneTime = time(0);
1087  task->done = true;
1088  }
1089  catch(...)
1090  {
1091  try
1092  {
1093  theFEInterfacesManager_->getFEInterfaceP(interfaceID)
1094  ->clearFEMacroPercentDone(std::this_thread::get_id());
1095  }
1096  catch(...)
1097  {
1098  }
1099  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1100  task->error =
1101  "In Supervisor with LID=" +
1102  std::to_string(getApplicationDescriptor()->getLocalId()) +
1103  " the FE Macro named '" + interfaceID +
1104  "' failed due to an unknown error.";
1105  task->doneTime = time(0);
1106  task->done = true;
1107  }
1108  }).detach();
1109 
1110  __SUP_COUT__ << "Launched async FE Macro '" << feMacroName
1111  << "' for interfaceID '" << interfaceID
1112  << "' with taskID=" << task->taskID << __E__;
1113 
1114  // Wait briefly to see if macro finishes quickly (avoids unnecessary polling)
1115  {
1116  size_t sleepUs = 100;
1117  for(int i = 0; i < 10; ++i)
1118  {
1119  usleep(sleepUs);
1120  {
1121  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1122  if(task->done)
1123  {
1124  if(!task->error.empty())
1125  {
1126  std::string errorCopy = task->error;
1127  for(size_t j = 0; j < asyncMacroTasks_.size(); ++j)
1128  if(asyncMacroTasks_[j]->taskID == task->taskID)
1129  {
1130  asyncMacroTasks_.erase(
1131  asyncMacroTasks_.begin() + j);
1132  break;
1133  }
1134  __SUP_SS__ << errorCopy;
1135  __SUP_SS_THROW__;
1136  }
1137  retParameters.addParameter("outputArgs",
1138  task->outputArgs);
1139  for(size_t j = 0; j < asyncMacroTasks_.size(); ++j)
1140  if(asyncMacroTasks_[j]->taskID == task->taskID)
1141  {
1142  asyncMacroTasks_.erase(asyncMacroTasks_.begin() +
1143  j);
1144  break;
1145  }
1146  return SOAPUtilities::makeSOAPMessageReference(
1147  supervisorClassNoNamespace_ + "Response",
1148  retParameters);
1149  }
1150  }
1151  sleepUs *= 5;
1152  if(sleepUs > 1000 * 1000)
1153  sleepUs = 1000 * 1000;
1154  }
1155  }
1156 
1157  retParameters.addParameter("NotDoneTaskID", std::to_string(task->taskID));
1158  return SOAPUtilities::makeSOAPMessageReference(
1159  supervisorClassNoNamespace_ + "Response", retParameters);
1160  }
1161  else
1162  {
1163  // Synchronous path: run macro and return result (backward compatible)
1164  try
1165  {
1166  theFEInterfacesManager_->runFEMacro(
1167  interfaceID, FEMacro, inputArgs, outputArgs);
1168  }
1169  catch(std::runtime_error& e)
1170  {
1171  __SUP_SS__ << "In Supervisor with LID="
1172  << getApplicationDescriptor()->getLocalId()
1173  << " the FE Macro named '" << feMacroName
1174  << "' with target FE '" << interfaceID
1175  << "' failed. Here is the error:\n\n"
1176  << e.what() << __E__;
1177  __SUP_SS_THROW__;
1178  }
1179  catch(std::exception& e)
1180  {
1181  __SUP_SS__ << "In Supervisor with LID="
1182  << getApplicationDescriptor()->getLocalId()
1183  << " the FE Macro named '" << feMacroName
1184  << "' with target FE '" << interfaceID
1185  << "' failed. Here is the error:\n\n"
1186  << e.what() << __E__;
1187  __SUP_SS_THROW__;
1188  }
1189  catch(...)
1190  {
1191  __SUP_SS__ << "In Supervisor with LID="
1192  << getApplicationDescriptor()->getLocalId()
1193  << " the FE Macro named '" << feMacroName
1194  << "' with target FE '" << interfaceID
1195  << "' failed due to an unknown error." << __E__;
1196  try
1197  {
1198  throw;
1199  } //one more try to printout extra info
1200  catch(const std::exception& e)
1201  {
1202  ss << "Exception message: " << e.what();
1203  }
1204  catch(...)
1205  {
1206  }
1207  __SUP_SS_THROW__;
1208  }
1209 
1210  retParameters.addParameter("outputArgs", outputArgs);
1211  return SOAPUtilities::makeSOAPMessageReference(
1212  supervisorClassNoNamespace_ + "Response", retParameters);
1213  }
1214  }
1215  else if(request == "RunMacroMakerMacro")
1216  {
1217  if(!theFEInterfacesManager_)
1218  {
1219  __SUP_SS__ << "Missing FE Interface Manager! Are you configured?"
1220  << __E__;
1221  __SUP_SS_THROW__;
1222  }
1223 
1224  // params for running macros
1225  SOAPParameters requestParameters;
1226  requestParameters.addParameter("macroName");
1227  requestParameters.addParameter("macroString");
1228  requestParameters.addParameter("inputArgs");
1229  requestParameters.addParameter("outputArgs");
1230  requestParameters.addParameter("InterfaceID");
1231  requestParameters.addParameter("AsyncSupported");
1232  SOAPUtilities::receive(message, requestParameters);
1233  std::string interfaceID = requestParameters.getValue("InterfaceID");
1234  std::string macroName = requestParameters.getValue("macroName");
1235  std::string macroString = requestParameters.getValue("macroString");
1236  std::string inputArgs = requestParameters.getValue("inputArgs");
1237  std::string outputArgs = requestParameters.getValue("outputArgs");
1238  bool asyncSupported = requestParameters.getValue("AsyncSupported") == "1";
1239 
1240  if(asyncSupported)
1241  {
1242  // Async path: launch macro in a thread, return NotDoneTaskID immediately
1243  auto task = std::make_shared<AsyncMacroTask>();
1244  {
1245  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1246  task->taskID = ++asyncMacroTaskIDCounter_;
1247  if(asyncMacroTaskIDCounter_ == 0)
1248  task->taskID = ++asyncMacroTaskIDCounter_;
1249  task->interfaceID = interfaceID;
1250  task->startTime = time(0);
1251  asyncMacroTasks_.push_back(task);
1252  }
1253 
1254  std::thread([this,
1255  task,
1256  interfaceID,
1257  macroName,
1258  macroString,
1259  inputArgs,
1260  outputArgs]() mutable {
1261  {
1262  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1263  task->threadID = std::this_thread::get_id();
1264  }
1265  try
1266  {
1267  theFEInterfacesManager_->getFEInterfaceP(interfaceID)
1268  ->clearFEMacroPercentDone(std::this_thread::get_id());
1269  }
1270  catch(...)
1271  {
1272  }
1273 
1274  try
1275  {
1276  theFEInterfacesManager_->runMacro(
1277  interfaceID, macroString, inputArgs, outputArgs);
1278  try
1279  {
1280  theFEInterfacesManager_->getFEInterfaceP(interfaceID)
1281  ->clearFEMacroPercentDone(std::this_thread::get_id());
1282  }
1283  catch(...)
1284  {
1285  }
1286  {
1287  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1288  task->outputArgs = outputArgs;
1289  task->doneTime = time(0);
1290  task->done = true;
1291  }
1292  }
1293  catch(const std::exception& e)
1294  {
1295  try
1296  {
1297  theFEInterfacesManager_->getFEInterfaceP(interfaceID)
1298  ->clearFEMacroPercentDone(std::this_thread::get_id());
1299  }
1300  catch(...)
1301  {
1302  }
1303  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1304  task->error =
1305  "In Supervisor with LID=" +
1306  std::to_string(getApplicationDescriptor()->getLocalId()) +
1307  " the MacroMaker Macro named '" + macroName +
1308  "' with target FE '" + interfaceID +
1309  "' failed. Here is the error:\n\n" + e.what();
1310  task->doneTime = time(0);
1311  task->done = true;
1312  }
1313  catch(...)
1314  {
1315  try
1316  {
1317  theFEInterfacesManager_->getFEInterfaceP(interfaceID)
1318  ->clearFEMacroPercentDone(std::this_thread::get_id());
1319  }
1320  catch(...)
1321  {
1322  }
1323  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1324  task->error =
1325  "In Supervisor with LID=" +
1326  std::to_string(getApplicationDescriptor()->getLocalId()) +
1327  " the MacroMaker Macro named '" + macroName +
1328  "' with target FE '" + interfaceID +
1329  "' failed due to an unknown error.";
1330  task->doneTime = time(0);
1331  task->done = true;
1332  }
1333  }).detach();
1334 
1335  __SUP_COUT__ << "Launched async MacroMaker Macro '" << macroName
1336  << "' for interfaceID '" << interfaceID
1337  << "' with taskID=" << task->taskID << __E__;
1338 
1339  // Wait briefly to see if macro finishes quickly (avoids unnecessary polling)
1340  {
1341  size_t sleepUs = 100;
1342  for(int i = 0; i < 10; ++i)
1343  {
1344  usleep(sleepUs);
1345  {
1346  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1347  if(task->done)
1348  {
1349  if(!task->error.empty())
1350  {
1351  std::string errorCopy = task->error;
1352  for(size_t j = 0; j < asyncMacroTasks_.size(); ++j)
1353  if(asyncMacroTasks_[j]->taskID == task->taskID)
1354  {
1355  asyncMacroTasks_.erase(
1356  asyncMacroTasks_.begin() + j);
1357  break;
1358  }
1359  __SUP_SS__ << errorCopy;
1360  __SUP_SS_THROW__;
1361  }
1362  retParameters.addParameter("outputArgs",
1363  task->outputArgs);
1364  for(size_t j = 0; j < asyncMacroTasks_.size(); ++j)
1365  if(asyncMacroTasks_[j]->taskID == task->taskID)
1366  {
1367  asyncMacroTasks_.erase(asyncMacroTasks_.begin() +
1368  j);
1369  break;
1370  }
1371  return SOAPUtilities::makeSOAPMessageReference(
1372  supervisorClassNoNamespace_ + "Response",
1373  retParameters);
1374  }
1375  }
1376  sleepUs *= 5;
1377  if(sleepUs > 1000 * 1000)
1378  sleepUs = 1000 * 1000;
1379  }
1380  }
1381 
1382  retParameters.addParameter("NotDoneTaskID", std::to_string(task->taskID));
1383  return SOAPUtilities::makeSOAPMessageReference(
1384  supervisorClassNoNamespace_ + "Response", retParameters);
1385  }
1386  else
1387  {
1388  // Synchronous path: run macro and return result (backward compatible)
1389  try
1390  {
1391  theFEInterfacesManager_->runMacro(
1392  interfaceID, macroString, inputArgs, outputArgs);
1393  }
1394  catch(std::runtime_error& e)
1395  {
1396  __SUP_SS__ << "In Supervisor with LID="
1397  << getApplicationDescriptor()->getLocalId()
1398  << " the MacroMaker Macro named '" << macroName
1399  << "' with target FE '" << interfaceID
1400  << "' failed. Here is the error:\n\n"
1401  << e.what() << __E__;
1402  __SUP_SS_THROW__;
1403  }
1404  catch(...)
1405  {
1406  __SUP_SS__ << "In Supervisor with LID="
1407  << getApplicationDescriptor()->getLocalId()
1408  << " the MacroMaker Macro named '" << macroName
1409  << "' with target FE '" << interfaceID
1410  << "' failed due to an unknown error." << __E__;
1411  try
1412  {
1413  throw;
1414  } //one more try to printout extra info
1415  catch(const std::exception& e)
1416  {
1417  ss << "Exception message: " << e.what();
1418  }
1419  catch(...)
1420  {
1421  }
1422  __SUP_SS_THROW__;
1423  }
1424 
1425  retParameters.addParameter("outputArgs", outputArgs);
1426  return SOAPUtilities::makeSOAPMessageReference(
1427  supervisorClassNoNamespace_ + "Response", retParameters);
1428  }
1429  }
1430  else if(request == "CheckMacro")
1431  {
1432  SOAPParameters requestParameters;
1433  requestParameters.addParameter("TaskID");
1434  SOAPUtilities::receive(message, requestParameters);
1435  uint64_t taskID = std::stoull(requestParameters.getValue("TaskID"));
1436 
1437  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1438 
1439  // Cleanup old completed tasks (>5 minutes after completion)
1440  time_t now = time(0);
1441  for(size_t i = 0; i < asyncMacroTasks_.size();)
1442  {
1443  auto& t = asyncMacroTasks_[i];
1444  if(t->done && t->doneTime > 0 && (now - t->doneTime) > 5 * 60)
1445  asyncMacroTasks_.erase(asyncMacroTasks_.begin() + i);
1446  else
1447  ++i;
1448  }
1449 
1450  // Find the requested task
1451  std::shared_ptr<AsyncMacroTask> foundTask;
1452  for(auto& t : asyncMacroTasks_)
1453  {
1454  if(t->taskID == taskID)
1455  {
1456  foundTask = t;
1457  break;
1458  }
1459  }
1460 
1461  if(!foundTask)
1462  {
1463  __SUP_SS__ << "Async macro task with ID=" << taskID
1464  << " was not found. Perhaps it completed more than 5 "
1465  "minutes ago?"
1466  << __E__;
1467  __SUP_SS_THROW__;
1468  }
1469 
1470  if(foundTask->done)
1471  {
1472  if(!foundTask->error.empty())
1473  {
1474  std::string errorCopy = foundTask->error;
1475  // Remove completed task
1476  for(size_t i = 0; i < asyncMacroTasks_.size(); ++i)
1477  {
1478  if(asyncMacroTasks_[i]->taskID == taskID)
1479  {
1480  asyncMacroTasks_.erase(asyncMacroTasks_.begin() + i);
1481  break;
1482  }
1483  }
1484  __SUP_SS__ << errorCopy;
1485  __SUP_SS_THROW__;
1486  }
1487 
1488  retParameters.addParameter("outputArgs", foundTask->outputArgs);
1489 
1490  // Remove completed task
1491  for(size_t i = 0; i < asyncMacroTasks_.size(); ++i)
1492  {
1493  if(asyncMacroTasks_[i]->taskID == taskID)
1494  {
1495  asyncMacroTasks_.erase(asyncMacroTasks_.begin() + i);
1496  break;
1497  }
1498  }
1499 
1500  return SOAPUtilities::makeSOAPMessageReference(
1501  supervisorClassNoNamespace_ + "Response", retParameters);
1502  }
1503  else
1504  {
1505  // Still running — check for real progress from the FE macro
1506  if(theFEInterfacesManager_ && foundTask->threadID != std::thread::id())
1507  {
1508  try
1509  {
1510  int progress = theFEInterfacesManager_
1511  ->getFEInterfaceP(foundTask->interfaceID)
1512  ->getFEMacroPercentDone(foundTask->threadID);
1513  if(progress >= 0)
1514  retParameters.addParameter("Progress",
1515  std::to_string(progress));
1516  }
1517  catch(...)
1518  {
1519  }
1520  }
1521  retParameters.addParameter("NotDoneTaskID", std::to_string(taskID));
1522  return SOAPUtilities::makeSOAPMessageReference(
1523  supervisorClassNoNamespace_ + "Response", retParameters);
1524  }
1525  }
1526  else
1527  {
1528  __SUP_SS__ << "Unrecognized request received! '" << request << "'" << __E__;
1529  __SUP_SS_THROW__;
1530  }
1531  }
1532  catch(const std::runtime_error& e)
1533  {
1534  __SUP_SS__ << "Error occurred handling request: " << e.what() << __E__;
1535  __SUP_COUT_ERR__ << ss.str();
1536  retParameters.addParameter("Error", ss.str());
1537  }
1538  catch(...)
1539  {
1540  __SUP_SS__ << "Error occurred handling request." << __E__;
1541  try
1542  {
1543  throw;
1544  } //one more try to printout extra info
1545  catch(const std::exception& e)
1546  {
1547  ss << "Exception message: " << e.what();
1548  }
1549  catch(...)
1550  {
1551  }
1552  __SUP_COUT_ERR__ << ss.str();
1553  retParameters.addParameter("Error", ss.str());
1554  }
1555 
1556  return SOAPUtilities::makeSOAPMessageReference(
1557  supervisorClassNoNamespace_ + "FailRequest", retParameters);
1558 
1559 } // end macroMakerSupervisorRequest()
1560 
1561 //==============================================================================
1562 xoap::MessageReference FESupervisor::workLoopStatusRequest(
1563  xoap::MessageReference /*message*/)
1564 {
1565  if(!theFEInterfacesManager_)
1566  {
1567  __SUP_SS__ << "Invalid request for front-end workloop status from Supervisor "
1568  "without a FEVInterfacesManager."
1569  << __E__;
1570  __SUP_SS_THROW__;
1571  }
1572 
1573  return SOAPUtilities::makeSOAPMessageReference(
1574  (theFEInterfacesManager_->allFEWorkloopsAreDone()
1575  ? CoreSupervisorBase::WORK_LOOP_DONE
1576  : CoreSupervisorBase::WORK_LOOP_WORKING));
1577 } // end workLoopStatusRequest()
1578 
1579 //==============================================================================
1587 FEVInterfacesManager* FESupervisor::extractFEInterfacesManager()
1588 {
1589  theFEInterfacesManager_ = 0;
1590 
1591  for(unsigned int i = 0; i < theStateMachineImplementation_.size(); ++i)
1592  {
1593  try
1594  {
1595  theFEInterfacesManager_ =
1596  dynamic_cast<FEVInterfacesManager*>(theStateMachineImplementation_[i]);
1597  if(!theFEInterfacesManager_)
1598  {
1599  // dynamic_cast returns null pointer on failure
1600  __SUP_SS__ << "Dynamic cast failure!" << __E__;
1601  __SUP_SS_THROW__;
1602  }
1603  __SUP_COUT__ << "State Machine " << i << " WAS of type FEVInterfacesManager"
1604  << __E__;
1605 
1606  break;
1607  }
1608  catch(...)
1609  {
1610  __SUP_COUT__ << "State Machine " << i
1611  << " was NOT of type FEVInterfacesManager" << __E__;
1612  }
1613  }
1614 
1615  __SUP_COUT__ << "theFEInterfacesManager pointer = " << theFEInterfacesManager_
1616  << __E__;
1617 
1618  return theFEInterfacesManager_;
1619 } // end extractFEInterfaceManager()
1620 
1621 //==============================================================================
1622 void FESupervisor::transitionConfiguring(toolbox::Event::Reference /*event*/)
1623 {
1624  __SUP_COUT__ << "transitionConfiguring" << __E__;
1625  CoreSupervisorBase::configureInit();
1626 
1627  // get pset from Board Reader metric manager table
1628  try
1629  {
1630  __COUTV__(CorePropertySupervisorBase::getSupervisorConfigurationPath());
1631 
1632  ConfigurationTree feSupervisorNode =
1633  CorePropertySupervisorBase::getSupervisorTableNode();
1634 
1635  std::string metric_string = "";
1636  bool metricStringSetup = true;
1637  try
1638  {
1639  std::ostringstream oss;
1640  std::string tabString = "";
1641  std::string commentsString = "";
1643  oss, tabString, commentsString, "" /* parentPath*/, feSupervisorNode);
1644  metric_string = oss.str();
1645  }
1646  catch(...)
1647  {
1648  metricStringSetup = false;
1649  metric_string = "";
1650  } // ignore error
1651 
1652  if(!metricMan)
1653  {
1654  __SUP_COUT__ << "Metric manager is not instantiated! Attempting to fix."
1655  << __E__;
1656  metricMan = std::make_unique<artdaq::MetricManager>();
1657  }
1658  std::string metricNamePreamble =
1659  feSupervisorNode.getNode("/SlowControlsMetricManagerChannelNamePreamble")
1660  .getValueWithDefault<std::string>("");
1661  __SUP_COUTV__(metricNamePreamble);
1662 
1663  // std::string metric_string = "epics: {metricPluginType:epics level:3 channel_name_prefix:Mu2e}";
1664  fhicl::ParameterSet metric_pset = fhicl::ParameterSet::make(metric_string);
1665 
1666  __SUP_COUTV__(metricNamePreamble);
1667  try
1668  {
1669  metricMan->initialize(metric_pset.get<fhicl::ParameterSet>("metrics"),
1670  metricNamePreamble);
1671  }
1672  catch(...)
1673  {
1674  if(metricStringSetup)
1675  throw;
1676  else
1677  __SUP_COUT__ << "Ignore metric manager initialize error because metric "
1678  "string is not setup."
1679  << __E__;
1680  }
1681  __SUP_COUT__ << "transitionConfiguring metric manager(" << metricMan
1682  << ") initialized = " << metricMan->Initialized() << __E__;
1683  }
1684  catch(const std::runtime_error& e)
1685  {
1686  __SS__ << "Error loading metrics in FESupervisor::transitionConfiguring(): "
1687  << e.what() << __E__;
1688  __SUP_COUT_ERR__ << ss.str();
1689  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
1690 
1691  //__SS_THROW_ONLY__;
1692  theStateMachine_.setErrorMessage(ss.str());
1693  throw toolbox::fsm::exception::Exception(
1694  "Transition Error" /*name*/,
1695  ss.str() /* message*/,
1696  "FESupervisor::transitionConfiguring" /*module*/,
1697  __LINE__ /*line*/,
1698  __FUNCTION__ /*function*/
1699  );
1700  }
1701  catch(...)
1702  {
1703  __SS__ << "Error loading metrics in FESupervisor::transitionConfiguring()"
1704  << __E__;
1705  try
1706  {
1707  throw;
1708  } //one more try to printout extra info
1709  catch(const std::exception& e)
1710  {
1711  ss << "Exception message: " << e.what();
1712  }
1713  catch(...)
1714  {
1715  }
1716  __SUP_COUT_ERR__ << ss.str();
1717  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
1718 
1719  //__SS_THROW_ONLY__;
1720  theStateMachine_.setErrorMessage(ss.str());
1721  throw toolbox::fsm::exception::Exception(
1722  "Transition Error" /*name*/,
1723  ss.str() /* message*/,
1724  "FESupervisor::transitionConfiguring" /*module*/,
1725  __LINE__ /*line*/,
1726  __FUNCTION__ /*function*/
1727  );
1728  }
1729 
1730  CoreSupervisorBase::transitionConfiguringFSMs();
1731 
1732  __SUP_COUT__ << "transitionConfiguring done." << __E__;
1733 } // end transitionConfiguring()
1734 
1735 //==============================================================================
1736 void FESupervisor::transitionHalting(toolbox::Event::Reference event)
1737 {
1738  __SUP_COUT__ << "transitionHalting" << __E__;
1739  TLOG_DEBUG(7) << "transitionHalting";
1740 
1741  // Wait for any in-flight async macro tasks before tearing down FE interfaces
1742  {
1743  const int timeoutSeconds = 30;
1744  const time_t deadline = time(0) + timeoutSeconds;
1745  bool hasOutstanding = true;
1746  while(hasOutstanding && time(0) < deadline)
1747  {
1748  {
1749  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1750  hasOutstanding = false;
1751  for(const auto& t : asyncMacroTasks_)
1752  {
1753  if(!t->done)
1754  {
1755  hasOutstanding = true;
1756  break;
1757  }
1758  }
1759  }
1760  if(hasOutstanding)
1761  {
1762  __SUP_COUT__ << "Waiting for outstanding async macro tasks to complete "
1763  "before halting..."
1764  << __E__;
1765  usleep(500 * 1000);
1766  }
1767  }
1768  if(hasOutstanding)
1769  __SUP_COUT_WARN__ << "Timed out waiting for async macro tasks; "
1770  "proceeding with halt."
1771  << __E__;
1772  std::lock_guard<std::mutex> lock(asyncMacroMutex_);
1773  asyncMacroTasks_.clear();
1774  }
1775 
1776  // shutdown workloops first, then shutdown metric manager
1778 
1779  try
1780  {
1781  if(metricMan && metricMan->Initialized())
1782  {
1783  TLOG_DEBUG(7) << "Metric manager(" << metricMan << ") shutting down..."
1784  << __E__;
1785  metricMan
1786  ->shutdown(); // will set initilized_ to false with mutex, which should prevent races
1787  TLOG_DEBUG(7) << "Metric manager shutdown." << __E__;
1788  }
1789  else
1790  __SUP_COUT__ << "Metric manager(" << metricMan << ") already shutdown."
1791  << __E__;
1792 
1793  metricMan.reset(nullptr);
1794  }
1795  catch(...)
1796  {
1797  __SS__ << "Error shutting down metrics in FESupervisor::transitionHalting()"
1798  << __E__;
1799  try
1800  {
1801  throw;
1802  } //one more try to printout extra info
1803  catch(const std::exception& e)
1804  {
1805  ss << "Exception message: " << e.what();
1806  }
1807  catch(...)
1808  {
1809  }
1810  __SUP_COUT_ERR__ << ss.str();
1811  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
1812 
1813  //__SS_THROW_ONLY__;
1814  theStateMachine_.setErrorMessage(ss.str());
1815  throw toolbox::fsm::exception::Exception(
1816  "Transition Error" /*name*/,
1817  ss.str() /* message*/,
1818  "FESupervisor::transitionHalting" /*module*/,
1819  __LINE__ /*line*/,
1820  __FUNCTION__ /*function*/
1821  );
1822  }
1823 
1824  __SUP_COUT__ << "transitionHalting done." << __E__;
1825 } // end transitionHalting()
1826 
1827 //==============================================================================
1828 void FESupervisor::initDataPublishing(const std::string& endpoint,
1829  const std::string& topic)
1830 {
1831  if(dp_isInitialized_)
1832  {
1833  // silently re‑initialise – close old socket first.
1834  closeDataPublishing(false);
1835  }
1836 
1837  // Store for later use.
1838  dp_endpoint_ = endpoint;
1839  dp_topic_ = topic;
1840 
1841  // Bind the PUB socket.
1842  try
1843  {
1844  dp_socket_.bind(dp_endpoint_);
1845  }
1846  catch(const zmq::error_t& e)
1847  {
1848  __SUP_SS__ << "initDataPublishing() - bind to zmq '" + dp_endpoint_ +
1849  "' failed: " + e.what()
1850  << __E__;
1851  __SUP_SS_THROW__;
1852  }
1853 
1854  dp_isInitialized_ = true;
1855 } //end initDataPublishing()
1856 
1857 //==============================================================================
1858 void FESupervisor::closeDataPublishing(bool alsoCloseContext /* = true */)
1859 {
1860  if(dp_isInitialized_)
1861  {
1862  try
1863  {
1864  dp_socket_.set(zmq::sockopt::linger, 0); // discard unsent messages on close
1865  dp_socket_.close(); // close the PUB socket
1866  }
1867  catch(const zmq::error_t&)
1868  {
1869  // ignore – we are cleaning up.
1870  }
1871  try
1872  {
1873  if(alsoCloseContext)
1874  dp_context_.close(); // close the ZMQ context
1875  }
1876  catch(const zmq::error_t&)
1877  {
1878  // ignore.
1879  }
1880  dp_isInitialized_ = false;
1881  }
1882 } //end closeDataPublishing()
1883 
1884 //==============================================================================
1892 void FESupervisor::publishData(const char* dataPtr, size_t dataSize)
1893 {
1894  if(!dp_isInitialized_)
1895  {
1896  __SUP_SS__ << "publishData() called before zmq init()" << __E__;
1897  __SUP_SS_THROW__;
1898  }
1899 
1900  // ---- Topic frame (must be sent with sndmore) ----
1901  // Using zmq::buffer prevents an extra copy – it just points at the data.
1902  auto rc_topic = dp_socket_.send(zmq::buffer(dp_topic_), zmq::send_flags::sndmore);
1903  if(!rc_topic)
1904  {
1905  __SUP_SS__ << "publishData() - failed to send zmq topic" << __E__;
1906  __SUP_SS_THROW__;
1907  }
1908 
1909  // ---- Payload frame (final frame) ----
1910  // `dataPtr` may be nullptr only if `dataSize == 0` – ZeroMQ accepts an empty frame.
1911  if(dataPtr == nullptr && dataSize > 0)
1912  {
1913  __SUP_SS__ << "FESupervisor::publishData() - dataPtr is nullptr but dataSize is "
1914  << dataSize << __E__;
1915  __SUP_SS_THROW__;
1916  }
1917  auto rc_payload =
1918  dp_socket_.send(zmq::buffer(dataPtr, dataSize), zmq::send_flags::none);
1919  if(!rc_payload)
1920  {
1921  __SUP_SS__ << "publishData() - failed to send zmq payload" << __E__;
1922  __SUP_SS_THROW__;
1923  }
1924 } //end publishData()
static void insertMetricsBlock(std::ostream &out, std::string &tabStr, std::string &commentStr, const std::string &parentPath, ConfigurationTree daqNode)
insertMetricsBlock
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
T getValueWithDefault(const T &defaultValue) const
static void extractPermissionsMapFromString(const std::string &permissionsString, std::map< std::string, WebUsers::permissionLevel_t > &permissionsMap)
static bool doPermissionsGrantAccess(std::map< std::string, WebUsers::permissionLevel_t > &permissionLevelsMap, std::map< std::string, WebUsers::permissionLevel_t > &permissionThresholdsMap)
virtual void transitionHalting(toolbox::Event::Reference event)
virtual void request(const std::string &requestType, cgicc::Cgicc &cgiIn, HttpXmlDocument &xmlOut, const WebUsers::RequestUserInfo &userInfo)
void publishData(const char *dataPtr, size_t dataSize)
virtual void transitionHalting(toolbox::Event::Reference event) override
void initDataPublishing(const std::string &endpoint, const std::string &topic="test")
void closeDataPublishing(bool alsoCloseContext=true)
xoap::MessageReference macroMakerSupervisorRequest(xoap::MessageReference message)
const FEVInterface & getFEInterface(const std::string &interfaceID) const
getFEInterface
std::string getFEMacrosString(const std::string &supervisorName, const std::string &supervisorLid)
used by MacroMaker
FEVInterface * getFEInterfaceP(const std::string &interfaceID)
getFEInterfaceP
void runFEMacro(const std::string &interfaceID, const FEVInterface::frontEndMacroStruct_t &feMacro, const std::string &inputArgs, std::string &outputArgs)
used by MacroMaker and FE calling indirectly
unsigned int getInterfaceUniversalAddressSize(const std::string &interfaceID)
used by MacroMaker
void startFEMacroMultiDimensional(const std::string &requester, const std::string &interfaceID, const std::string &feMacroName, const bool enableSavingOutput, const std::string &outputFilePath, const std::string &outputFileRadix, const std::string &inputArgs)
used by iterator calling (i.e. FESupervisor)
void universalWrite(const std::string &interfaceID, char *address, char *writeValue)
used by MacroMaker
bool allFEWorkloopsAreDone(void)
used by Iterator, e.g.
void runFEMacroByFE(const std::string &callingInterfaceID, const std::string &interfaceID, const std::string &feMacroName, const std::string &inputArgs, std::string &outputArgs)
used by FE calling (i.e. FESupervisor)
unsigned int getInterfaceUniversalDataSize(const std::string &interfaceID)
used by MacroMaker
void universalRead(const std::string &interfaceID, char *address, char *returnValue)
used by MacroMaker
void startMacroMultiDimensional(const std::string &requester, const std::string &interfaceID, const std::string &macroName, const std::string &macroString, const bool enableSavingOutput, const std::string &outputFilePath, const std::string &outputFileRadix, const std::string &inputArgs)
used by iterator calling (i.e. FESupervisor)
bool checkMacroMultiDimensional(const std::string &interfaceID, const std::string &macroName)
used by iterator calling (i.e. FESupervisor)
std::mutex frontEndCommunicationReceiveMutex_
FE communication helpers.
void runMacro(const std::string &interfaceID, const std::string &macroObjectString, const std::string &inputArgs, std::string &outputArgs)
used by MacroMaker
std::string getFEListString(const std::string &supervisorLid)
used by MacroMaker
defines used also by OtsConfigurationWizardSupervisor
< members fully define a front-end macro function
Definition: FEVInterface.h:178
static std::string mapToString(const std::map< std::string, T > &mapToReturn, const std::string &primaryDelimeter=", ", const std::string &secondaryDelimeter=": ")