1 #include "otsdaq/DataManager/DataManager.h"
2 #include "otsdaq/ConfigurationInterface/ConfigurationManager.h"
3 #include "otsdaq/DataManager/CircularBuffer.h"
4 #include "otsdaq/DataManager/DataConsumer.h"
5 #include "otsdaq/DataManager/DataProducerBase.h"
6 #include "otsdaq/DataManager/MakeDataProcessor.h"
7 #include "otsdaq/Macros/CoutMacros.h"
8 #include "otsdaq/MessageFacility/MessageFacility.h"
18 DataManager::DataManager(
const ConfigurationTree& theXDAQContextConfigTree,
const std::string& supervisorConfigurationPath)
19 :
Configurable(theXDAQContextConfigTree, supervisorConfigurationPath)
21 , parentSupervisorHasFrontends_(false)
23 __CFG_COUT__ <<
"Constructed." << __E__;
28 DataManager::~DataManager(
void)
30 __CFG_COUT__ <<
"Destructor." << __E__;
32 __CFG_COUT__ <<
"Destructed." << __E__;
38 *out <<
"Buffer count: " << buffers_.size() << __E__;
39 for(
auto& bufferPair : buffers_)
42 <<
"Buffer '" << bufferPair.first <<
"' status=" << bufferPair.second.status_
43 <<
" producers=" << bufferPair.second.producers_.size()
44 <<
" consumers=" << bufferPair.second.consumers_.size() << __E__;
47 <<
"Producers:" << __E__;
48 for(
auto& producer : bufferPair.second.producers_)
50 *out <<
"\t\t\t" << producer->getProcessorID() <<
" ["
51 << bufferPair.second.buffer_->getProducerBufferSize(
52 producer->getProcessorID())
56 <<
"Consumers:" << __E__;
57 for(
auto& consumer : bufferPair.second.consumers_)
59 *out <<
"\t\t\t" << consumer->getProcessorID() << __E__;
67 const std::string transitionName =
"Configuring";
69 const std::string COL_NAME_bufferGroupLink =
"LinkToDataBufferTable";
70 const std::string COL_NAME_processorGroupLink =
"LinkToDataProcessorTable";
71 const std::string COL_NAME_processorType =
"ProcessorType";
72 const std::string COL_NAME_processorPlugin =
"ProcessorPluginName";
73 const std::string COL_NAME_processorLink =
"LinkToProcessorTable";
74 const std::string COL_NAME_appUID =
"ApplicationUID";
76 __CFG_COUT__ << transitionName <<
" DataManager" << __E__;
77 __CFG_COUT__ <<
"Path: " << theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink
83 for(
const auto& buffer :
84 theXDAQContextConfigTree_
85 .getNode(theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink)
88 __CFG_COUT__ <<
"Data Buffer Name: " << buffer.first << __E__;
89 if(buffer.second.getNode(TableViewColumnInfo::COL_NAME_STATUS).getValue<
bool>())
91 std::vector<unsigned int> producersVectorLocation;
92 std::vector<unsigned int> consumersVectorLocation;
93 auto bufferConfigurationList =
94 buffer.second.getNode(COL_NAME_processorGroupLink)
96 unsigned int location = 0;
97 for(
const auto& bufferConfiguration : bufferConfigurationList)
99 __CFG_COUT__ <<
"Processor id: " << bufferConfiguration.first << __E__;
100 if(bufferConfiguration.second
101 .getNode(TableViewColumnInfo::COL_NAME_STATUS)
104 if(bufferConfiguration.second.getNode(COL_NAME_processorType)
105 .getValue<std::string>() ==
"Producer")
107 producersVectorLocation.push_back(location);
109 else if(bufferConfiguration.second.getNode(COL_NAME_processorType)
110 .getValue<std::string>() ==
"Consumer")
112 consumersVectorLocation.push_back(location);
116 __CFG_SS__ <<
"Node ProcessorType in "
117 << bufferConfiguration.first <<
" of type "
118 << bufferConfiguration.second
119 .getNode(COL_NAME_processorPlugin)
120 .getValue<std::string>()
121 <<
" is invalid. The only accepted types are Producer "
124 __CFG_COUT_ERR__ << ss.str();
133 producersVectorLocation.size() ==
136 __CFG_SS__ <<
"Node Data Buffer " << buffer.first <<
" has "
137 << producersVectorLocation.size() <<
" Producers"
138 <<
" and " << consumersVectorLocation.size() <<
" Consumers"
139 <<
" there must be at least 1 Producer "
141 "for the buffer!" << __E__;
142 __CFG_COUT_ERR__ << ss.str();
148 <<
"Parent supervisor has front-ends, so FE-producers may "
149 <<
"be instantiated in the configure steps of the FESupervisor."
152 configureBuffer<std::string, std::map<std::string, std::string> >(
155 for(
auto& producerLocation : producersVectorLocation)
159 __CFG_COUT__ <<
"Creating producer... "
160 << bufferConfigurationList[producerLocation].first << __E__;
176 bufferConfigurationList[producerLocation]
177 .second.getNode(COL_NAME_processorPlugin)
178 .getValue<std::string>(),
179 theXDAQContextConfigTree_.getBackNode(theConfigurationPath_)
183 bufferConfigurationList[producerLocation].first,
184 theXDAQContextConfigTree_,
185 theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink +
"/" +
186 buffer.first +
"/" + COL_NAME_processorGroupLink +
"/" +
187 bufferConfigurationList[producerLocation].first +
"/" +
188 COL_NAME_processorLink));
192 __CFG_SS__ <<
"Construction failed for producer '"
193 << bufferConfigurationList[producerLocation].first
194 <<
"!' Null pointer returned." << __E__;
202 __CFG_COUT__ << ss.str() << __E__;
205 catch(
const std::bad_cast& e)
207 __CFG_SS__ <<
"Failed to instantiate producer plugin named '"
208 << bufferConfigurationList[producerLocation].first
210 << bufferConfigurationList[producerLocation]
211 .second.getNode(COL_NAME_processorPlugin)
212 .getValue<std::string>()
213 <<
"' due to the following error: \n"
214 << e.what() << __E__;
215 __CFG_COUT_ERR__ << ss.str();
218 catch(
const cet::exception& e)
220 __CFG_SS__ <<
"Failed to instantiate producer plugin named '"
221 << bufferConfigurationList[producerLocation].first
223 << bufferConfigurationList[producerLocation]
224 .second.getNode(COL_NAME_processorPlugin)
225 .getValue<std::string>()
226 <<
"' due to the following error: \n"
227 << e.what() << __E__;
228 __CFG_COUT_ERR__ << ss.str();
231 catch(
const std::runtime_error& e)
233 __CFG_SS__ <<
"Failed to instantiate producer plugin named '"
234 << bufferConfigurationList[producerLocation].first
236 << bufferConfigurationList[producerLocation]
237 .second.getNode(COL_NAME_processorPlugin)
238 .getValue<std::string>()
239 <<
"' due to the following error: \n"
240 << e.what() << __E__;
241 __CFG_COUT_ERR__ << ss.str();
246 __CFG_SS__ <<
"Failed to instantiate producer plugin named '"
247 << bufferConfigurationList[producerLocation].first
249 << bufferConfigurationList[producerLocation]
250 .second.getNode(COL_NAME_processorPlugin)
251 .getValue<std::string>()
252 <<
"' due to an unknown error." << __E__;
257 catch(
const std::exception& e)
259 ss <<
"Exception message: " << e.what();
264 __CFG_COUT_ERR__ << ss.str();
269 __CFG_COUT__ << bufferConfigurationList[producerLocation].first
270 <<
" has been created!" << __E__;
273 for(
auto& consumerLocation : consumersVectorLocation)
277 __CFG_COUT__ <<
"Creating consumer... "
278 << bufferConfigurationList[consumerLocation].first << __E__;
296 bufferConfigurationList[consumerLocation]
297 .second.getNode(COL_NAME_processorPlugin)
298 .getValue<std::string>(),
299 theXDAQContextConfigTree_.getBackNode(theConfigurationPath_)
303 bufferConfigurationList[consumerLocation].first,
304 theXDAQContextConfigTree_,
305 theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink +
"/" +
306 buffer.first +
"/" + COL_NAME_processorGroupLink +
"/" +
307 bufferConfigurationList[consumerLocation].first +
"/" +
308 COL_NAME_processorLink));
312 __CFG_SS__ <<
"Construction failed for consumer '"
313 << bufferConfigurationList[consumerLocation].first
314 <<
"!' Null pointer returned." << __E__;
321 __CFG_COUT__ << ss.str() << __E__;
324 catch(
const std::bad_cast& e)
326 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '"
327 << bufferConfigurationList[consumerLocation].first
329 << bufferConfigurationList[consumerLocation]
330 .second.getNode(COL_NAME_processorPlugin)
331 .getValue<std::string>()
332 <<
"' due to the following error: \n"
333 << e.what() << __E__;
334 __CFG_COUT_ERR__ << ss.str();
337 catch(
const cet::exception& e)
339 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '"
340 << bufferConfigurationList[consumerLocation].first
342 << bufferConfigurationList[consumerLocation]
343 .second.getNode(COL_NAME_processorPlugin)
344 .getValue<std::string>()
345 <<
"' due to the following error: \n"
346 << e.what() << __E__;
347 __CFG_COUT_ERR__ << ss.str();
350 catch(
const std::runtime_error& e)
352 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '"
353 << bufferConfigurationList[consumerLocation].first
355 << bufferConfigurationList[consumerLocation]
356 .second.getNode(COL_NAME_processorPlugin)
357 .getValue<std::string>()
358 <<
"' due to the following error: \n"
359 << e.what() << __E__;
360 __CFG_COUT_ERR__ << ss.str();
365 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '"
366 << bufferConfigurationList[consumerLocation].first
368 << bufferConfigurationList[consumerLocation]
369 .second.getNode(COL_NAME_processorPlugin)
370 .getValue<std::string>()
371 <<
"' due to an unknown error." << __E__;
376 catch(
const std::exception& e)
378 ss <<
"Exception message: " << e.what();
383 __CFG_COUT_ERR__ << ss.str();
387 __CFG_COUT__ << bufferConfigurationList[consumerLocation].first
388 <<
" has been created!" << __E__;
392 DataManager::configureAllBuffers();
396 void DataManager::halt(
void)
398 const std::string transitionName =
"Halting";
400 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
409 __CFG_COUT_WARN__ <<
"An error occurred while halting the Data Manager, ignoring."
415 __CFG_COUT__ << transitionName <<
" DataManager stopped. Now destruct buffers..."
423 void DataManager::pause(
void)
425 const std::string transitionName =
"Pausing";
427 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
429 DataManager::pauseAllBuffers();
433 void DataManager::resume(
void)
435 const std::string transitionName =
"Resuming";
437 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
439 DataManager::resumeAllBuffers();
443 void DataManager::start(std::string runNumber)
445 const std::string transitionName =
"Starting";
447 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
449 DataManager::startAllBuffers(runNumber);
453 void DataManager::stop()
455 const std::string transitionName =
"Stopping";
457 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
459 DataManager::stopAllBuffers();
467 DataManager::stopAllBuffers();
469 for(
auto& bufferPair : buffers_)
473 for(
auto& producer : bufferPair.second.producers_)
475 bufferPair.second.producers_.clear();
477 for(
auto& consumer : bufferPair.second.consumers_)
479 bufferPair.second.consumers_.clear();
481 delete bufferPair.second.buffer_;
642 const std::string& feProducerID)
644 __CFG_COUT__ <<
"Un-Registering FE-producer '" << feProducerID <<
"' from buffer '"
645 << bufferID <<
"'..." << __E__;
647 auto bufferIt = buffers_.find(bufferID);
648 if(bufferIt == buffers_.end())
650 __CFG_SS__ <<
"While Un-Registering FE-producer '" << feProducerID
651 <<
",' buffer '" << bufferID <<
"' not found!" << __E__;
660 for(
auto feProducerIt = bufferIt->second.producers_.begin();
661 feProducerIt != bufferIt->second.producers_.end();
664 if((*feProducerIt)->getProcessorID() == feProducerID)
668 bufferIt->second.producers_.erase(feProducerIt);
673 __CFG_COUT__ <<
"Un-Registered FE-producer '" << feProducerID <<
"' from buffer '"
674 << bufferID <<
".'" << __E__;
678 __CFG_COUT__ << ss.str() << __E__;
692 __CFG_COUT__ <<
"Registering producer '" << producer->
getProcessorID()
693 <<
"' to buffer '" << bufferUID <<
"'..." << __E__;
695 auto bufferIt = buffers_.find(bufferUID);
696 if(bufferIt == buffers_.end())
698 __CFG_SS__ <<
"Can't find buffer UID '" + bufferUID <<
"' for producer '"
700 <<
".' Make sure that your configuration is correct!" << __E__;
702 ss <<
"\n\n Here is the list of buffers:" << __E__;
703 for(
const auto& bufferPair : buffers_)
704 ss << bufferPair.first << __E__;
711 __CFG_SS__ <<
"Before!" << __E__;
713 __CFG_COUT__ << ss.str() << __E__;
716 __CFG_COUTV__(producer->getBufferSize());
717 bufferIt->second.buffer_->registerProducer(producer, producer->getBufferSize());
718 bufferIt->second.producers_.push_back(producer);
721 __CFG_SS__ <<
"After!" << __E__;
723 __CFG_COUT__ << ss.str() << __E__;
730 __CFG_COUT__ <<
"Registering consumer '" << consumer->
getProcessorID()
731 <<
"' to buffer '" << bufferUID <<
"'..." << __E__;
733 auto bufferIt = buffers_.find(bufferUID);
734 if(bufferIt == buffers_.end())
736 __CFG_SS__ <<
"Can't find buffer UID '" + bufferUID <<
"' for consumer '"
738 <<
".' Make sure that your configuration is correct!" << __E__;
740 ss <<
"\n\n Here is the list of buffers:" << __E__;
741 for(
const auto& bufferPair : buffers_)
742 ss << bufferPair.first << __E__;
748 __CFG_SS__ <<
"Before!" << __E__;
750 __CFG_COUT__ << ss.str() << __E__;
753 bufferIt->second.buffer_->registerConsumer(consumer);
754 bufferIt->second.consumers_.push_back(consumer);
757 __CFG_SS__ <<
"After!" << __E__;
759 __CFG_COUT__ << ss.str() << __E__;
764 void DataManager::configureAllBuffers(
void)
766 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
767 configureBuffer(it->first);
771 void DataManager::startAllBuffers(
const std::string& runNumber)
773 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
774 startBuffer(it->first, runNumber);
778 void DataManager::stopAllBuffers(
void)
780 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
781 stopBuffer(it->first);
785 void DataManager::resumeAllBuffers(
void)
787 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
788 resumeBuffer(it->first);
792 void DataManager::pauseAllBuffers(
void)
794 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
795 pauseBuffer(it->first);
799 void DataManager::configureBuffer(
const std::string& bufferUID)
801 __CFG_COUT__ <<
"Configuring... " << bufferUID << __E__;
803 for(
auto& it : buffers_[bufferUID].consumers_)
812 __CFG_COUT_WARN__ <<
"An error occurred while configuring consumer '"
813 << it->getProcessorID() <<
"'..." << __E__;
818 for(
auto& it : buffers_[bufferUID].producers_)
827 __CFG_COUT_WARN__ <<
"An error occurred while starting producer '"
828 << it->getProcessorID() <<
"'..." << __E__;
833 buffers_[bufferUID].status_ = Initialized;
838 void DataManager::startBuffer(
const std::string& bufferUID, std::string runNumber)
840 __CFG_COUT__ <<
"Starting... " << bufferUID << __E__;
842 buffers_[bufferUID].buffer_->reset();
843 for(
auto& it : buffers_[bufferUID].consumers_)
848 it->startProcessingData(runNumber);
852 __CFG_COUT_WARN__ <<
"An error occurred while starting consumer '"
853 << it->getProcessorID() <<
"'..." << __E__;
858 for(
auto& it : buffers_[bufferUID].producers_)
863 it->startProcessingData(runNumber);
867 __CFG_COUT_WARN__ <<
"An error occurred while starting producer '"
868 << it->getProcessorID() <<
"'..." << __E__;
873 buffers_[bufferUID].status_ = Running;
878 void DataManager::stopBuffer(
const std::string& bufferUID)
880 __CFG_COUT__ <<
"Stopping... " << bufferUID << __E__;
882 __CFG_COUT__ <<
"Stopping producers..." << __E__;
883 for(
auto& it : buffers_[bufferUID].producers_)
888 it->stopProcessingData();
892 __CFG_COUT_WARN__ <<
"An error occurred while stopping producer '"
893 << it->getProcessorID() <<
"'..." << __E__;
899 unsigned int timeOut = 0;
900 const unsigned int ratio = 100;
901 const unsigned int sleepTime = 1000 * ratio;
902 unsigned int totalSleepTime =
905 .buffer_->getTotalNumberOfSubBuffers();
906 if(totalSleepTime < 5000000)
907 totalSleepTime = 5000000;
908 while(!buffers_[bufferUID].buffer_->isEmpty())
911 timeOut += sleepTime;
912 if(timeOut > totalSleepTime)
914 __CFG_COUT__ <<
"Couldn't flush all buffers! Timing out after "
915 << totalSleepTime / 1000000. <<
" seconds!" << __E__;
916 buffers_[bufferUID].buffer_->isEmpty();
920 __CFG_COUT__ <<
"Stopping consumers, buffer MUST BE EMPTY. Is buffer empty? "
921 << (buffers_[bufferUID].buffer_->isEmpty() ?
"yes" :
"no") << __E__;
923 for(
auto& it : buffers_[bufferUID].consumers_)
928 it->stopProcessingData();
932 __CFG_COUT_WARN__ <<
"An error occurred while stopping consumer '"
933 << it->getProcessorID() <<
"'..." << __E__;
938 buffers_[bufferUID].buffer_->reset();
939 buffers_[bufferUID].status_ = Initialized;
943 void DataManager::resumeBuffer(
const std::string& bufferUID)
945 __CFG_COUT__ <<
"Resuming... " << bufferUID << __E__;
947 for(
auto& it : buffers_[bufferUID].consumers_)
948 it->resumeProcessingData();
949 for(
auto& it : buffers_[bufferUID].producers_)
950 it->resumeProcessingData();
952 buffers_[bufferUID].status_ = Running;
956 void DataManager::pauseBuffer(
const std::string& bufferUID)
958 __CFG_COUT__ <<
"Pausing... " << bufferUID << __E__;
960 for(
auto& it : buffers_[bufferUID].producers_)
961 it->pauseProcessingData();
963 unsigned int timeOut = 0;
964 const unsigned int sleepTime = 1000;
965 while(!buffers_[bufferUID].buffer_->isEmpty())
968 timeOut += sleepTime;
970 sleepTime * buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers())
975 __CFG_COUT__ <<
"Couldn't flush all buffers! Timing out after "
976 << buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers() *
978 <<
" seconds!" << __E__;
982 for(
auto& it : buffers_[bufferUID].consumers_)
983 it->pauseProcessingData();
984 buffers_[bufferUID].status_ = Initialized;
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
void getValue(T &value) const
void unregisterFEProducer(const std::string &bufferID, const std::string &feProducerID)
void registerProducer(const std::string &bufferUID, DataProducerBase *producer)
The data manager becomes the owner of the producer object!
bool parentSupervisorHasFrontends_
void dumpStatus(std::ostream *out=(std::ostream *)&(std::cout)) const
void registerConsumer(const std::string &bufferUID, DataConsumer *consumer)
The data manager becomes the owner of the consumer object!
void destroyBuffers(void)
!!!!!Delete all Buffers and all the pointers of the producers and consumers
virtual void configure(void)
State Machine Methods.
const std::string & getProcessorID(void) const
Getters.