LCOV - code coverage report
Current view: top level - artdaq/ArtModules - ArtdaqInputHelper.hh (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 492 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 387 0

            Line data    Source code
       1              : #ifndef ARTDAQ_ARTDAQ_ARTMODULES_ARTDAQINPUTHELPER_HH_
       2              : #define ARTDAQ_ARTDAQ_ARTMODULES_ARTDAQINPUTHELPER_HH_
       3              : 
       4              : #include "TRACE/tracemf.h"  // Pre-empt TRACE/trace.h from Fragment.hh.
       5              : 
       6              : #include "artdaq-core/Data/Fragment.hh"
       7              : #include "artdaq/ArtModules/InputUtilities.hh"
       8              : 
       9              : #include "artdaq-core/Data/MetadataFragment.hh"
      10              : #include "artdaq-core/Data/detail/ParentageMap.hh"
      11              : #include "artdaq-core/Utilities/TimeUtils.hh"
      12              : #include "artdaq/ArtModules/ArtdaqFragmentNamingService.h"
      13              : #include "artdaq/ArtModules/ArtdaqSharedMemoryServiceInterface.h"
      14              : #include "artdaq/DAQdata/Globals.hh"
      15              : #include "artdaq/DAQdata/NetMonHeader.hh"
      16              : 
      17              : #include "art/Framework/Core/FileBlock.h"
      18              : #include "art/Framework/Core/ProductRegistryHelper.h"
      19              : #include "art/Framework/IO/Sources/SourceHelper.h"
      20              : #include "art/Framework/IO/Sources/put_product_in_principal.h"
      21              : #include "art/Framework/Principal/EventPrincipal.h"
      22              : #include "art/Framework/Principal/RunPrincipal.h"
      23              : #include "art/Framework/Principal/SubRunPrincipal.h"
      24              : #include "art/Framework/Services/Registry/ServiceHandle.h"
      25              : 
      26              : #include "art/Persistency/Provenance/ProcessHistoryRegistry.h"
      27              : #include "art_root_io/setup.h"
      28              : 
      29              : #include "canvas/Persistency/Common/EDProduct.h"
      30              : #include "canvas/Persistency/Provenance/BranchDescription.h"
      31              : #include "canvas/Persistency/Provenance/BranchKey.h"
      32              : #include "canvas/Persistency/Provenance/FileFormatVersion.h"
      33              : #if ART_HEX_VERSION < 0x31100
      34              : #include "canvas/Persistency/Provenance/History.h"
      35              : #else
      36              : #include "canvas/Persistency/Provenance/Compatibility/History.h"
      37              : #endif
      38              : #include "canvas/Persistency/Provenance/ParentageRegistry.h"
      39              : #include "canvas/Persistency/Provenance/ProcessHistory.h"
      40              : #include "canvas/Persistency/Provenance/ProcessHistoryID.h"
      41              : #include "canvas/Persistency/Provenance/ProductList.h"
      42              : #include "canvas/Persistency/Provenance/ProductProvenance.h"
      43              : 
      44              : #include "fhiclcpp/ParameterSet.h"
      45              : #include "fhiclcpp/ParameterSetID.h"
      46              : #include "fhiclcpp/ParameterSetRegistry.h"
      47              : 
      48              : #include <TBufferFile.h>
      49              : #include <TClass.h>
      50              : #include <TList.h>
      51              : #include <TStreamerInfo.h>
      52              : 
      53              : #include <sys/time.h>
      54              : #include <chrono>
      55              : #include <cstdio>
      56              : #include <iomanip>
      57              : #include <iostream>
      58              : #include <list>
      59              : #include <map>
      60              : #include <memory>
      61              : #include <set>
      62              : #include <sstream>
      63              : #include <string>
      64              : #include <unordered_map>
      65              : #include <utility>
      66              : #include <vector>
      67              : 
      68              : #define CAN_REINIT 0
      69              : 
      70              : namespace art {
      71              : template<typename U>
      72              : class ArtdaqInputHelper;
      73              : }  // namespace art
      74              : 
      75              : /**
      76              :  * \brief This template class provides a unified interface for reading data into art
      77              :  * \tparam U The class responsible for delivering data
      78              :  *
      79              :  * JCF, May-27-2016
      80              :  * ArtdaqInputHelper is a template class which takes, as a parameter, a
      81              :  * class which it uses to receive data; the instance of this class is
      82              :  * called "communicationWrapper_". As of this writing, this wrapper
      83              :  * class is implemented by NetMonWrapper (for reading data into the
      84              :  * aggregator from the eventbuilder) and TransferWrapper (for reading
      85              :  * data into an art process). This class presents a unified approach
      86              :  * to handling art provenance, regardless of the communication
      87              :  * protocol used to read data in.
      88              :  */
      89              : template<typename U>
      90              : class art::ArtdaqInputHelper
      91              : {
      92              : public:
      93              :         /**
      94              :          * \brief Copy Constructor is deleted
      95              :          */
      96              :         ArtdaqInputHelper(const ArtdaqInputHelper&) = delete;
      97              : 
      98              :         /**
      99              :          * \brief Copy Assignment operator is deleted
     100              :          * \return ArtdaqInputHelper copy
     101              :          */
     102              :         ArtdaqInputHelper& operator=(const ArtdaqInputHelper&) = delete;
     103              : 
     104              :         /**
     105              :          * \brief ArtdaqInputHelper Destructor
     106              :          */
     107              :         ~ArtdaqInputHelper();
     108              : 
     109              :         /**
     110              :          * \brief ArtdaqInputHelper Constructor
     111              :          * \param ps ParameterSet used to confiugre communication wrapper class
     112              :          * \param helper An art::ProductRegistryHelper for registering products
     113              :          * \param pm An art::SourceHelper for handling provenance
     114              :          */
     115              :         ArtdaqInputHelper(const fhicl::ParameterSet& ps, art::ProductRegistryHelper& helper, art::SourceHelper const& pm);
     116              : 
     117              :         /**
     118              :          * \brief Called by art to close the input source. No-Op
     119              :          */
     120              :         void closeCurrentFile();
     121              : 
     122              :         /**
     123              :          * \brief Emulate reading a file
     124              :          * \param fb Output art::FileBlock object
     125              :          */
     126              :         void readFile(const std::string&, art::FileBlock*& fb);
     127              : 
     128              :         /**
     129              :          * \brief Whether additional events are expected from the source
     130              :          * \return True if ArtdaqInputHelper has not been shut down
     131              :          */
     132              :         bool hasMoreData() const;
     133              : 
     134              :         /**
     135              :          * \brief Read the next event from the communication wrapper
     136              :          * \param inR RunPrincipal input pointer
     137              :          * \param inSR SubRunPrincipal input pointer
     138              :          * \param outR RunPrincipal output pointer
     139              :          * \param outSR SubRunPrincipal output pointer
     140              :          * \param outE EventPrincipal output pointer
     141              :          * \return Whether an event was successfully read from the communication wrapper
     142              :          */
     143              :         bool readNext(art::RunPrincipal* const inR, art::SubRunPrincipal* const inSR, art::RunPrincipal*& outR,
     144              :                       art::SubRunPrincipal*& outSR, art::EventPrincipal*& outE);
     145              : 
     146              : private:
     147              :         ArtdaqInputHelper(ArtdaqInputHelper&&) = delete;
     148              :         ArtdaqInputHelper& operator=(ArtdaqInputHelper&&) = delete;
     149              : 
     150              :         void readAndConstructPrincipal(std::unique_ptr<TBufferFile>&, artdaq::NetMonHeader::MessageType, art::RunPrincipal* const,
     151              :                                        art::SubRunPrincipal* const, art::RunPrincipal*&, art::SubRunPrincipal*&,
     152              :                                        art::EventPrincipal*&);
     153              : 
     154              :         bool constructPrincipal(std::shared_ptr<ArtdaqEvent>, art::RunPrincipal* const,
     155              :                                 art::SubRunPrincipal* const, art::RunPrincipal*&, art::SubRunPrincipal*&,
     156              :                                 art::EventPrincipal*&);
     157              : 
     158              :         template<class T>
     159              :         void readDataProducts(std::list<std::unique_ptr<TBufferFile>>&, T* const&);
     160              : 
     161              :         void putInPrincipal(RunPrincipal* const&, std::unique_ptr<EDProduct>&&, const BranchDescription&,
     162              :                             std::unique_ptr<const ProductProvenance>&&);
     163              : 
     164              :         void putInPrincipal(SubRunPrincipal* const&, std::unique_ptr<EDProduct>&&, const BranchDescription&,
     165              :                             std::unique_ptr<const ProductProvenance>&&);
     166              : 
     167              :         void putInPrincipal(EventPrincipal* const&, std::unique_ptr<EDProduct>&&, const BranchDescription&,
     168              :                             std::unique_ptr<const ProductProvenance>&&);
     169              : 
     170              :         std::pair<bool, bool> readFragments(std::unordered_map<artdaq::Fragment::type_t, std::unique_ptr<artdaq::Fragments>> const& eventMap, art::RunPrincipal* const theRun, art::SubRunPrincipal* const theSubRun, art::EventPrincipal* const theEvent);
     171              : 
     172              :         void readInitMessage();
     173              : 
     174              :         bool shutdownMsgReceived_;
     175              :         art::SourceHelper const& pm_;
     176              :         art::ProductRegistryHelper& helper_;
     177              :         U communicationWrapper_;
     178              :         ProductList* productList_;
     179              :         std::unique_ptr<art::History> history_to_use_;
     180              :         bool fragmentsOnlyMode_;
     181              :         std::string pretend_module_name;                       ///< The module name to store data under
     182              :         size_t bytesRead;                                      ///< running total of number of bytes received
     183              :         std::chrono::steady_clock::time_point last_read_time;  ///< Time last read was completed
     184              : };
     185              : 
     186              : template<typename U>
     187            0 : art::ArtdaqInputHelper<U>::ArtdaqInputHelper(const fhicl::ParameterSet& ps, art::ProductRegistryHelper& helper,
     188              :                                              art::SourceHelper const& pm)
     189            0 :     : shutdownMsgReceived_(false)
     190            0 :     , pm_(pm)
     191            0 :     , helper_(helper)
     192            0 :     , communicationWrapper_(ps)
     193            0 :     , productList_()
     194            0 :     , fragmentsOnlyMode_(false)
     195            0 :     , pretend_module_name(ps.get<std::string>("raw_data_label", "daq"))
     196            0 :     , bytesRead(0)
     197            0 :     , last_read_time(std::chrono::steady_clock::now())
     198              : {
     199            0 :         root::setup();
     200              :         // Instantiate ArtdaqSharedMemoryService to set up artdaq Globals and MetricManager
     201            0 :         art::ServiceHandle<ArtdaqSharedMemoryServiceInterface> shm;
     202              : 
     203              : #if 0
     204              :         volatile bool loop = true;
     205              :         while (loop)
     206              :         {
     207              :                 usleep(1000);
     208              :         }
     209              : #endif
     210              : 
     211              :         // JCF, May-27-2016
     212              : 
     213              :         // Something will have to be done about the labeling of this class,
     214              :         // since it's just a template class- the user will care about the
     215              :         // specific instantiation when it comes to messages
     216              : 
     217            0 :         TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "Begin: ArtdaqInputHelper::ArtdaqInputHelper("
     218            0 :                                                    << "const fhicl::ParameterSet& ps, "
     219            0 :                                                    << "art::ProductRegistryHelper& helper, "
     220            0 :                                                    << "const art::SourceHelper& pm)";
     221            0 :         readInitMessage();
     222              : 
     223            0 :         helper.reconstitutes<artdaq::detail::RawEventHeader, art::InEvent>(pretend_module_name, "RawEventHeader");
     224              : 
     225            0 :         if (ps.get<bool>("register_fragment_types", true))
     226              :         {
     227            0 :                 TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "Registering known Fragment labels from ArtdaqFragmentNamingServiceInterface";
     228              : 
     229            0 :                 art::ServiceHandle<ArtdaqFragmentNamingServiceInterface> translator;
     230            0 :                 helper.reconstitutes<artdaq::Fragments, art::InEvent>(pretend_module_name, translator->GetUnidentifiedInstanceName());
     231              :                 // Workaround for #22979
     232            0 :                 helper.reconstitutes<artdaq::Fragments, art::InRun>(pretend_module_name, translator->GetUnidentifiedInstanceName());
     233            0 :                 helper.reconstitutes<artdaq::Fragments, art::InSubRun>(pretend_module_name, translator->GetUnidentifiedInstanceName());
     234              : 
     235            0 :                 helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InRun>(pretend_module_name, translator->GetUnidentifiedInstanceName());
     236            0 :                 helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InSubRun>(pretend_module_name, translator->GetUnidentifiedInstanceName());
     237            0 :                 helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InRun>(pretend_module_name, "StartOfRun");
     238            0 :                 helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InRun>(pretend_module_name, "EndOfRun");
     239            0 :                 helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InSubRun>(pretend_module_name, "StartOfSubrun");
     240            0 :                 helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InSubRun>(pretend_module_name, "EndOfSubrun");
     241              : 
     242            0 :                 std::set<std::string> instance_names = translator->GetAllProductInstanceNames();
     243            0 :                 for (const auto& set_iter : instance_names)
     244              :                 {
     245            0 :                         helper.reconstitutes<artdaq::Fragments, art::InEvent>(pretend_module_name, set_iter);
     246              :                 }
     247            0 :         }
     248              :         //
     249              :         //  Finished with init message.
     250              :         //
     251            0 :         TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "End:   ArtdaqInputHelper::ArtdaqInputHelper("
     252            0 :                                                    << "const fhicl::ParameterSet& ps, "
     253            0 :                                                    << "art::ProductRegistryHelper& helper, "
     254            0 :                                                    << "const art::SourceHelper& pm)";
     255            0 : }
     256              : 
     257              : template<typename U>
     258            0 : void art::ArtdaqInputHelper<U>::readInitMessage()
     259              : {
     260            0 :         TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "Going to receive init message";
     261            0 :         artdaq::FragmentPtrs initFrags = communicationWrapper_.receiveInitMessage();
     262            0 :         TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "Init message received";
     263              : 
     264            0 :         if (!initFrags.empty() && initFrags.front().get()->type() == artdaq::Fragment::EndOfDataFragmentType)
     265              :         {
     266            0 :                 TLOG_ERROR("ArtdaqInputHelper") << "Received EndOfData as first broadcast! This process neveer received any data!";
     267            0 :                 shutdownMsgReceived_ = true;
     268              :         }
     269              :         else
     270              :         {
     271            0 :                 if (initFrags.empty() || initFrags.back().get()->dataSize() == 0)
     272              :                 {
     273            0 :                         TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "No init message received or zero-size init message: Fragments-only mode activated! This is an EventBuilder!";
     274            0 :                         fragmentsOnlyMode_ = true;
     275              :                 }
     276              :                 else
     277              :                 {
     278            0 :                         std::list<std::unique_ptr<TBufferFile>> msgs;
     279            0 :                         for (auto& initFrag : initFrags)
     280              :                         {
     281            0 :                                 auto header = initFrag->metadata<artdaq::NetMonHeader>();
     282            0 :                                 msgs.emplace_back(new TBufferFile(TBuffer::kRead, header->data_length, initFrag->dataBegin(), kFALSE, nullptr));
     283              :                         }
     284              : 
     285            0 :                         std::set<art::ProcessHistoryID> history_ids;
     286              : 
     287            0 :                         for (auto& msg : msgs)
     288              :                         {
     289              :                                 // This first unsigned long is the message type code, ignored here in the constructor
     290            0 :                                 ULong_t dummy = 0;
     291            0 :                                 msg->ReadULong(dummy);
     292              : 
     293              :                                 // ELF: 6/11/2019: This code is taken from TSocket::RecvStreamerInfos
     294            0 :                                 auto list = dynamic_cast<TList*>(msg->ReadObject(TList::Class()));
     295              : 
     296            0 :                                 TIter next(list);
     297              :                                 TStreamerInfo* info;
     298            0 :                                 TObjLink* lnk = list->FirstLink();
     299              :                                 // First call BuildCheck for regular class
     300            0 :                                 while (lnk)
     301              :                                 {
     302            0 :                                         info = dynamic_cast<TStreamerInfo*>(lnk->GetObject());
     303            0 :                                         TObject* element = info->GetElements()->UncheckedAt(0);
     304            0 :                                         Bool_t isstl = element && strcmp("This", element->GetName()) == 0;
     305            0 :                                         if (!isstl)
     306              :                                         {
     307            0 :                                                 info->BuildCheck();
     308            0 :                                                 TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: importing TStreamerInfo: " << info->GetName() << ", version = " << info->GetClassVersion();
     309              :                                         }
     310            0 :                                         lnk = lnk->Next();
     311              :                                 }
     312              :                                 // Then call BuildCheck for stl class
     313            0 :                                 lnk = list->FirstLink();
     314            0 :                                 while (lnk)
     315              :                                 {
     316            0 :                                         info = dynamic_cast<TStreamerInfo*>(lnk->GetObject());
     317            0 :                                         TObject* element = info->GetElements()->UncheckedAt(0);
     318            0 :                                         Bool_t isstl = element && strcmp("This", element->GetName()) == 0;
     319            0 :                                         if (isstl)
     320              :                                         {
     321            0 :                                                 info->BuildCheck();
     322            0 :                                                 TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: importing TStreamerInfo: " << info->GetName() << ", version = " << info->GetClassVersion();
     323              :                                         }
     324            0 :                                         lnk = lnk->Next();
     325              :                                 }
     326              :                                 // ELF: 6/11/2019: End TSocket snippet
     327              : 
     328              :                                 //
     329              :                                 //  Read the ParameterSetRegistry.
     330              :                                 //
     331            0 :                                 ULong_t ps_cnt = 0;
     332            0 :                                 msg->ReadULong(ps_cnt);
     333            0 :                                 TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: parameter set count: " << ps_cnt;
     334            0 :                                 TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: reading parameter sets ...";
     335            0 :                                 for (ULong_t I = 0; I < ps_cnt; ++I)
     336              :                                 {
     337            0 :                                         std::string pset_str = "";  // = ReadObjectAny<std::string>(msg, "std::string", "ArtdaqInputHelper::ArtdaqInputHelper");
     338            0 :                                         msg->ReadStdString(pset_str);
     339              : 
     340            0 :                                         TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: parameter set: " << pset_str;
     341              : 
     342            0 :                                         fhicl::ParameterSet pset;
     343            0 :                                         pset = fhicl::ParameterSet::make(pset_str);
     344              :                                         // Force id calculation.
     345            0 :                                         pset.id();
     346            0 :                                         fhicl::ParameterSetRegistry::put(pset);
     347              :                                 }
     348            0 :                                 TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: finished reading parameter sets.";
     349              : 
     350              :                                 //
     351              :                                 //  Read the MasterProductRegistry.
     352              :                                 //
     353            0 :                                 auto thisProductList = ReadObjectAny<art::ProductList>(
     354              :                                     msg, "std::map<art::BranchKey,art::BranchDescription>", "ArtdaqInputHelper::ArtdaqInputHelper");
     355            0 :                                 TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: Input Product list sz=" << thisProductList->size();
     356              : 
     357            0 :                                 bool productListInitialized = productList_ != nullptr;
     358            0 :                                 if (!productListInitialized) productList_ = thisProductList;
     359            0 :                                 for (auto I = thisProductList->begin(), E = thisProductList->end(); I != E; ++I)
     360              :                                 {
     361              : #ifndef __OPTIMIZE__
     362            0 :                                         TLOG(TLVL_DEBUG + 50, "ArtdaqInputHelper") << "Branch key: class: '" << I->first.friendlyClassName_ << "' modlbl: '"
     363            0 :                                                                                    << I->first.moduleLabel_ << "' instnm: '" << I->first.productInstanceName_ << "' procnm: '"
     364            0 :                                                                                    << I->first.processName_ << "', branch description name: " << I->second.wrappedName();
     365              : #endif
     366            0 :                                         if (productListInitialized)
     367              :                                         {
     368            0 :                                                 productList_->emplace(*I);
     369              :                                         }
     370              :                                 }
     371              : 
     372            0 :                                 TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: Reading ProcessHistory";
     373            0 :                                 auto phm = ReadObjectAny<art::ProcessHistoryMap>(
     374              :                                     msg, "std::map<const art::Hash<2>,art::ProcessHistory>", "ArtdaqInputHelper::ArtdaqInputHelper");
     375            0 :                                 printProcessMap(*phm, "ArtdaqInputHelper's ProcessHistoryMap");
     376              : 
     377            0 :                                 for (auto& proc_hist : *phm)
     378              :                                 {
     379            0 :                                         history_ids.insert(proc_hist.second.id());
     380              :                                 }
     381              : 
     382            0 :                                 ProcessHistoryRegistry::put(*phm);
     383            0 :                                 printProcessMap(ProcessHistoryRegistry::get(), "ArtdaqInputHelper's ProcessHistoryRegistry");
     384              : 
     385              :                                 //
     386              :                                 //  Read the ParentageRegistry.
     387              :                                 //
     388            0 :                                 TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: Reading ParentageMap";
     389            0 :                                 auto parentageMap = ReadObjectAny<ParentageMap>(msg, "art::ParentageMap", "ArtdaqInputHelper::ArtdaqInputHelper");
     390            0 :                                 ParentageRegistry::put(*parentageMap);
     391              :                         }
     392              : 
     393              :                         // We're going to make a fake History using the collected process histories!
     394            0 :                         art::ProcessHistory fake_process_history;
     395            0 :                         for (auto& hist : history_ids)
     396              :                         {
     397            0 :                                 if (!hist.isValid())
     398              :                                 {
     399            0 :                                         TLOG(TLVL_WARNING, "ArtdaqInputHelper") << "Encountered invalid history ID!";
     400            0 :                                         continue;
     401            0 :                                 }
     402              : 
     403            0 :                                 ProcessHistory thisProcessHistory;
     404            0 :                                 if (ProcessHistoryRegistry::get(hist, thisProcessHistory))
     405              :                                 {
     406            0 :                                         for (auto& conf : thisProcessHistory)
     407              :                                         {
     408            0 :                                                 if (auto e = fake_process_history.end();
     409            0 :                                                     std::find(fake_process_history.begin(), e, conf) == e)
     410              :                                                 {
     411            0 :                                                         fake_process_history.push_back(conf);
     412              :                                                 }
     413              :                                         }
     414              :                                 }
     415              :                         }
     416            0 :                         art::ProcessHistoryMap fake_process_history_map;
     417            0 :                         fake_process_history_map[fake_process_history.id()] = fake_process_history;
     418            0 :                         ProcessHistoryRegistry::put(fake_process_history_map);
     419            0 :                         printProcessMap(ProcessHistoryRegistry::get(), "ArtdaqInputHelper's ProcessHistoryRegistry w/fake history");
     420              : 
     421            0 :                         history_to_use_.reset(new History());
     422            0 :                         history_to_use_->setProcessHistoryID(fake_process_history.id());
     423              : 
     424            0 :                         TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper")
     425            0 :                             << "ArtdaqInputHelper: Product list sz=" << productList_->size();
     426              : #if 0
     427              :                         for (auto I = productList_->begin(), E = productList_->end(); I != E; ++I)
     428              :                         {
     429              :                                 TLOG(TLVL_DEBUG + 50, "ArtdaqInputHelper") << "Branch key: class: '" << I->first.friendlyClassName_ << "' modlbl: '"
     430              :                                                                            << I->first.moduleLabel_ << "' instnm: '" << I->first.productInstanceName_ << "' procnm: '"
     431              :                                                                            << I->first.processName_ << "', branch description name: " << I->second.wrappedName();
     432              :                         }
     433              : #endif
     434              :                         // helper now owns productList_!
     435              : 
     436            0 :                         helper_.productList(std::unique_ptr<art::ProductList>(productList_));
     437              : 
     438            0 :                         TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: got product list";
     439            0 :                 }
     440              :         }
     441            0 : }
     442              : 
     443              : template<typename U>
     444            0 : art::ArtdaqInputHelper<U>::~ArtdaqInputHelper()
     445            0 : {}
     446              : 
     447              : template<typename U>
     448            0 : void art::ArtdaqInputHelper<U>::closeCurrentFile()
     449              : {
     450            0 :         TLOG(TLVL_DEBUG + 34, "ArtdaqInputHelper") << "Begin/End: ArtdaqInputHelper::closeCurrentFile()";
     451            0 : }
     452              : 
     453              : template<typename U>
     454            0 : void art::ArtdaqInputHelper<U>::readFile(const std::string&, art::FileBlock*& fb)
     455              : {
     456            0 :         TLOG(TLVL_DEBUG + 35, "ArtdaqInputHelper") << "Begin: ArtdaqInputHelper::"
     457              :                                                       "readFile(const std::string& name, art::FileBlock*& fb)";
     458            0 :         fb = new art::FileBlock(art::FileFormatVersion(1, "ArtdaqInputHelper2013"), "");
     459            0 :         TLOG(TLVL_DEBUG + 35, "ArtdaqInputHelper") << "End:   ArtdaqInputHelper::"
     460              :                                                       "readFile(const std::string& name, art::FileBlock*& fb)";
     461            0 : }
     462              : 
     463              : template<typename U>
     464            0 : bool art::ArtdaqInputHelper<U>::hasMoreData() const
     465              : {
     466            0 :         TLOG(TLVL_DEBUG + 36, "ArtdaqInputHelper") << "Begin: ArtdaqInputHelper::hasMoreData()";
     467            0 :         if (shutdownMsgReceived_)
     468              :         {
     469            0 :                 TLOG(TLVL_DEBUG + 36, "ArtdaqInputHelper") << "ArtdaqInputHelper::hasMoreData(): "
     470              :                                                               "returning false on shutdownMsgReceived_.";
     471            0 :                 TLOG(TLVL_DEBUG + 36, "ArtdaqInputHelper") << "End:   ArtdaqInputHelper::hasMoreData()";
     472            0 :                 return false;
     473              :         }
     474            0 :         TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "ArtdaqInputHelper::hasMoreData(): "
     475              :                                                       "returning true on not shutdownMsgReceived_.";
     476            0 :         TLOG(TLVL_DEBUG + 36, "ArtdaqInputHelper") << "End:   ArtdaqInputHelper::hasMoreData()";
     477            0 :         return true;
     478              : }
     479              : 
     480              : template<typename U>
     481            0 : void art::ArtdaqInputHelper<U>::readAndConstructPrincipal(std::unique_ptr<TBufferFile>& msg, artdaq::NetMonHeader::MessageType msg_type_code,
     482              :                                                           art::RunPrincipal* const inR, art::SubRunPrincipal* const inSR,
     483              :                                                           art::RunPrincipal*& outR, art::SubRunPrincipal*& outSR,
     484              :                                                           art::EventPrincipal*& outE)
     485              : {
     486              :         //
     487              :         //  Process the message.
     488              :         //
     489            0 :         std::unique_ptr<art::RunAuxiliary> run_aux;
     490            0 :         std::unique_ptr<art::SubRunAuxiliary> subrun_aux;
     491            0 :         std::unique_ptr<art::EventAuxiliary> event_aux;
     492              : 
     493              :         // Establish default 'results'
     494            0 :         outR = nullptr;
     495            0 :         outSR = nullptr;
     496            0 :         outE = nullptr;
     497              : 
     498            0 :         art::Timestamp currentTime = 0;
     499              :         timespec hi_res_time;
     500            0 :         int retcode = clock_gettime(CLOCK_REALTIME, &hi_res_time);
     501            0 :         TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "hi_res_time tv_sec = " << hi_res_time.tv_sec
     502            0 :                                                    << " tv_nsec = " << hi_res_time.tv_nsec << " (retcode = " << retcode << ")";
     503            0 :         if (retcode == 0)
     504              :         {
     505            0 :                 currentTime = ((hi_res_time.tv_sec & 0xffffffff) << 32) | (hi_res_time.tv_nsec & 0xffffffff);
     506              :         }
     507              :         else
     508              :         {
     509            0 :                 TLOG_ERROR("ArtdaqInputHelper")
     510            0 :                     << "Unable to fetch a high-resolution time with clock_gettime for art::SubRun Timestamp. ";
     511              :         }
     512              : 
     513            0 :         TLOG(TLVL_DEBUG + 37, "ArtdaqInputHelper") << "inR: " << static_cast<void*>(inR) << " run " << (inR ? std::to_string(inR->run()) : "invalid")
     514            0 :                                                    << ", inSR: " << static_cast<void*>(inSR) << " run " << (inSR ? std::to_string(inSR->run()) : "invalid")
     515            0 :                                                    << ", subrun " << (inSR ? std::to_string(inSR->subRun()) : "invalid");
     516              : 
     517              :         // Process Run Aux
     518            0 :         TLOG(TLVL_DEBUG + 37, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
     519            0 :                                                    << "processing Run auxiliary ...";
     520              : 
     521            0 :         run_aux.reset(ReadObjectAny<art::RunAuxiliary>(msg, "art::RunAuxiliary", "ArtdaqInputHelper::readAndConstructPrincipal"));
     522            0 :         run_aux->setProcessHistoryID(history_to_use_->processHistoryID());
     523            0 :         printProcessHistoryID("readAndConstructPrincipal", run_aux.get());
     524              : 
     525            0 :         TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
     526            0 :                                                    << "inR: " << static_cast<void*>(inR) << " run/expected "
     527            0 :                                                    << (inR ? std::to_string(inR->run()) : "invalid") << "/" << run_aux->run();
     528              : 
     529            0 :         if ((inR == nullptr) || !inR->runID().isValid() || (inR->run() != run_aux->run()))
     530              :         {
     531              :                 // New run, either we have no input RunPrincipal, or the
     532              :                 // input run number does not match the run number.
     533            0 :                 TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: making RunPrincipal ...";
     534            0 :                 outR = pm_.makeRunPrincipal(*run_aux);
     535              :         }
     536              : 
     537            0 :         TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
     538            0 :                                                    << "finished processing Run auxiliary.";
     539              : 
     540            0 :         if (msg_type_code != artdaq::NetMonHeader::MessageType::Run)  // SubRun or Event
     541              :         {
     542            0 :                 TLOG(TLVL_DEBUG + 38, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
     543            0 :                                                            << "processing SubRun auxiliary ...";
     544              : 
     545            0 :                 subrun_aux.reset(
     546            0 :                     ReadObjectAny<art::SubRunAuxiliary>(msg, "art::SubRunAuxiliary", "ArtdaqInputHelper::readAndConstructPrincipal"));
     547            0 :                 printProcessHistoryID("readAndConstructPrincipal", subrun_aux.get());
     548              : 
     549              :                 // HACK! Make the SR PHID match!
     550            0 :                 printProcessHistoryID("readAndConstructPrincipal", subrun_aux.get());
     551            0 :                 subrun_aux->setProcessHistoryID(run_aux->processHistoryID());
     552            0 :                 printProcessHistoryID("readAndConstructPrincipal", subrun_aux.get());
     553              : 
     554            0 :                 TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
     555            0 :                                                            << "inSR: " << static_cast<void*>(inSR) << " run/expected "
     556            0 :                                                            << (inSR ? std::to_string(inSR->run()) : "invalid") << "/" << subrun_aux->run()
     557            0 :                                                            << ", subrun/expected " << (inSR ? std::to_string(inSR->subRun()) : "invalid") << "/"
     558            0 :                                                            << subrun_aux->subRun();
     559              : 
     560            0 :                 art::SubRunID subrun_check(subrun_aux->run(), subrun_aux->subRun());
     561            0 :                 if (inSR == nullptr || !inSR->subRunID().isValid() || subrun_check != inSR->subRunID())
     562              :                 {
     563              :                         // New SubRun, either we have no input SubRunPrincipal, or the
     564              :                         // input subRun number does not match the subRun number.
     565            0 :                         TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
     566            0 :                                                                    << "making SubRunPrincipal ...";
     567            0 :                         outSR = pm_.makeSubRunPrincipal(*subrun_aux);
     568              :                 }
     569              : 
     570            0 :                 TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
     571            0 :                                                            << "finished processing SubRun auxiliary.";
     572              :         }
     573              : 
     574            0 :         if (msg_type_code == artdaq::NetMonHeader::MessageType::Event)
     575              :         {  // Event message.
     576              : 
     577            0 :                 TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
     578            0 :                                                            << "processing Event auxiliary ...";
     579              : 
     580            0 :                 event_aux.reset(
     581            0 :                     ReadObjectAny<art::EventAuxiliary>(msg, "art::EventAuxiliary", "ArtdaqInputHelper::readAndConstructPrincipal"));
     582              : 
     583            0 :                 TLOG(TLVL_DEBUG + 34, "ArtdaqInputHelper") << "readAndConstructPrincipal: making EventPrincipal ...";
     584              : #if ART_HEX_VERSION < 0x31100
     585              :                 auto historyPtr = std::unique_ptr<art::History>(new History(*(history_to_use_.get())));
     586              :                 if (!art::ProcessHistoryRegistry::get().count(history_to_use_->processHistoryID()))
     587              :                 {
     588              :                         TLOG_WARNING("ArtdaqInputHelper") << "Stored history is not in ProcessHistoryRegistry, this event may have issues!";
     589              :                 }
     590              :                 outE = pm_.makeEventPrincipal(*event_aux, std::move(historyPtr));
     591              : #else
     592            0 :                 if (!art::ProcessHistoryRegistry::get().count(history_to_use_->processHistoryID()))
     593              :                 {
     594            0 :                         TLOG_WARNING("ArtdaqInputHelper") << "Stored history is not in ProcessHistoryRegistry, this event may have issues!";
     595              :                 }
     596            0 :                 event_aux->setProcessHistoryID(history_to_use_->processHistoryID());
     597            0 :                 outE = pm_.makeEventPrincipal(*event_aux);
     598              : #endif
     599            0 :                 TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
     600            0 :                                                            << "finished processing Event auxiliary.";
     601              :         }
     602            0 :         else if (msg_type_code == artdaq::NetMonHeader::MessageType::Subrun)
     603              :         {
     604            0 :                 if (outSR == nullptr)
     605              :                 {
     606            0 :                         TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "SubrunDataFragment for current Subrun received, returning Flush event";
     607            0 :                         art::EventID const evid(art::EventID::flushEvent(inSR->subRunID()));
     608            0 :                         outE = pm_.makeEventPrincipal(evid, currentTime);
     609              :                 }
     610              :         }
     611            0 :         else if (msg_type_code == artdaq::NetMonHeader::MessageType::Run)
     612              :         {
     613            0 :                 if (outR == nullptr)
     614              :                 {
     615            0 :                         TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "RunDataFragment for current Run received, returning Flush subrun/event";
     616            0 :                         art::EventID const evid(art::EventID::flushEvent(inR->runID()));
     617            0 :                         outSR = pm_.makeSubRunPrincipal(evid.subRunID(), currentTime);
     618            0 :                         outE = pm_.makeEventPrincipal(evid, currentTime);
     619              :                 }
     620              :         }
     621            0 : }
     622              : 
     623              : template<typename U>
     624            0 : bool art::ArtdaqInputHelper<U>::constructPrincipal(std::shared_ptr<ArtdaqEvent> eventPtr, art::RunPrincipal* const inR, art::SubRunPrincipal* const inSR, art::RunPrincipal*& outR, art::SubRunPrincipal*& outSR, art::EventPrincipal*& outE)
     625              : {
     626              :         // We return false, indicating we're done reading, if:
     627              :         //   1) we did not obtain an event, because we timed out and were
     628              :         //      configured NOT to keep trying after a timeout, or
     629              :         //   2) the event we read was the end-of-data marker: a null
     630              :         //      pointer
     631            0 :         if (eventPtr->FirstFragmentType() == artdaq::Fragment::EndOfDataFragmentType)
     632              :         {
     633            0 :                 TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "Received shutdown message, returning false";
     634            0 :                 shutdownMsgReceived_ = true;
     635            0 :                 return false;
     636              :         }
     637              : 
     638            0 :         if (!eventPtr->header)
     639              :         {
     640            0 :                 TLOG_ERROR("ArtdaqInputHelper") << "No RawEventHeader received, cannot construct principals!";
     641            0 :                 shutdownMsgReceived_ = true;
     642            0 :                 return false;
     643              :         }
     644              : 
     645              :         // Check the number of fragments in the RawEvent.  If we have a single
     646              :         // fragment and that fragment is marked as EndRun or EndSubrun we'll create
     647              :         // the special principals for that.
     648            0 :         art::Timestamp currentTime = 0;
     649              :         timespec hi_res_time;
     650            0 :         int retcode = clock_gettime(CLOCK_REALTIME, &hi_res_time);
     651            0 :         TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "hi_res_time tv_sec = " << hi_res_time.tv_sec
     652            0 :                                                    << " tv_nsec = " << hi_res_time.tv_nsec << " (retcode = " << retcode << ")";
     653            0 :         if (retcode == 0)
     654              :         {
     655            0 :                 currentTime = ((hi_res_time.tv_sec & 0xffffffff) << 32) | (hi_res_time.tv_nsec & 0xffffffff);
     656              :         }
     657              :         else
     658              :         {
     659            0 :                 TLOG_ERROR("ArtdaqInputHelper")
     660            0 :                     << "Unable to fetch a high-resolution time with clock_gettime for art::Event Timestamp. "
     661            0 :                     << "The art::Event Timestamp will be zero for event " << eventPtr->header->event_id;
     662              :         }
     663              : 
     664              :         // make new run if inR is 0 or if the run has changed
     665            0 :         if (inR == nullptr || !inR->runID().isValid() || inR->run() != eventPtr->header->run_id)
     666              :         {
     667            0 :                 TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Making run principal with run_id " << eventPtr->header->run_id;
     668            0 :                 outR = pm_.makeRunPrincipal(eventPtr->header->run_id, currentTime);
     669              :         }
     670              : 
     671            0 :         if (eventPtr->FirstFragmentType() == artdaq::Fragment::EndOfRunFragmentType)
     672              :         {
     673            0 :                 TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "EndOfRunFragment received, returning Flush subrun/event";
     674            0 :                 art::EventID const evid(art::EventID::flushEvent(outR != nullptr ? outR->runID() : inR->runID()));
     675            0 :                 outSR = pm_.makeSubRunPrincipal(evid.subRunID(), currentTime);
     676            0 :                 outE = pm_.makeEventPrincipal(evid, currentTime);
     677            0 :                 return true;
     678              :         }
     679              : 
     680              :         // make new subrun if inSR is 0 or if the subrun has changed
     681            0 :         art::SubRunID subrun_check(eventPtr->header->run_id, eventPtr->header->subrun_id);
     682            0 :         if (inSR == nullptr || !inSR->subRunID().isValid() || subrun_check != inSR->subRunID())
     683              :         {
     684            0 :                 TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Making subrun principal with subrun_id " << eventPtr->header->subrun_id;
     685            0 :                 outSR = pm_.makeSubRunPrincipal(eventPtr->header->run_id, eventPtr->header->subrun_id, currentTime);
     686              :         }
     687              : 
     688            0 :         if (eventPtr->FirstFragmentType() == artdaq::Fragment::EndOfSubrunFragmentType)
     689              :         {
     690            0 :                 TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "EndOfSubrunFragment received, returning Flush event";
     691            0 :                 art::EventID const evid(art::EventID::flushEvent(outSR != nullptr ? outSR->subRunID() : inSR->subRunID()));
     692            0 :                 outE = pm_.makeEventPrincipal(evid, currentTime);
     693            0 :                 return true;
     694              :         }
     695              : 
     696            0 :         TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Making event principal with event_id " << eventPtr->header->event_id;
     697            0 :         outE = pm_.makeEventPrincipal(eventPtr->header->run_id, eventPtr->header->subrun_id, eventPtr->header->event_id, currentTime);
     698            0 :         return true;
     699              : }
     700              : 
     701              : template<typename U>
     702              : template<class T>
     703            0 : void art::ArtdaqInputHelper<U>::readDataProducts(std::list<std::unique_ptr<TBufferFile>>& msgs, T* const& outPrincipal)
     704              : {
     705            0 :         for (auto& msg : msgs)
     706              :         {
     707            0 :                 ULong_t prd_cnt = 0;
     708              :                 {
     709            0 :                         TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: reading data product count ...";
     710            0 :                         msg->ReadULong(prd_cnt);
     711            0 :                         TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: product count: " << prd_cnt;
     712              :                 }
     713              :                 //
     714              :                 //  Read the data products.
     715              :                 //
     716            0 :                 for (ULong_t I = 0; I < prd_cnt; ++I)
     717              :                 {
     718            0 :                         std::unique_ptr<BranchKey> bk;
     719              :                         {
     720            0 :                                 TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: Reading branch key.";
     721            0 :                                 bk.reset(ReadObjectAny<BranchKey>(msg, "art::BranchKey", "ArtdaqInputHelper::readDataProducts"));
     722              :                         }
     723              : 
     724              : #ifndef __OPTIMIZE__
     725            0 :                         TLOG(TLVL_DEBUG + 41, "ArtdaqInputHelper") << "readDataProducts: got product class: '" << bk->friendlyClassName_ << "' modlbl: '"
     726            0 :                                                                    << bk->moduleLabel_ << "' instnm: '" << bk->productInstanceName_ << "' procnm: '"
     727            0 :                                                                    << bk->processName_;
     728              : #endif
     729            0 :                         ProductList::const_iterator iter;
     730              :                         {
     731            0 :                                 TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: looking up product ...";
     732            0 :                                 iter = productList_->find(*bk);
     733            0 :                                 if (iter == productList_->end())
     734              :                                 {
     735            0 :                                         throw art::Exception(art::errors::ProductNotFound)  // NOLINT(cert-err60-cpp)
     736              :                                             << "No product is registered for\n"
     737            0 :                                             << "  process name:                '" << bk->processName_ << "'\n"
     738            0 :                                             << "  module label:                '" << bk->moduleLabel_ << "'\n"
     739            0 :                                             << "  product friendly class name: '" << bk->friendlyClassName_ << "'\n"
     740            0 :                                             << "  product instance name:       '" << bk->productInstanceName_ << "'\n";
     741              :                                 }
     742              :                         }
     743              :                         // Note: This must be a reference to the unique copy in
     744              :                         //       the master product registry!
     745            0 :                         const BranchDescription& bd = iter->second;
     746            0 :                         std::unique_ptr<EDProduct> prd;
     747              :                         {
     748            0 :                                 TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: Reading product with wrapped name: " << bd.wrappedName()
     749            0 :                                                                            << ", TClass = " << static_cast<void*>(TClass::GetClass(bd.wrappedName().c_str()));
     750              : 
     751              :                                 // JCF, May-25-2016
     752              :                                 // Currently unclear why the templatized version of ReadObjectAny doesn't work here...
     753              : 
     754              :                                 //          prd.reset(ReadObjectAny<EDProduct>(msg, bd.wrappedName()));
     755              : 
     756            0 :                                 void* p = msg->ReadObjectAny(TClass::GetClass(bd.wrappedName().c_str()));
     757            0 :                                 auto pp = reinterpret_cast<EDProduct*>(p);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     758              : 
     759            0 :                                 TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: After ReadObjectAny(prd): p=" << p << ", EDProduct::isPresent: " << pp->isPresent();
     760            0 :                                 prd.reset(pp);
     761            0 :                                 p = nullptr;
     762              :                         }
     763            0 :                         std::unique_ptr<const ProductProvenance> prdprov;
     764              :                         {
     765            0 :                                 TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: Reading product provenance.";
     766            0 :                                 prdprov.reset(ReadObjectAny<ProductProvenance>(msg, "art::ProductProvenance", "ArtdaqInputHelper::readDataProducts"));
     767              :                         }
     768              : 
     769              :                         {
     770            0 :                                 TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: inserting product: class: '" << bd.friendlyClassName()
     771            0 :                                                                            << "' modlbl: '" << bd.moduleLabel() << "' instnm: '" << bd.productInstanceName()
     772            0 :                                                                            << "' procnm: '" << bd.processName() << "' id: '" << bd.productID() << "'";
     773            0 :                                 putInPrincipal(outPrincipal, std::move(prd), bd, std::move(prdprov));
     774              :                         }
     775              :                 }
     776              :         }
     777            0 : }
     778              : 
     779              : template<typename U>
     780            0 : void art::ArtdaqInputHelper<U>::putInPrincipal(RunPrincipal* const& rp, std::unique_ptr<EDProduct>&& prd,
     781              :                                                const BranchDescription& bd,
     782              :                                                std::unique_ptr<const ProductProvenance>&& prdprov)
     783              : {
     784            0 :         rp->put(bd, std::move(prdprov), std::move(prd), std::make_unique<RangeSet>(RangeSet::forRun(rp->runID())));
     785            0 : }
     786              : 
     787              : template<typename U>
     788            0 : void art::ArtdaqInputHelper<U>::putInPrincipal(SubRunPrincipal* const& srp, std::unique_ptr<EDProduct>&& prd,
     789              :                                                const BranchDescription& bd,
     790              :                                                std::unique_ptr<const ProductProvenance>&& prdprov)
     791              : {
     792            0 :         srp->put(bd, std::move(prdprov), std::move(prd), std::make_unique<RangeSet>(RangeSet::forSubRun(srp->subRunID())));
     793            0 : }
     794              : 
     795              : template<typename U>
     796            0 : void art::ArtdaqInputHelper<U>::putInPrincipal(EventPrincipal* const& ep, std::unique_ptr<EDProduct>&& prd,
     797              :                                                const BranchDescription& bd,
     798              :                                                std::unique_ptr<const ProductProvenance>&& prdprov)
     799              : {
     800            0 :         TLOG(TLVL_DEBUG + 42, "ArtdaqInputHelper") << "EventPrincipal size before put: " << ep->size();
     801              : 
     802            0 :         ep->put(bd, std::move(prdprov), std::move(prd), std::make_unique<RangeSet>(RangeSet::invalid()));
     803              : 
     804            0 :         TLOG(TLVL_DEBUG + 42, "ArtdaqInputHelper") << "EventPrincipal size after put: " << ep->size();
     805            0 : }
     806              : 
     807              : template<typename U>
     808            0 : std::pair<bool, bool> art::ArtdaqInputHelper<U>::readFragments(std::unordered_map<artdaq::Fragment::type_t, std::unique_ptr<artdaq::Fragments>> const& eventMap, art::RunPrincipal* const theRun, art::SubRunPrincipal* const theSubRun, art::EventPrincipal* const theEvent)
     809              : {
     810              :         // Now read in Fragments
     811            0 :         double fragmentLatency = 0;
     812            0 :         double fragmentLatencyMax = 0.0;
     813            0 :         size_t fragmentCount = 0;
     814              : 
     815            0 :         bool eventProductsRead = false;
     816            0 :         bool subrunProductsRead = false;
     817              : 
     818            0 :         art::ServiceHandle<ArtdaqFragmentNamingServiceInterface> translator;
     819              : 
     820              :         // insert the Fragments of each type into the EventPrincipal
     821            0 :         for (auto& fragmentTypePair : eventMap)
     822              :         {
     823            0 :                 auto type_code = fragmentTypePair.first;
     824            0 :                 if (artdaq::Fragment::isSystemFragmentType(type_code) && type_code != artdaq::Fragment::ContainerFragmentType && type_code != artdaq::Fragment::EmptyFragmentType)
     825              :                 {
     826            0 :                         if (type_code == artdaq::Fragment::EndOfRunFragmentType)
     827              :                         {
     828            0 :                                 std::unordered_map<std::string, std::unique_ptr<std::vector<artdaq::ArtdaqMetadata>>> metadata_coll;
     829            0 :                                 for (auto& frag : *fragmentTypePair.second)
     830              :                                 {
     831            0 :                                         artdaq::MetadataFragment mf(frag);
     832            0 :                                         auto md = mf.get_metadata();
     833              : 
     834            0 :                                         std::pair<bool, std::string> instance_name_result =
     835              :                                             translator->GetInstanceNameForFragment(frag);
     836            0 :                                         std::string label = instance_name_result.second;
     837            0 :                                         if (!instance_name_result.first)
     838              :                                         {
     839            0 :                                                 TLOG_WARNING("ArtdaqInputHelper")
     840            0 :                                                     << "UnknownFragmentType: The product instance name mapping for fragment type \"" << static_cast<int>(type_code)
     841            0 :                                                     << "\" is not known. Fragments of this "
     842            0 :                                                     << "type will be stored in the event with an instance name of \"" << label << "\".";
     843              :                                         }
     844            0 :                                         if (!metadata_coll.count(label))
     845              :                                         {
     846            0 :                                                 TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Creating output ArtdaqMetadata storage for label " << label;
     847            0 :                                                 metadata_coll[label] = std::make_unique<std::vector<artdaq::ArtdaqMetadata>>();
     848              :                                         }
     849            0 :                                         TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding Fragment " << frag.fragmentID() << " to storage with label " << label << " (sz=" << metadata_coll[label]->size() + 1 << ")";
     850            0 :                                         metadata_coll[label]->push_back(md);
     851              :                                 }
     852            0 :                                 for (auto& type : metadata_coll)
     853              :                                 {
     854            0 :                                         TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding " << type.second->size() << " ArtdaqMetadatas with label " << type.first << " to Run.";
     855            0 :                                         put_product_in_principal(std::move(type.second), *theRun, pretend_module_name, type.first);
     856              :                                 }
     857            0 :                         }
     858            0 :                         else if (type_code == artdaq::Fragment::EndOfSubrunFragmentType)
     859              :                         {
     860            0 :                                 std::unordered_map<std::string, std::unique_ptr<std::vector<artdaq::ArtdaqMetadata>>> metadata_coll;
     861            0 :                                 for (auto& frag : *fragmentTypePair.second)
     862              :                                 {
     863            0 :                                         artdaq::MetadataFragment mf(frag);
     864            0 :                                         auto md = mf.get_metadata();
     865              : 
     866            0 :                                         std::pair<bool, std::string> instance_name_result =
     867              :                                             translator->GetInstanceNameForFragment(frag);
     868            0 :                                         std::string label = instance_name_result.second;
     869            0 :                                         if (!instance_name_result.first)
     870              :                                         {
     871            0 :                                                 TLOG_WARNING("ArtdaqInputHelper")
     872            0 :                                                     << "UnknownFragmentType: The product instance name mapping for fragment type \"" << static_cast<int>(type_code)
     873            0 :                                                     << "\" is not known. Fragments of this "
     874            0 :                                                     << "type will be stored in the event with an instance name of \"" << label << "\".";
     875              :                                         }
     876            0 :                                         if (!metadata_coll.count(label))
     877              :                                         {
     878            0 :                                                 TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Creating output ArtdaqMetadata storage for label " << label;
     879            0 :                                                 metadata_coll[label] = std::make_unique<std::vector<artdaq::ArtdaqMetadata>>();
     880              :                                         }
     881            0 :                                         TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding Fragment " << frag.fragmentID() << " to storage with label " << label << " (sz=" << metadata_coll[label]->size() + 1 << ")";
     882            0 :                                         metadata_coll[label]->push_back(md);
     883              :                                 }
     884            0 :                                 for (auto& type : metadata_coll)
     885              :                                 {
     886            0 :                                         TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding " << type.second->size() << " ArtdaqMetadatas with label " << type.first << " to SubRun.";
     887            0 :                                         put_product_in_principal(std::move(type.second), *theSubRun, pretend_module_name, type.first);
     888            0 :                                         subrunProductsRead = true;
     889              :                                 }
     890            0 :                         }
     891              :                         else
     892              :                         {
     893            0 :                                 TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "Skipping system Fragment with type " << static_cast<int>(type_code) << " ( " << translator->GetInstanceNameForType(type_code) << " )";
     894              :                         }
     895            0 :                         continue;
     896            0 :                 }
     897            0 :                 TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "type is " << static_cast<int>(type_code) << ", number of fragments is " << fragmentTypePair.second->size();
     898              : 
     899            0 :                 std::unordered_map<std::string, std::unique_ptr<artdaq::Fragments>> derived_fragments;
     900            0 :                 for (auto& frag : *fragmentTypePair.second)
     901              :                 {
     902            0 :                         TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Processing Fragment with ID " << frag.fragmentID();
     903            0 :                         bytesRead += frag.sizeBytes();
     904            0 :                         auto latency_s = frag.getLatency(true);
     905            0 :                         double latency = latency_s.tv_sec + (latency_s.tv_nsec / 1000000000.0);
     906              : 
     907            0 :                         fragmentLatency += latency;
     908            0 :                         fragmentCount++;
     909            0 :                         if (latency > fragmentLatencyMax) fragmentLatencyMax = latency;
     910              : 
     911            0 :                         std::pair<bool, std::string> instance_name_result =
     912              :                             translator->GetInstanceNameForFragment(frag);
     913            0 :                         std::string label = instance_name_result.second;
     914            0 :                         if (!instance_name_result.first)
     915              :                         {
     916            0 :                                 TLOG_WARNING("ArtdaqInputHelper")
     917            0 :                                     << "UnknownFragmentType: The product instance name mapping for fragment type \"" << static_cast<int>(type_code)
     918            0 :                                     << "\" is not known. Fragments of this "
     919            0 :                                     << "type will be stored in the event with an instance name of \"" << label << "\".";
     920              :                         }
     921            0 :                         if (!derived_fragments.count(label))
     922              :                         {
     923            0 :                                 TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Creating output Fragment storage for label " << label;
     924            0 :                                 derived_fragments[label] = std::make_unique<artdaq::Fragments>();
     925              :                         }
     926            0 :                         TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding Fragment " << frag.fragmentID() << " to storage with label " << label << " (sz=" << derived_fragments[label]->size() + 1 << ")";
     927            0 :                         derived_fragments[label]->emplace_back(std::move(frag));
     928              :                 }
     929            0 :                 for (auto& type : derived_fragments)
     930              :                 {
     931            0 :                         TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding " << type.second->size() << " Fragments with label " << type.first << " to event.";
     932            0 :                         put_product_in_principal(std::move(type.second), *theEvent, pretend_module_name, type.first);
     933            0 :                         eventProductsRead = true;
     934              :                 }
     935              :         }
     936              : 
     937            0 :         if (metricMan)
     938              :         {
     939            0 :                 metricMan->sendMetric("bytesRead", bytesRead, "B", 3, artdaq::MetricMode::LastPoint);
     940              : 
     941            0 :                 metricMan->sendMetric("ArtdaqInputHelper Latency", fragmentLatency / fragmentCount, "s", 4, artdaq::MetricMode::Average);
     942            0 :                 metricMan->sendMetric("ArtdaqInputHelper Maximum Latency", fragmentLatencyMax, "s", 4, artdaq::MetricMode::Maximum);
     943              :         }
     944              : 
     945            0 :         return std::make_pair(subrunProductsRead, eventProductsRead);
     946              : }
     947              : 
     948              : template<typename U>
     949            0 : bool art::ArtdaqInputHelper<U>::readNext(art::RunPrincipal* const inR, art::SubRunPrincipal* const inSR,
     950              :                                          art::RunPrincipal*& outR, art::SubRunPrincipal*& outSR, art::EventPrincipal*& outE)
     951              : {
     952            0 :         TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Begin: ArtdaqInputHelper::readNext";
     953              : 
     954            0 :         auto read_start_time = std::chrono::steady_clock::now();
     955              : 
     956            0 :         std::shared_ptr<ArtdaqEvent> eventMap = communicationWrapper_.receiveMessages();
     957            0 :         auto got_event_time = std::chrono::steady_clock::now();
     958              : 
     959            0 :         if (eventMap == nullptr)
     960              :         {
     961            0 :                 TLOG(TLVL_ERROR, "ArtdaqInputHelper") << "No Fragments received! Aborting...";
     962            0 :                 shutdownMsgReceived_ = true;
     963            0 :                 TLOG(TLVL_DEBUG + 45, "ArtdaqInputHelper") << "End:   ArtdaqInputHelper::readNext";
     964            0 :                 return false;
     965              :         }
     966              : 
     967            0 :         if (eventMap->FirstFragmentType() == artdaq::Fragment::EndOfDataFragmentType)
     968              :         {
     969            0 :                 TLOG(TLVL_ERROR, "ArtdaqInputHelper") << "Shutdown message received!";
     970            0 :                 shutdownMsgReceived_ = true;
     971            0 :                 TLOG(TLVL_DEBUG + 45, "ArtdaqInputHelper") << "End:   ArtdaqInputHelper::readNext";
     972            0 :                 return false;
     973              :         }
     974              : 
     975            0 :         if (eventMap->FirstFragmentType() == artdaq::Fragment::InitFragmentType)
     976              :         {
     977              : #if CAN_REINIT
     978              :                 TLOG(TLVL_INFO, "ArtdaqInputHelper") << "Additional Init Message received! Attempting to register new products...";
     979              :                 readInitMessage();
     980              : #else
     981            0 :                 TLOG(TLVL_WARNING, "ArtdaqInputHelper") << "Received additional Init Message! Check init_fragment_count configuration!";
     982              : #endif
     983            0 :                 TLOG(TLVL_DEBUG + 45, "ArtdaqInputHelper") << "End:   ArtdaqInputHelper::readNext";
     984            0 :                 return true;
     985              :         }
     986              : 
     987            0 :         if (fragmentsOnlyMode_)
     988              :         {
     989            0 :                 if (eventMap->fragments.count(artdaq::Fragment::DataFragmentType) || eventMap->fragments.count(artdaq::Fragment::RunDataFragmentType) || eventMap->fragments.count(artdaq::Fragment::SubrunDataFragmentType))
     990              :                 {
     991            0 :                         TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "ArtdaqInputHelper::readNext unexpectedly got a message with a DataFragment. This art Event will NOT be reconstructed!";
     992              :                 }
     993              : 
     994            0 :                 auto firstFragmentType = eventMap->FirstFragmentType();
     995            0 :                 TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "First Fragment type is " << static_cast<int>(firstFragmentType);
     996            0 :                 if (constructPrincipal(eventMap, inR, inSR, outR, outSR, outE))
     997              :                 {
     998            0 :                         readFragments(eventMap->fragments, outR ? outR : inR, outSR ? outSR : inSR, outE);
     999              :                 }
    1000              :         }
    1001              :         else
    1002              :         {
    1003            0 :                 std::list<std::unique_ptr<TBufferFile>> msgs;
    1004            0 :                 if (eventMap->fragments.count(artdaq::Fragment::RunDataFragmentType))
    1005            0 :                         for (auto& dataFrag : *(eventMap->fragments[artdaq::Fragment::RunDataFragmentType]))
    1006              :                         {
    1007            0 :                                 if (!dataFrag.hasMetadata()) continue;
    1008            0 :                                 auto header = dataFrag.metadata<artdaq::NetMonHeader>();
    1009            0 :                                 msgs.emplace_back(new TBufferFile(TBuffer::kRead, header->data_length, dataFrag.dataBegin(), kFALSE, nullptr));
    1010              :                         }
    1011            0 :                 if (eventMap->fragments.count(artdaq::Fragment::SubrunDataFragmentType))
    1012            0 :                         for (auto& dataFrag : *(eventMap->fragments[artdaq::Fragment::SubrunDataFragmentType]))
    1013              :                         {
    1014            0 :                                 if (!dataFrag.hasMetadata()) continue;
    1015            0 :                                 auto header = dataFrag.metadata<artdaq::NetMonHeader>();
    1016            0 :                                 msgs.emplace_back(new TBufferFile(TBuffer::kRead, header->data_length, dataFrag.dataBegin(), kFALSE, nullptr));
    1017              :                         }
    1018            0 :                 if (eventMap->fragments.count(artdaq::Fragment::DataFragmentType))
    1019            0 :                         for (auto& dataFrag : *(eventMap->fragments[artdaq::Fragment::DataFragmentType]))
    1020              :                         {
    1021            0 :                                 if (!dataFrag.hasMetadata()) continue;
    1022            0 :                                 auto header = dataFrag.metadata<artdaq::NetMonHeader>();
    1023            0 :                                 msgs.emplace_back(new TBufferFile(TBuffer::kRead, header->data_length, dataFrag.dataBegin(), kFALSE, nullptr));
    1024              :                         }
    1025              : 
    1026              :                 //
    1027              :                 //  Read message type code.
    1028              :                 //
    1029            0 :                 artdaq::NetMonHeader::MessageType msg_type_code = artdaq::NetMonHeader::MessageType::Invalid;
    1030            0 :                 ULong_t msg_type_code_tmp = 0;
    1031            0 :                 for (auto& msg : msgs)
    1032              :                 {
    1033            0 :                         TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "ArtdaqInputHelper::readNext: "
    1034            0 :                                                                    << "getting message type code ...";
    1035            0 :                         msg->ReadULong(msg_type_code_tmp);
    1036            0 :                         TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "ArtdaqInputHelper::readNext: "
    1037            0 :                                                                    << "message type: " << msg_type_code_tmp;
    1038              : 
    1039            0 :                         if (msg_type_code == artdaq::NetMonHeader::MessageType::Invalid)
    1040            0 :                                 msg_type_code = static_cast<artdaq::NetMonHeader::MessageType>(msg_type_code_tmp);
    1041            0 :                         else if (msg_type_code != static_cast<artdaq::NetMonHeader::MessageType>(msg_type_code_tmp))
    1042              :                         {
    1043            0 :                                 TLOG(TLVL_ERROR, "ArtdaqInputHelper") << "ArtdaqInputHelper::readNext: Received conflicting message type codes! Aborting...";
    1044              : 
    1045            0 :                                 shutdownMsgReceived_ = true;
    1046            0 :                                 TLOG(TLVL_DEBUG + 45, "ArtdaqInputHelper") << "End:   ArtdaqInputHelper::readNext";
    1047            0 :                                 return false;
    1048              :                         }
    1049              :                 }
    1050              : 
    1051            0 :                 for (auto& msg : msgs)
    1052              :                 {
    1053            0 :                         readAndConstructPrincipal(msg, msg_type_code, inR, inSR, outR, outSR, outE);
    1054              :                 }
    1055              :                 //
    1056              :                 //  Read per-event metadata needed to construct principal.
    1057              :                 //
    1058            0 :                 if (msg_type_code == artdaq::NetMonHeader::MessageType::Run)
    1059              :                 {
    1060              :                         // EndRun message.
    1061              :                         // FIXME: We need to merge these into the input RunPrincipal.
    1062            0 :                         readDataProducts(msgs, outR ? outR : inR);
    1063              :                 }
    1064            0 :                 else if (msg_type_code == artdaq::NetMonHeader::MessageType::Subrun)
    1065              :                 {
    1066              :                         // FIXME: We need to merge these into the input SubRunPrincipal.
    1067            0 :                         readDataProducts(msgs, outSR ? outSR : inSR);
    1068              :                 }
    1069            0 :                 else if (msg_type_code == artdaq::NetMonHeader::MessageType::Event)
    1070              :                 {
    1071              :                         // Event message.
    1072            0 :                         readDataProducts(msgs, outE);
    1073              : 
    1074            0 :                         if (eventMap->fragments.size() > 1)
    1075              :                         {
    1076            0 :                                 readFragments(eventMap->fragments, outR ? outR : inR, outSR ? outSR : inSR, outE);
    1077              :                         }
    1078              :                 }
    1079              :                 else
    1080              :                 {
    1081              :                         // Did not have a valid message, try again
    1082            0 :                         return false;
    1083              :                 }
    1084            0 :         }
    1085              : 
    1086            0 :         if (outE != nullptr)
    1087              :         {
    1088            0 :                 auto artHdrPtr = std::make_unique<artdaq::detail::RawEventHeader>();
    1089            0 :                 auto daqHdrPtr = eventMap->header;
    1090              : 
    1091            0 :                 if (daqHdrPtr != nullptr)
    1092              :                 {
    1093            0 :                         memcpy(artHdrPtr.get(), daqHdrPtr.get(), sizeof(artdaq::detail::RawEventHeader));
    1094            0 :                         put_product_in_principal(std::move(artHdrPtr), *outE, pretend_module_name, "RawEventHeader");
    1095              :                 }
    1096            0 :         }
    1097              : 
    1098            0 :         auto read_finish_time = std::chrono::steady_clock::now();
    1099            0 :         TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "readNext: bytesRead=" << bytesRead
    1100            0 :                                                    << " metricMan=" << static_cast<void*>(metricMan.get());
    1101            0 :         if (metricMan)
    1102              :         {
    1103            0 :                 metricMan->sendMetric("Avg Processing Time", artdaq::TimeUtils::GetElapsedTime(last_read_time, read_start_time),
    1104              :                                       "s", 2, artdaq::MetricMode::Average);
    1105            0 :                 metricMan->sendMetric("Avg Input Wait Time", artdaq::TimeUtils::GetElapsedTime(read_start_time, got_event_time),
    1106              :                                       "s", 3, artdaq::MetricMode::Average);
    1107            0 :                 metricMan->sendMetric("Avg Read Time", artdaq::TimeUtils::GetElapsedTime(got_event_time, read_finish_time), "s",
    1108              :                                       3, artdaq::MetricMode::Average);
    1109              :         }
    1110              : 
    1111            0 :         TLOG(TLVL_DEBUG + 48, "ArtdaqInputHelper") << "End:   ArtdaqInputHelper::readNext ret=" << std::boolalpha << (outR || outSR || outE);
    1112            0 :         last_read_time = std::chrono::steady_clock::now();
    1113            0 :         return outR || outSR || outE;
    1114            0 : }
    1115              : 
    1116              : #endif  // ARTDAQ_ARTDAQ_ARTMODULES_ARTDAQINPUTHELPER_HH_
        

Generated by: LCOV version 2.0-1